HBASE-2792 Create a better way to chain log cleaners
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@980901 13f79535-47bb-0310-9956-ffa450edef68
CHANGES.txt
@@ -817,6 +817,8 @@ Release 0.21.0 - Unreleased
    HBASE-2879  Offer ZK CLI outside of HBase Shell
                (Nicolas Spiegelberg via Stack)
    HBASE-2886  Add search box to site (Alex Baranau via Stack)
+   HBASE-2792  Create a better way to chain log cleaners
+               (Chongxin Li via Stack)
 
   NEW FEATURES
    HBASE-1961  HBase EC2 scripts
LogCleanerDelegate.java
@@ -20,16 +20,22 @@
 package org.apache.hadoop.hbase.master;
 
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 
 /**
- * Interface for the log cleaning function inside the master. Only 1 is called
- * so if the desired effect is the mix of many cleaners, do call them yourself
- * in order to control the flow.
- * HBase ships with OldLogsCleaner as the default implementation.
+ * Interface for the log cleaning function inside the master. By default, three
+ * cleaners <code>TimeToLiveLogCleaner</code>, <code>ReplicationLogCleaner</code>,
+ * <code>SnapshotLogCleaner</code> are called in order. So if other effects are
+ * needed, implement your own LogCleanerDelegate and add it to the configuration
+ * "hbase.master.logcleaner.plugins", which is a comma-separated list of fully
+ * qualified class names. LogsCleaner will add it to the chain.
+ *
+ * HBase ships with LogsCleaner as the default implementation.
+ *
+ * This interface extends Configurable, so setConf needs to be called once
+ * before using the cleaner.
+ * Since LogCleanerDelegates are created in LogsCleaner by reflection, classes
+ * that implement this interface should provide a default constructor.
  */
 public interface LogCleanerDelegate extends Configurable {
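To make the contract above concrete, here is a minimal sketch of a custom delegate. The class name and the configuration key are invented for illustration; the timestamp parsing assumes the serverName.timestamp HLog file-name convention that HLog.validateHLogFilename and the test below rely on.

package org.apache.hadoop.hbase.master;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

/**
 * Hypothetical example plugin: keeps logs until they are older than a
 * configurable age. A default constructor is required because LogsCleaner
 * instantiates plugins by reflection.
 */
public class ExampleAgeLogCleaner implements LogCleanerDelegate {
  private Configuration conf;
  private long minAgeMs;

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
    // "hbase.master.logcleaner.example.minage" is a made-up key for this sketch
    this.minAgeMs = conf.getLong("hbase.master.logcleaner.example.minage", 600000L);
  }

  @Override
  public Configuration getConf() {
    return this.conf;
  }

  @Override
  public boolean isLogDeletable(Path filePath) {
    // HLog names end in their creation timestamp (e.g. "server.1280217954324"),
    // the same convention TestLogsCleaner uses below.
    String name = filePath.getName();
    long writeTime = Long.parseLong(name.substring(name.lastIndexOf('.') + 1));
    return System.currentTimeMillis() - writeTime > this.minAgeMs;
  }
}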
OldLogsCleaner.java → LogsCleaner.java
@@ -31,24 +31,25 @@ import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 
 import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Pattern;
 
 /**
  * This Chore, every time it runs, will clear the logs in the old logs folder
- * that are older than hbase.master.logcleaner.ttl and, in order to limit the
+ * that are deletable for each log cleaner in the chain and, in order to limit the
  * number of deletes it sends, will only delete a maximum of 20 in a single run.
  */
-public class OldLogsCleaner extends Chore {
+public class LogsCleaner extends Chore {
 
-  static final Log LOG = LogFactory.getLog(OldLogsCleaner.class.getName());
+  static final Log LOG = LogFactory.getLog(LogsCleaner.class.getName());
 
   // Max number we can delete on every chore, this is to make sure we don't
   // issue thousands of delete commands around the same time
   private final int maxDeletedLogs;
   private final FileSystem fs;
   private final Path oldLogDir;
-  private final LogCleanerDelegate logCleaner;
+  private List<LogCleanerDelegate> logCleanersChain;
   private final Configuration conf;
 
   /**
@@ -59,36 +60,65 @@ public class OldLogsCleaner extends Chore {
   * @param fs handle to the FS
   * @param oldLogDir the path to the archived logs
   */
-  public OldLogsCleaner(final int p, final AtomicBoolean s,
+  public LogsCleaner(final int p, final AtomicBoolean s,
     Configuration conf, FileSystem fs,
     Path oldLogDir) {
-    super("OldLogsCleaner", p, s);
-    // Use the log cleaner provided by replication if enabled, unless something
-    // was already provided
-    if (conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false) &&
-        conf.get("hbase.master.logcleanerplugin.impl") == null) {
-      conf.set("hbase.master.logcleanerplugin.impl",
-        "org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner");
-    }
+    super("LogsCleaner", p, s);
 
     this.maxDeletedLogs =
       conf.getInt("hbase.master.logcleaner.maxdeletedlogs", 20);
     this.fs = fs;
     this.oldLogDir = oldLogDir;
     this.conf = conf;
-    this.logCleaner = getLogCleaner();
+    this.logCleanersChain = new LinkedList<LogCleanerDelegate>();
+
+    initLogCleanersChain();
   }
 
-  private LogCleanerDelegate getLogCleaner() {
+  /*
+   * Initialize the chain of log cleaners from the configuration. The default
+   * three LogCleanerDelegates in this chain are: TimeToLiveLogCleaner,
+   * ReplicationLogCleaner and SnapshotLogCleaner.
+   */
+  private void initLogCleanersChain() {
+    String[] logCleaners = conf.getStrings("hbase.master.logcleaner.plugins");
+    if (logCleaners != null) {
+      for (String className : logCleaners) {
+        LogCleanerDelegate logCleaner = newLogCleaner(className, conf);
+        addLogCleaner(logCleaner);
+      }
+    }
+  }
+
+  /**
+   * A utility method to create new instances of LogCleanerDelegate based
+   * on the class name of the LogCleanerDelegate.
+   * @param className fully qualified class name of the LogCleanerDelegate
+   * @param conf used to configure the new instance via setConf
+   * @return the new instance
+   */
+  public static LogCleanerDelegate newLogCleaner(String className, Configuration conf) {
     try {
-      Class c = Class.forName(conf.get("hbase.master.logcleanerplugin.impl",
-        TimeToLiveLogCleaner.class.getCanonicalName()));
+      Class c = Class.forName(className);
       LogCleanerDelegate cleaner = (LogCleanerDelegate) c.newInstance();
       cleaner.setConf(conf);
       return cleaner;
-    } catch (Exception e) {
-      LOG.warn("Passed log cleaner implementation throws errors, " +
-        "defaulting to TimeToLiveLogCleaner", e);
-      return new TimeToLiveLogCleaner();
+    } catch (Exception e) {
+      LOG.warn("Can NOT create LogCleanerDelegate: " + className, e);
+      // skipping if can't instantiate
+      return null;
     }
   }
+
+  /**
+   * Add a LogCleanerDelegate to the log cleaner chain. A log file is deletable
+   * only if it is deletable for each LogCleanerDelegate in the chain.
+   * @param logCleaner the delegate to add to the chain
+   */
+  public void addLogCleaner(LogCleanerDelegate logCleaner) {
+    if (logCleaner != null && !logCleanersChain.contains(logCleaner)) {
+      logCleanersChain.add(logCleaner);
+      LOG.debug("Add log cleaner in chain: " + logCleaner.getClass().getName());
+    }
+  }
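A hedged usage sketch of the two methods above (variable names invented; "conf", "fs" and "oldLogDir" assumed in scope). Because newLogCleaner returns null on failure and addLogCleaner skips nulls and duplicates, extending the chain at runtime degrades gracefully:

// Sketch: extending the chain programmatically instead of only through
// "hbase.master.logcleaner.plugins". com.example.MyLogCleaner is a placeholder.
LogsCleaner cleaner = new LogsCleaner(1000, new AtomicBoolean(false), conf, fs, oldLogDir);
LogCleanerDelegate extra = LogsCleaner.newLogCleaner("com.example.MyLogCleaner", conf);
cleaner.addLogCleaner(extra);  // no-op if instantiation failed or cleaner already present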
@@ -97,13 +127,18 @@ public class OldLogsCleaner extends Chore {
     try {
       FileStatus[] files = this.fs.listStatus(this.oldLogDir);
       int nbDeletedLog = 0;
-      for (FileStatus file : files) {
+      FILE: for (FileStatus file : files) {
         Path filePath = file.getPath();
         if (HLog.validateHLogFilename(filePath.getName())) {
-          if (logCleaner.isLogDeletable(filePath)) {
-            this.fs.delete(filePath, true);
-            nbDeletedLog++;
+          for (LogCleanerDelegate logCleaner : logCleanersChain) {
+            if (!logCleaner.isLogDeletable(filePath)) {
+              // this log is not deletable, continue to process next log file
+              continue FILE;
+            }
+          }
+          // delete this log file if it passes all the log cleaners
+          this.fs.delete(filePath, true);
+          nbDeletedLog++;
         } else {
           LOG.warn("Found a wrongly formatted file: "
             + file.getPath().getName());
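Distilled from the loop above, the chain has veto semantics: a file is removed only when every delegate agrees, and the first refusal short-circuits. A sketch, assuming java.util.List and org.apache.hadoop.fs.Path are imported:

// A log is deletable only if no delegate in the chain vetoes it.
static boolean isDeletableByChain(List<LogCleanerDelegate> chain, Path path) {
  for (LogCleanerDelegate delegate : chain) {
    if (!delegate.isLogDeletable(path)) {
      return false;  // first veto wins; later delegates are never consulted
    }
  }
  return true;       // passed every cleaner in the chain, safe to delete
}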
ServerManager.java
@@ -96,7 +96,7 @@ public class ServerManager {
 
   private int minimumServerCount;
 
-  private final OldLogsCleaner oldLogCleaner;
+  private final LogsCleaner logCleaner;
 
   /*
    * Dumps into log current stats on dead servers and number of servers
@@ -150,11 +150,11 @@ public class ServerManager {
     String n = Thread.currentThread().getName();
     Threads.setDaemonThreadRunning(this.serverMonitorThread,
       n + ".serverMonitor");
-    this.oldLogCleaner = new OldLogsCleaner(
+    this.logCleaner = new LogsCleaner(
       c.getInt("hbase.master.meta.thread.rescanfrequency", 60 * 1000),
       this.master.getShutdownRequested(), c,
       master.getFileSystem(), master.getOldLogDir());
-    Threads.setDaemonThreadRunning(oldLogCleaner,
+    Threads.setDaemonThreadRunning(logCleaner,
       n + ".oldLogCleaner");
   }
ReplicationLogCleaner.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.LogCleanerDelegate;
-import org.apache.hadoop.hbase.master.TimeToLiveLogCleaner;
 import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
 import org.apache.zookeeper.WatchedEvent;
@@ -44,8 +43,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 public class ReplicationLogCleaner implements LogCleanerDelegate, Watcher {
 
   private static final Log LOG =
-      LogFactory.getLog(ReplicationLogCleaner.class);
-  private TimeToLiveLogCleaner ttlCleaner;
+    LogFactory.getLog(ReplicationLogCleaner.class);
   private Configuration conf;
   private ReplicationZookeeperWrapper zkHelper;
   private Set<String> hlogs = new HashSet<String>();
@@ -57,12 +55,6 @@ public class ReplicationLogCleaner implements LogCleanerDelegate, Watcher {
 
   @Override
   public boolean isLogDeletable(Path filePath) {
-
-    // Don't bother going further if the hlog isn't even expired
-    if (!ttlCleaner.isLogDeletable(filePath)) {
-      LOG.debug("Won't delete log since not past due " + filePath);
-      return false;
-    }
     String log = filePath.getName();
     // If we saw the hlog previously, let's consider it's still used
     // At some point in the future we will refresh the list and it will be gone
@@ -72,7 +64,7 @@ public class ReplicationLogCleaner implements LogCleanerDelegate, Watcher {
 
     // Let's see if it's still there
     // This solution makes every miss very expensive to process since we
-    // almost completly refresh the cache each time
+    // almost completely refresh the cache each time
     return !refreshHLogsAndSearch(log);
   }
 
@@ -117,8 +109,6 @@ public class ReplicationLogCleaner implements LogCleanerDelegate, Watcher {
   @Override
   public void setConf(Configuration conf) {
     this.conf = conf;
-    this.ttlCleaner = new TimeToLiveLogCleaner();
-    this.ttlCleaner.setConf(conf);
     try {
       this.zkHelper = new ReplicationZookeeperWrapper(
         ZooKeeperWrapper.createInstance(this.conf,
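The removed ttlCleaner field and pre-check show the point of the patch: ReplicationLogCleaner no longer wraps its own TimeToLiveLogCleaner, because the same short-circuit now falls out of chain order. A sketch of the equivalent ordering, assuming "conf" is in scope (this is also the default value shipped in hbase-default.xml below):

// TTL first (cheap local check), replication second (expensive ZK lookup):
// this reproduces the removed embedded pre-check through plugin order alone.
conf.setStrings("hbase.master.logcleaner.plugins",
    "org.apache.hadoop.hbase.master.TimeToLiveLogCleaner",
    "org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner");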
hbase-default.xml
@@ -289,6 +289,17 @@
     after which it will be cleaned by a master thread.
     </description>
   </property>
+  <property>
+    <name>hbase.master.logcleaner.plugins</name>
+    <value>org.apache.hadoop.hbase.master.TimeToLiveLogCleaner,org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner</value>
+    <description>A comma-separated list of LogCleanerDelegates that are used
+    in LogsCleaner. These log cleaners are called in order, so put the log
+    cleaner that prunes the most log files at the front. To implement your own
+    LogCleanerDelegate, just put it in HBase's classpath and add its fully
+    qualified class name here. You should always keep the default log cleaners
+    above in the list unless you have a special reason not to.
+    </description>
+  </property>
   <property>
     <name>hbase.regions.percheckin</name>
     <value>10</value>
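For example, a deployment adding its own cleaner would override the list in hbase-site.xml; com.example.MyLogCleaner below is a placeholder, and the two default cleaners stay at the front as the description advises:

<property>
  <name>hbase.master.logcleaner.plugins</name>
  <value>org.apache.hadoop.hbase.master.TimeToLiveLogCleaner,org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner,com.example.MyLogCleaner</value>
</property>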
TestOldLogsCleaner.java → TestLogsCleaner.java
@@ -26,11 +26,13 @@ import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
 
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.conf.Configuration;
@@ -38,16 +40,18 @@ import org.apache.hadoop.conf.Configuration;
 import java.net.URLEncoder;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-public class TestOldLogsCleaner {
+public class TestLogsCleaner {
 
   private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 
+  private ReplicationZookeeperWrapper zkHelper;
+
   /**
    * @throws java.lang.Exception
    */
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     TEST_UTIL.startMiniZKCluster();
   }
 
   /**
@@ -55,6 +59,7 @@ public class TestOldLogsCleaner {
    */
   @AfterClass
   public static void tearDownAfterClass() throws Exception {
     TEST_UTIL.shutdownMiniZKCluster();
   }
 
   /**
@@ -62,6 +67,10 @@ public class TestOldLogsCleaner {
    */
   @Before
   public void setUp() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    zkHelper = new ReplicationZookeeperWrapper(
+      ZooKeeperWrapper.createInstance(conf, HRegionServer.class.getName()),
+      conf, new AtomicBoolean(true), "test-cluster");
   }
 
   /**
@@ -80,23 +89,40 @@ public class TestOldLogsCleaner {
 
     FileSystem fs = FileSystem.get(c);
     AtomicBoolean stop = new AtomicBoolean(false);
-    OldLogsCleaner cleaner = new OldLogsCleaner(1000, stop, c, fs, oldLogDir);
+    LogsCleaner cleaner = new LogsCleaner(1000, stop, c, fs, oldLogDir);
 
     // Create 2 invalid files, 1 "recent" file, 1 very new file and 30 old files
     long now = System.currentTimeMillis();
     fs.delete(oldLogDir, true);
     fs.mkdirs(oldLogDir);
+    // Case 1: 2 invalid files, which would be deleted directly
     fs.createNewFile(new Path(oldLogDir, "a"));
     fs.createNewFile(new Path(oldLogDir, fakeMachineName + "." + "a"));
+    // Case 2: 1 "recent" file, not even deletable for the first log cleaner
+    // (TimeToLiveLogCleaner), so we are not going down the chain
     fs.createNewFile(new Path(oldLogDir, fakeMachineName + "." + now));
     System.out.println("Now is: " + now);
     for (int i = 0; i < 30; i++) {
-      fs.createNewFile(new Path(oldLogDir, fakeMachineName + "." + (now - 6000000 - i)));
+      // Case 3: old files which would be deletable for the first log cleaner
+      // (TimeToLiveLogCleaner), and also for the second (ReplicationLogCleaner)
+      Path fileName = new Path(oldLogDir, fakeMachineName + "." +
+        (now - 6000000 - i));
+      fs.createNewFile(fileName);
+      // Case 4: put 3 old log files in ZK indicating that they are scheduled
+      // for replication so these files would pass the first log cleaner
+      // (TimeToLiveLogCleaner) but would be rejected by the second
+      // (ReplicationLogCleaner)
+      if (i % (30 / 3) == 0) {
+        zkHelper.addLogToList(fileName.getName(), fakeMachineName);
+        System.out.println("Replication log file: " + fileName);
+      }
     }
+    for (FileStatus stat : fs.listStatus(oldLogDir)) {
+      System.out.println(stat.getPath().toString());
+    }
+
+    // Case 2: 1 newer file, not even deletable for the first log cleaner
+    // (TimeToLiveLogCleaner), so we are not going down the chain
     fs.createNewFile(new Path(oldLogDir, fakeMachineName + "." + (now + 10000)));
 
     assertEquals(34, fs.listStatus(oldLogDir).length);
@@ -106,11 +132,17 @@ public class TestOldLogsCleaner {
 
     assertEquals(14, fs.listStatus(oldLogDir).length);
 
-    // We will delete all remaining log files and those that are invalid
+    // We will delete all remaining log files which are not scheduled for
+    // replication and those that are invalid
     cleaner.chore();
 
-    // We end up with the current log file and a newer one
-    assertEquals(2, fs.listStatus(oldLogDir).length);
+    // We end up with the current log file, a newer one and the 3 old log
+    // files which are scheduled for replication
+    assertEquals(5, fs.listStatus(oldLogDir).length);
+
+    for (FileStatus file : fs.listStatus(oldLogDir)) {
+      System.out.println("Kept log files: " + file.getPath().getName());
+    }
   }
 
 }
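The asserted counts follow from the setup above: 2 invalid files + 1 recent + 30 old + 1 newer = 34. The first chore() deletes at most hbase.master.logcleaner.maxdeletedlogs = 20 of the deletable files, leaving 14. The second pass removes the rest of the deletable ones, keeping only the recent file, the newer file and the 3 logs registered in ZooKeeper for replication: 5.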