HBASE-18108 Procedure WALs are archived but not cleaned; fix

Archived Procedure WALs are now moved to the <hbase_root>/oldWALs/masterProcedureWALs
directory. A new TimeToLiveProcedureWALCleaner class was added, which
regularly cleans the Procedure WAL files from there.

The TimeToLiveProcedureWALCleaner is now added to
hbase.master.logcleaner.plugins to clean the 2 WALs in one run.

A new config parameter, hbase.master.procedurewalcleaner.ttl, is added;
it specifies how long a Procedure WAL should stay in the
archive directory.

Signed-off-by: Michael Stack <stack@apache.org>
This commit is contained in:
Peter Somogyi 2017-10-05 15:44:24 -07:00 committed by Michael Stack
parent 19336cadce
commit 507a3f9425
No known key found for this signature in database
GPG Key ID: 9816C7FC8ACC93D2
9 changed files with 218 additions and 88 deletions

View File

@ -129,7 +129,7 @@ possible configurations would overwhelm and obscure the important.
</property> </property>
<property> <property>
<name>hbase.master.logcleaner.plugins</name> <name>hbase.master.logcleaner.plugins</name>
<value>org.apache.hadoop.hbase.master.cleaner.TimeToLiveLogCleaner</value> <value>org.apache.hadoop.hbase.master.cleaner.TimeToLiveLogCleaner,org.apache.hadoop.hbase.master.cleaner.TimeToLiveProcedureWALCleaner</value>
<description>A comma-separated list of BaseLogCleanerDelegate invoked by <description>A comma-separated list of BaseLogCleanerDelegate invoked by
the LogsCleaner service. These WAL cleaners are called in order, the LogsCleaner service. These WAL cleaners are called in order,
so put the cleaner that prunes the most files in front. To so put the cleaner that prunes the most files in front. To
@ -140,8 +140,15 @@ possible configurations would overwhelm and obscure the important.
<property> <property>
<name>hbase.master.logcleaner.ttl</name> <name>hbase.master.logcleaner.ttl</name>
<value>600000</value> <value>600000</value>
<description>Maximum time a WAL can stay in the .oldlogdir directory, <description>How long a WAL remains in the {hbase.rootdir}/oldWALs directory,
after which it will be cleaned by a Master thread.</description> after which it will be cleaned by a Master thread. The value is in milliseconds.</description>
</property>
<property>
<name>hbase.master.procedurewalcleaner.ttl</name>
<value>604800000</value>
<description>How long a Procedure WAL will remain in the
{hbase.rootdir}/oldWALs/masterProcedureWALs directory, after which it will be cleaned
by a Master thread. The value is in milliseconds.</description>
</property> </property>
<property> <property>
<name>hbase.master.hfilecleaner.plugins</name> <name>hbase.master.hfilecleaner.plugins</name>

View File

@ -1198,10 +1198,10 @@ public class HMaster extends HRegionServer implements MasterServices {
private void startProcedureExecutor() throws IOException { private void startProcedureExecutor() throws IOException {
final MasterProcedureEnv procEnv = new MasterProcedureEnv(this); final MasterProcedureEnv procEnv = new MasterProcedureEnv(this);
final Path rootDir = FSUtils.getRootDir(conf);
final Path walDir = new Path(FSUtils.getWALRootDir(this.conf), final Path walDir = new Path(FSUtils.getWALRootDir(this.conf),
MasterProcedureConstants.MASTER_PROCEDURE_LOGDIR); MasterProcedureConstants.MASTER_PROCEDURE_LOGDIR);
// TODO: No cleaner currently! Make it a subdir! final Path walArchiveDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
final Path walArchiveDir = new Path(walDir, "archive");
final FileSystem walFs = walDir.getFileSystem(conf); final FileSystem walFs = walDir.getFileSystem(conf);

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.master.cleaner;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hbase.BaseConfigurable; import org.apache.hadoop.hbase.BaseConfigurable;
import org.apache.hadoop.hbase.shaded.com.google.common.base.Predicate;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Iterables; import org.apache.hadoop.hbase.shaded.com.google.common.collect.Iterables;
import java.util.Map; import java.util.Map;
@ -34,11 +33,7 @@ implements FileCleanerDelegate {
@Override @Override
public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) { public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
return Iterables.filter(files, new Predicate<FileStatus>() { return Iterables.filter(files, this::isFileDeletable);
@Override
public boolean apply(FileStatus file) {
return isFileDeletable(file);
}});
} }
@Override @Override

View File

@ -24,7 +24,6 @@ import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedList; import java.util.LinkedList;
@ -114,7 +113,7 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
* A utility method to create new instances of LogCleanerDelegate based on the class name of the * A utility method to create new instances of LogCleanerDelegate based on the class name of the
* LogCleanerDelegate. * LogCleanerDelegate.
* @param className fully qualified class name of the LogCleanerDelegate * @param className fully qualified class name of the LogCleanerDelegate
* @param conf * @param conf used configuration
* @return the new instance * @return the new instance
*/ */
private T newFileCleaner(String className, Configuration conf) { private T newFileCleaner(String className, Configuration conf) {
@ -164,15 +163,14 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
// no need to sort for empty or single directory // no need to sort for empty or single directory
return; return;
} }
Collections.sort(dirs, new Comparator<FileStatus>() { dirs.sort(new Comparator<FileStatus>() {
HashMap<FileStatus, Long> directorySpaces = new HashMap<FileStatus, Long>(); HashMap<FileStatus, Long> directorySpaces = new HashMap<>();
@Override @Override
public int compare(FileStatus f1, FileStatus f2) { public int compare(FileStatus f1, FileStatus f2) {
long f1ConsumedSpace = getSpace(f1); long f1ConsumedSpace = getSpace(f1);
long f2ConsumedSpace = getSpace(f2); long f2ConsumedSpace = getSpace(f2);
return (f1ConsumedSpace > f2ConsumedSpace) ? -1 return Long.compare(f2ConsumedSpace, f1ConsumedSpace);
: (f1ConsumedSpace < f2ConsumedSpace ? 1 : 0);
} }
private long getSpace(FileStatus f) { private long getSpace(FileStatus f) {

View File

@ -25,12 +25,13 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
/** /**
* This Chore, every time it runs, will attempt to delete the WALs in the old logs folder. The WAL * This Chore, every time it runs, will attempt to delete the WALs and Procedure WALs in the old
* is only deleted if none of the cleaner delegates says otherwise. * logs folder. The WAL is only deleted if none of the cleaner delegates says otherwise.
* @see BaseLogCleanerDelegate * @see BaseLogCleanerDelegate
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
@ -38,19 +39,20 @@ public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate> {
private static final Log LOG = LogFactory.getLog(LogCleaner.class.getName()); private static final Log LOG = LogFactory.getLog(LogCleaner.class.getName());
/** /**
* @param p the period of time to sleep between each run * @param period the period of time to sleep between each run
* @param s the stopper * @param stopper the stopper
* @param conf configuration to use * @param conf configuration to use
* @param fs handle to the FS * @param fs handle to the FS
* @param oldLogDir the path to the archived logs * @param oldLogDir the path to the archived logs
*/ */
public LogCleaner(final int p, final Stoppable s, Configuration conf, FileSystem fs, public LogCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
Path oldLogDir) { Path oldLogDir) {
super("LogsCleaner", p, s, conf, fs, oldLogDir, HBASE_MASTER_LOGCLEANER_PLUGINS); super("LogsCleaner", period, stopper, conf, fs, oldLogDir, HBASE_MASTER_LOGCLEANER_PLUGINS);
} }
@Override @Override
protected boolean validate(Path file) { protected boolean validate(Path file) {
return AbstractFSWALProvider.validateWALFilename(file.getName()); return AbstractFSWALProvider.validateWALFilename(file.getName())
|| MasterProcedureUtil.validateProcedureWALFilename(file.getName());
} }
} }

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.master.cleaner;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
@ -32,16 +33,25 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class TimeToLiveLogCleaner extends BaseLogCleanerDelegate { public class TimeToLiveLogCleaner extends BaseLogCleanerDelegate {
private static final Log LOG = LogFactory.getLog(TimeToLiveLogCleaner.class.getName()); private static final Log LOG = LogFactory.getLog(TimeToLiveLogCleaner.class.getName());
public static final String TTL_CONF_KEY = "hbase.master.logcleaner.ttl";
// default ttl = 10 minutes
public static final long DEFAULT_TTL = 600_000L;
// Configured time a log can be kept after it was closed // Configured time a log can be kept after it was closed
private long ttl; private long ttl;
private boolean stopped = false; private boolean stopped = false;
@Override @Override
public boolean isLogDeletable(FileStatus fStat) { public boolean isLogDeletable(FileStatus fStat) {
// Files are validated for the second time here,
// if it causes a bottleneck this logic needs refactored
if (!AbstractFSWALProvider.validateWALFilename(fStat.getPath().getName())) {
return true;
}
long currentTime = EnvironmentEdgeManager.currentTime(); long currentTime = EnvironmentEdgeManager.currentTime();
long time = fStat.getModificationTime(); long time = fStat.getModificationTime();
long life = currentTime - time; long life = currentTime - time;
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("Log life:" + life + ", ttl:" + ttl + ", current:" + currentTime + ", from: " LOG.trace("Log life:" + life + ", ttl:" + ttl + ", current:" + currentTime + ", from: "
+ time); + time);
@ -57,10 +67,9 @@ public class TimeToLiveLogCleaner extends BaseLogCleanerDelegate {
@Override @Override
public void setConf(Configuration conf) { public void setConf(Configuration conf) {
super.setConf(conf); super.setConf(conf);
this.ttl = conf.getLong("hbase.master.logcleaner.ttl", 600000); this.ttl = conf.getLong(TTL_CONF_KEY, DEFAULT_TTL);
} }
@Override @Override
public void stop(String why) { public void stop(String why) {
this.stopped = true; this.stopped = true;

View File

@ -0,0 +1,82 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.cleaner;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
/**
 * Cleaner delegate that decides, based on file age, whether an archived Procedure WAL may be
 * deleted. The age is computed from the file's modification time and compared against a
 * time-to-live read from {@value #TTL_CONF_KEY}; when that key is not set the default of
 * {@value #DEFAULT_TTL} milliseconds (7 days) is used.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class TimeToLiveProcedureWALCleaner extends BaseFileCleanerDelegate {
  private static final Log LOG = LogFactory.getLog(TimeToLiveProcedureWALCleaner.class.getName());

  /** Configuration key holding the Procedure WAL time-to-live, in milliseconds. */
  public static final String TTL_CONF_KEY = "hbase.master.procedurewalcleaner.ttl";

  /** Time-to-live applied when {@link #TTL_CONF_KEY} is not configured: 7 days in milliseconds. */
  public static final long DEFAULT_TTL = 604_800_000L;

  /** Maximum age, in milliseconds, before an archived Procedure WAL is reported as deletable. */
  private long ttl;

  /** Set once {@link #stop(String)} has been called. */
  private boolean stopped = false;

  /**
   * Reads the time-to-live from the supplied configuration and then hands the configuration
   * to the superclass.
   * @param conf configuration to read {@link #TTL_CONF_KEY} from
   */
  @Override
  public void setConf(Configuration conf) {
    this.ttl = conf.getLong(TTL_CONF_KEY, DEFAULT_TTL);
    super.setConf(conf);
  }

  /**
   * Tells whether the given file may be deleted as far as this delegate is concerned.
   * @param fStat status of the candidate file
   * @return {@code true} if the file name is not a Procedure WAL name, or if the file is older
   *         than the configured time-to-live; {@code false} if it is still within the
   *         time-to-live or its modification time lies in the future
   */
  @Override
  public boolean isFileDeletable(FileStatus fStat) {
    // The file name is validated a second time here; if this ever becomes a bottleneck the
    // logic needs to be refactored.
    final String fileName = fStat.getPath().getName();
    if (!MasterProcedureUtil.validateProcedureWALFilename(fileName)) {
      // Not a Procedure WAL: this delegate raises no objection to deleting it.
      // NOTE(review): presumably other delegates in the chain decide on such files - confirm.
      return true;
    }

    final long now = EnvironmentEdgeManager.currentTime();
    final long modifiedAt = fStat.getModificationTime();
    final long age = now - modifiedAt;
    if (LOG.isTraceEnabled()) {
      LOG.trace("Procedure log life:" + age + ", ttl:" + ttl + ", current:" + now +
          ", from: " + modifiedAt);
    }

    if (age >= 0) {
      return age > ttl;
    }

    // A modification time ahead of "now" yields a negative age; keep the file and warn.
    LOG.warn("Found a procedure log (" + fStat.getPath() + ") newer than current time ("
        + now + " < " + modifiedAt + "), probably a clock skew");
    return false;
  }

  /**
   * Marks this delegate as stopped.
   * @param why reason for stopping; not used by this implementation
   */
  @Override
  public void stop(String why) {
    this.stopped = true;
  }

  /**
   * @return {@code true} once {@link #stop(String)} has been called
   */
  @Override
  public boolean isStopped() {
    return this.stopped;
  }
}

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.master.procedure; package org.apache.hadoop.hbase.master.procedure;
import java.io.IOException; import java.io.IOException;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -141,4 +142,19 @@ public final class MasterProcedureUtil {
} }
return runnable.getProcId(); return runnable.getProcId();
} }
/**
 * Pattern used to validate a Procedure WAL file name; see
 * {@link #validateProcedureWALFilename(String)} for a description of the format.
 * The leading {@code .*} tolerates an arbitrary prefix in front of {@code pv-}. The dot in
 * front of {@code log} is escaped so that only a literal '.' is accepted there; unescaped it
 * would match any character (for example {@code pv-00000000000000000001xlog}).
 */
private static final Pattern pattern = Pattern.compile(".*pv-\\d{20}\\.log");

/**
 * A Procedure WAL file name is of the format: pv-&lt;wal-id&gt;.log where wal-id is 20 digits.
 * @param filename name of the file to validate; must not be {@code null}
 * @return {@code true} if the whole filename matches the Procedure WAL format,
 *         {@code false} otherwise
 */
public static boolean validateProcedureWALFilename(String filename) {
  return pattern.matcher(filename).matches();
}
} }

View File

@ -30,6 +30,8 @@ import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
@ -48,7 +50,6 @@ import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClientZKImpl; import org.apache.hadoop.hbase.replication.ReplicationQueuesClientZKImpl;
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner; import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
import org.apache.hadoop.hbase.replication.regionserver.Replication; import org.apache.hadoop.hbase.replication.regionserver.Replication;
@ -68,96 +69,124 @@ import org.mockito.Mockito;
@Category({MasterTests.class, MediumTests.class}) @Category({MasterTests.class, MediumTests.class})
public class TestLogsCleaner { public class TestLogsCleaner {
private static final Log LOG = LogFactory.getLog(TestLogsCleaner.class);
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
/**
* @throws java.lang.Exception
*/
@BeforeClass @BeforeClass
public static void setUpBeforeClass() throws Exception { public static void setUpBeforeClass() throws Exception {
TEST_UTIL.startMiniZKCluster(); TEST_UTIL.startMiniZKCluster();
} }
/**
* @throws java.lang.Exception
*/
@AfterClass @AfterClass
public static void tearDownAfterClass() throws Exception { public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniZKCluster(); TEST_UTIL.shutdownMiniZKCluster();
} }
/**
* This test verifies LogCleaner works correctly with WALs and Procedure WALs located
* in the same oldWALs directory.
* Created files:
* - 2 invalid files
* - 5 old Procedure WALs
* - 30 old WALs from which 3 are in replication
* - 5 recent Procedure WALs
* - 1 recent WAL
* - 1 very new WAL (timestamp in future)
* - masterProcedureWALs subdirectory
* Files which should stay:
* - 3 replication WALs
* - 2 new WALs
* - 5 latest Procedure WALs
* - masterProcedureWALs subdirectory
*/
@Test @Test
public void testLogCleaning() throws Exception{ public void testLogCleaning() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration(); Configuration conf = TEST_UTIL.getConfiguration();
// set TTL // set TTLs
long ttl = 10000; long ttlWAL = 2000;
conf.setLong("hbase.master.logcleaner.ttl", ttl); long ttlProcedureWAL = 4000;
conf.setLong("hbase.master.logcleaner.ttl", ttlWAL);
conf.setLong("hbase.master.procedurewalcleaner.ttl", ttlProcedureWAL);
Replication.decorateMasterConfiguration(conf); Replication.decorateMasterConfiguration(conf);
Server server = new DummyServer(); Server server = new DummyServer();
ReplicationQueues repQueues = ReplicationQueues repQueues = ReplicationFactory.getReplicationQueues(
ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, server, server.getZooKeeper())); new ReplicationQueuesArguments(conf, server, server.getZooKeeper()));
repQueues.init(server.getServerName().toString()); repQueues.init(server.getServerName().toString());
final Path oldLogDir = new Path(TEST_UTIL.getDataTestDir(), final Path oldLogDir = new Path(TEST_UTIL.getDataTestDir(), HConstants.HREGION_OLDLOGDIR_NAME);
HConstants.HREGION_OLDLOGDIR_NAME); final Path oldProcedureWALDir = new Path(oldLogDir, "masterProcedureWALs");
String fakeMachineName = String fakeMachineName = URLEncoder.encode(server.getServerName().toString(), "UTF8");
URLEncoder.encode(server.getServerName().toString(), "UTF8");
final FileSystem fs = FileSystem.get(conf); final FileSystem fs = FileSystem.get(conf);
// Create 2 invalid files, 1 "recent" file, 1 very new file and 30 old files
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
fs.delete(oldLogDir, true); fs.delete(oldLogDir, true);
fs.mkdirs(oldLogDir); fs.mkdirs(oldLogDir);
// Case 1: 2 invalid files, which would be deleted directly // Case 1: 2 invalid files, which would be deleted directly
fs.createNewFile(new Path(oldLogDir, "a")); fs.createNewFile(new Path(oldLogDir, "a"));
fs.createNewFile(new Path(oldLogDir, fakeMachineName + "." + "a")); fs.createNewFile(new Path(oldLogDir, fakeMachineName + "." + "a"));
// Case 2: 1 "recent" file, not even deletable for the first log cleaner
// (TimeToLiveLogCleaner), so we are not going down the chain // Case 2: 5 Procedure WALs that are old which would be deleted
System.out.println("Now is: " + now); for (int i = 1; i < 6; i++) {
for (int i = 1; i < 31; i++) { Path fileName = new Path(oldProcedureWALDir, String.format("pv-%020d.log", i));
// Case 3: old files which would be deletable for the first log cleaner
// (TimeToLiveLogCleaner), and also for the second (ReplicationLogCleaner)
Path fileName = new Path(oldLogDir, fakeMachineName + "." + (now - i) );
fs.createNewFile(fileName); fs.createNewFile(fileName);
// Case 4: put 3 old log files in ZK indicating that they are scheduled }
// for replication so these files would pass the first log cleaner
// (TimeToLiveLogCleaner) but would be rejected by the second // Sleep for sometime to get old procedure WALs
// (ReplicationLogCleaner) Thread.sleep(ttlProcedureWAL - ttlWAL);
if (i % (30/3) == 1) {
// Case 3: old WALs which would be deletable
for (int i = 1; i < 31; i++) {
Path fileName = new Path(oldLogDir, fakeMachineName + "." + (now - i));
fs.createNewFile(fileName);
// Case 4: put 3 WALs in ZK indicating that they are scheduled for replication so these
// files would pass TimeToLiveLogCleaner but would be rejected by ReplicationLogCleaner
if (i % (30 / 3) == 1) {
repQueues.addLog(fakeMachineName, fileName.getName()); repQueues.addLog(fakeMachineName, fileName.getName());
System.out.println("Replication log file: " + fileName); LOG.info("Replication log file: " + fileName);
} }
} }
// sleep for sometime to get newer modifcation time // Case 5: 5 Procedure WALs that are new, will stay
Thread.sleep(ttl); for (int i = 6; i < 11; i++) {
Path fileName = new Path(oldProcedureWALDir, String.format("pv-%020d.log", i));
fs.createNewFile(fileName);
}
// Sleep for sometime to get newer modification time
Thread.sleep(ttlWAL);
fs.createNewFile(new Path(oldLogDir, fakeMachineName + "." + now)); fs.createNewFile(new Path(oldLogDir, fakeMachineName + "." + now));
// Case 2: 1 newer file, not even deletable for the first log cleaner // Case 6: 1 newer WAL, not even deletable for TimeToLiveLogCleaner,
// (TimeToLiveLogCleaner), so we are not going down the chain // so we are not going down the chain
fs.createNewFile(new Path(oldLogDir, fakeMachineName + "." + (now + 10000) )); fs.createNewFile(new Path(oldLogDir, fakeMachineName + "." + (now + ttlWAL)));
for (FileStatus stat : fs.listStatus(oldLogDir)) { for (FileStatus stat : fs.listStatus(oldLogDir)) {
System.out.println(stat.getPath().toString()); LOG.info(stat.getPath().toString());
} }
assertEquals(34, fs.listStatus(oldLogDir).length); // There should be 34 files and masterProcedureWALs directory
assertEquals(35, fs.listStatus(oldLogDir).length);
LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, oldLogDir); // 10 procedure WALs
assertEquals(10, fs.listStatus(oldProcedureWALDir).length);
LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, oldLogDir);
cleaner.chore(); cleaner.chore();
// We end up with the current log file, a newer one and the 3 old log // In oldWALs we end up with the current WAL, a newer WAL, the 3 old WALs which
// files which are scheduled for replication // are scheduled for replication and masterProcedureWALs directory
TEST_UTIL.waitFor(1000, new Waiter.Predicate<Exception>() { TEST_UTIL.waitFor(1000,
@Override (Waiter.Predicate<Exception>) () -> 6 == fs.listStatus(oldLogDir).length);
public boolean evaluate() throws Exception { // In masterProcedureWALs we end up with 5 newer Procedure WALs
return 5 == fs.listStatus(oldLogDir).length; TEST_UTIL.waitFor(1000,
} (Waiter.Predicate<Exception>) () -> 5 == fs.listStatus(oldProcedureWALDir).length);
});
for (FileStatus file : fs.listStatus(oldLogDir)) { for (FileStatus file : fs.listStatus(oldLogDir)) {
System.out.println("Kept log files: " + file.getPath().getName()); LOG.debug("Kept log file in oldWALs: " + file.getPath().getName());
}
for (FileStatus file : fs.listStatus(oldProcedureWALDir)) {
LOG.debug("Kept log file in masterProcedureWALs: " + file.getPath().getName());
} }
} }
@ -180,8 +209,7 @@ public class TestLogsCleaner {
} }
/** /**
* ReplicationLogCleaner should be able to ride over ZooKeeper errors without * ReplicationLogCleaner should be able to ride over ZooKeeper errors without aborting.
* aborting.
*/ */
@Test @Test
public void testZooKeeperAbort() throws Exception { public void testZooKeeperAbort() throws Exception {
@ -193,23 +221,19 @@ public class TestLogsCleaner {
new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log2")) new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log2"))
); );
FaultyZooKeeperWatcher faultyZK = try (FaultyZooKeeperWatcher faultyZK = new FaultyZooKeeperWatcher(conf,
new FaultyZooKeeperWatcher(conf, "testZooKeeperAbort-faulty", null); "testZooKeeperAbort-faulty", null)) {
try {
faultyZK.init(); faultyZK.init();
cleaner.setConf(conf, faultyZK); cleaner.setConf(conf, faultyZK);
// should keep all files due to a ConnectionLossException getting the queues znodes // should keep all files due to a ConnectionLossException getting the queues znodes
Iterable<FileStatus> toDelete = cleaner.getDeletableFiles(dummyFiles); Iterable<FileStatus> toDelete = cleaner.getDeletableFiles(dummyFiles);
assertFalse(toDelete.iterator().hasNext()); assertFalse(toDelete.iterator().hasNext());
assertFalse(cleaner.isStopped()); assertFalse(cleaner.isStopped());
} finally {
faultyZK.close();
} }
// when zk is working both files should be returned // when zk is working both files should be returned
cleaner = new ReplicationLogCleaner(); cleaner = new ReplicationLogCleaner();
ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "testZooKeeperAbort-normal", null); try (ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "testZooKeeperAbort-normal", null)) {
try {
cleaner.setConf(conf, zkw); cleaner.setConf(conf, zkw);
Iterable<FileStatus> filesToDelete = cleaner.getDeletableFiles(dummyFiles); Iterable<FileStatus> filesToDelete = cleaner.getDeletableFiles(dummyFiles);
Iterator<FileStatus> iter = filesToDelete.iterator(); Iterator<FileStatus> iter = filesToDelete.iterator();
@ -218,8 +242,6 @@ public class TestLogsCleaner {
assertTrue(iter.hasNext()); assertTrue(iter.hasNext());
assertEquals(new Path("log2"), iter.next().getPath()); assertEquals(new Path("log2"), iter.next().getPath());
assertFalse(iter.hasNext()); assertFalse(iter.hasNext());
} finally {
zkw.close();
} }
} }
@ -283,7 +305,6 @@ public class TestLogsCleaner {
@Override @Override
public ClusterConnection getClusterConnection() { public ClusterConnection getClusterConnection() {
// TODO Auto-generated method stub
return null; return null;
} }
} }