HBASE-19616 Review of LogCleaner Class

This commit is contained in:
Beluga Behr 2019-02-05 10:30:15 -08:00 committed by stack
parent a33cda4b03
commit 1492bde036
2 changed files with 141 additions and 161 deletions

View File

@ -21,10 +21,11 @@ import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
@ -36,7 +37,9 @@ import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
/** /**
* This Chore, every time it runs, will attempt to delete the WALs and Procedure WALs in the old * This Chore, every time it runs, will attempt to delete the WALs and Procedure WALs in the old
@ -45,7 +48,7 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate> { public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate> {
private static final Logger LOG = LoggerFactory.getLogger(LogCleaner.class.getName()); private static final Logger LOG = LoggerFactory.getLogger(LogCleaner.class);
public static final String OLD_WALS_CLEANER_THREAD_SIZE = "hbase.oldwals.cleaner.thread.size"; public static final String OLD_WALS_CLEANER_THREAD_SIZE = "hbase.oldwals.cleaner.thread.size";
public static final int DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE = 2; public static final int DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE = 2;
@ -55,16 +58,9 @@ public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate> {
@VisibleForTesting @VisibleForTesting
static final long DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC = 60 * 1000L; static final long DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC = 60 * 1000L;
public static final String OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC =
"hbase.oldwals.cleaner.thread.check.interval.msec";
@VisibleForTesting
static final long DEFAULT_OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC = 500L;
private final LinkedBlockingQueue<CleanerContext> pendingDelete; private final LinkedBlockingQueue<CleanerContext> pendingDelete;
private List<Thread> oldWALsCleaner; private List<Thread> oldWALsCleaner;
private long cleanerThreadTimeoutMsec; private long cleanerThreadTimeoutMsec;
private long cleanerThreadCheckIntervalMsec;
/** /**
* @param period the period of time to sleep between each run * @param period the period of time to sleep between each run
@ -81,8 +77,6 @@ public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate> {
this.oldWALsCleaner = createOldWalsCleaner(size); this.oldWALsCleaner = createOldWalsCleaner(size);
this.cleanerThreadTimeoutMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC, this.cleanerThreadTimeoutMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC); DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC);
this.cleanerThreadCheckIntervalMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC,
DEFAULT_OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC);
} }
@Override @Override
@ -97,35 +91,33 @@ public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate> {
int newSize = conf.getInt(OLD_WALS_CLEANER_THREAD_SIZE, DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE); int newSize = conf.getInt(OLD_WALS_CLEANER_THREAD_SIZE, DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE);
if (newSize == oldWALsCleaner.size()) { if (newSize == oldWALsCleaner.size()) {
if (LOG.isDebugEnabled()) { LOG.debug("Size from configuration is the same as previous which "
LOG.debug("Size from configuration is the same as previous which is " + + "is {}, no need to update.", newSize);
newSize + ", no need to update.");
}
return; return;
} }
interruptOldWALsCleaner(); interruptOldWALsCleaner();
oldWALsCleaner = createOldWalsCleaner(newSize); oldWALsCleaner = createOldWalsCleaner(newSize);
cleanerThreadTimeoutMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC, cleanerThreadTimeoutMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC); DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC);
cleanerThreadCheckIntervalMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC,
DEFAULT_OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC);
} }
@Override @Override
protected int deleteFiles(Iterable<FileStatus> filesToDelete) { protected int deleteFiles(Iterable<FileStatus> filesToDelete) {
List<CleanerContext> results = new LinkedList<>(); List<CleanerContext> results = new ArrayList<>();
for (FileStatus toDelete : filesToDelete) { for (FileStatus file : filesToDelete) {
CleanerContext context = CleanerContext.createCleanerContext(toDelete, LOG.trace("Scheduling file {} for deletion", file);
cleanerThreadTimeoutMsec); if (file != null) {
if (context != null) { results.add(new CleanerContext(file));
pendingDelete.add(context);
results.add(context);
} }
} }
LOG.debug("Old WAL files pending deletion: {}", results);
pendingDelete.addAll(results);
int deletedFiles = 0; int deletedFiles = 0;
for (CleanerContext res : results) { for (CleanerContext res : results) {
deletedFiles += res.getResult(cleanerThreadCheckIntervalMsec) ? 1 : 0; LOG.trace("Awaiting the results for deletion of old WAL file: {}", res);
deletedFiles += res.getResult(this.cleanerThreadTimeoutMsec) ? 1 : 0;
} }
return deletedFiles; return deletedFiles;
} }
@ -146,13 +138,8 @@ public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate> {
return cleanerThreadTimeoutMsec; return cleanerThreadTimeoutMsec;
} }
@VisibleForTesting
long getCleanerThreadCheckIntervalMsec() {
return cleanerThreadCheckIntervalMsec;
}
private List<Thread> createOldWalsCleaner(int size) { private List<Thread> createOldWalsCleaner(int size) {
LOG.info("Creating OldWALs cleaners with size=" + size); LOG.info("Creating {} OldWALs cleaner threads", size);
List<Thread> oldWALsCleaner = new ArrayList<>(size); List<Thread> oldWALsCleaner = new ArrayList<>(size);
for (int i = 0; i < size; i++) { for (int i = 0; i < size; i++) {
@ -167,6 +154,7 @@ public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate> {
private void interruptOldWALsCleaner() { private void interruptOldWALsCleaner() {
for (Thread cleaner : oldWALsCleaner) { for (Thread cleaner : oldWALsCleaner) {
LOG.trace("Interrupting thread: {}", cleaner);
cleaner.interrupt(); cleaner.interrupt();
} }
oldWALsCleaner.clear(); oldWALsCleaner.clear();
@ -174,95 +162,78 @@ public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate> {
private void deleteFile() { private void deleteFile() {
while (true) { while (true) {
CleanerContext context = null;
boolean succeed = false;
boolean interrupted = false;
try { try {
context = pendingDelete.take(); final CleanerContext context = pendingDelete.take();
if (context != null) { Preconditions.checkNotNull(context);
FileStatus toClean = context.getTargetToClean(); FileStatus oldWalFile = context.getTargetToClean();
succeed = this.fs.delete(toClean.getPath(), false); try {
} LOG.debug("Attempting to delete old WAL file: {}", oldWalFile);
} catch (InterruptedException ite) { boolean succeed = this.fs.delete(oldWalFile.getPath(), false);
// It's most likely from configuration changing request context.setResult(succeed);
if (context != null) {
LOG.warn("Interrupted while cleaning oldWALs " +
context.getTargetToClean() + ", try to clean it next round.");
}
interrupted = true;
} catch (IOException e) { } catch (IOException e) {
// fs.delete() fails. // fs.delete() fails.
LOG.warn("Failed to clean oldwals with exception: " + e); LOG.warn("Failed to clean old WAL file", e);
succeed = false; context.setResult(false);
} finally {
if (context != null) {
context.setResult(succeed);
} }
if (interrupted) { } catch (InterruptedException ite) {
// It is most likely from configuration changing request
LOG.warn("Interrupted while cleaning old WALs, will "
+ "try to clean it next round. Exiting.");
// Restore interrupt status // Restore interrupt status
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
break; return;
} }
} LOG.debug("Exiting");
}
if (LOG.isDebugEnabled()) {
LOG.debug("Exiting cleaner.");
} }
} }
@Override @Override
public synchronized void cancel(boolean mayInterruptIfRunning) { public synchronized void cancel(boolean mayInterruptIfRunning) {
LOG.debug("Cancelling LogCleaner");
super.cancel(mayInterruptIfRunning); super.cancel(mayInterruptIfRunning);
for (Thread t : oldWALsCleaner) { interruptOldWALsCleaner();
t.interrupt();
}
} }
private static final class CleanerContext { private static final class CleanerContext {
final FileStatus target; final FileStatus target;
volatile boolean result; final AtomicBoolean result;
volatile boolean setFromCleaner = false; final CountDownLatch remainingResults;
long timeoutMsec;
static CleanerContext createCleanerContext(FileStatus status, long timeoutMsec) { private CleanerContext(FileStatus status) {
return status != null ? new CleanerContext(status, timeoutMsec) : null;
}
private CleanerContext(FileStatus status, long timeoutMsec) {
this.target = status; this.target = status;
this.result = false; this.result = new AtomicBoolean(false);
this.timeoutMsec = timeoutMsec; this.remainingResults = new CountDownLatch(1);
} }
synchronized void setResult(boolean res) { void setResult(boolean res) {
this.result = res; this.result.set(res);
this.setFromCleaner = true; this.remainingResults.countDown();
notify();
} }
synchronized boolean getResult(long waitIfNotFinished) { boolean getResult(long waitIfNotFinished) {
long totalTimeMsec = 0;
try { try {
while (!setFromCleaner) { boolean completed = this.remainingResults.await(waitIfNotFinished,
long startTimeNanos = System.nanoTime(); TimeUnit.MILLISECONDS);
wait(waitIfNotFinished); if (!completed) {
totalTimeMsec += TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTimeNanos, LOG.warn("Spend too much time [{}ms] to delete old WAL file: {}",
TimeUnit.NANOSECONDS); waitIfNotFinished, target);
if (totalTimeMsec >= timeoutMsec) { return false;
LOG.warn("Spend too much time " + totalTimeMsec + " ms to delete oldwals " + target);
return result;
}
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOG.warn("Interrupted while waiting deletion of " + target); LOG.warn("Interrupted while awaiting deletion of WAL file: {}", target);
return result; return false;
} }
return result; return result.get();
} }
FileStatus getTargetToClean() { FileStatus getTargetToClean() {
return target; return target;
} }
@Override
public String toString() {
return "CleanerContext [target=" + target + ", result=" + result + "]";
}
} }
} }

View File

@ -20,14 +20,21 @@ package org.apache.hadoop.hbase.master.cleaner;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
import java.io.IOException; import java.io.IOException;
import java.net.URLEncoder; import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Random; import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
@ -46,6 +53,7 @@ import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner; import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
@ -55,15 +63,16 @@ import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
@Category({MasterTests.class, MediumTests.class}) @Category({MasterTests.class, MediumTests.class})
public class TestLogsCleaner { public class TestLogsCleaner {
@ -74,6 +83,14 @@ public class TestLogsCleaner {
private static final Logger LOG = LoggerFactory.getLogger(TestLogsCleaner.class); private static final Logger LOG = LoggerFactory.getLogger(TestLogsCleaner.class);
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static final Path OLD_WALS_DIR =
new Path(TEST_UTIL.getDataTestDir(), HConstants.HREGION_OLDLOGDIR_NAME);
private static final Path OLD_PROCEDURE_WALS_DIR =
new Path(OLD_WALS_DIR, "masterProcedureWALs");
private static Configuration conf;
@BeforeClass @BeforeClass
public static void setUpBeforeClass() throws Exception { public static void setUpBeforeClass() throws Exception {
TEST_UTIL.startMiniZKCluster(); TEST_UTIL.startMiniZKCluster();
@ -87,6 +104,18 @@ public class TestLogsCleaner {
TEST_UTIL.shutdownMiniDFSCluster(); TEST_UTIL.shutdownMiniDFSCluster();
} }
@Before
public void beforeTest() throws IOException {
conf = TEST_UTIL.getConfiguration();
FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
fs.delete(OLD_WALS_DIR, true);
// root directory
fs.mkdirs(OLD_WALS_DIR);
}
/** /**
* This tests verifies LogCleaner works correctly with WALs and Procedure WALs located * This tests verifies LogCleaner works correctly with WALs and Procedure WALs located
* in the same oldWALs directory. * in the same oldWALs directory.
@ -106,7 +135,6 @@ public class TestLogsCleaner {
*/ */
@Test @Test
public void testLogCleaning() throws Exception { public void testLogCleaning() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
// set TTLs // set TTLs
long ttlWAL = 2000; long ttlWAL = 2000;
long ttlProcedureWAL = 4000; long ttlProcedureWAL = 4000;
@ -117,23 +145,23 @@ public class TestLogsCleaner {
Server server = new DummyServer(); Server server = new DummyServer();
ReplicationQueueStorage queueStorage = ReplicationQueueStorage queueStorage =
ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf); ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf);
final Path oldLogDir = new Path(TEST_UTIL.getDataTestDir(), HConstants.HREGION_OLDLOGDIR_NAME);
final Path oldProcedureWALDir = new Path(oldLogDir, "masterProcedureWALs"); String fakeMachineName = URLEncoder.encode(
String fakeMachineName = URLEncoder.encode(server.getServerName().toString(), "UTF8"); server.getServerName().toString(), StandardCharsets.UTF_8.name());
final FileSystem fs = FileSystem.get(conf); final FileSystem fs = FileSystem.get(conf);
fs.mkdirs(OLD_PROCEDURE_WALS_DIR);
long now = System.currentTimeMillis(); final long now = System.currentTimeMillis();
fs.delete(oldLogDir, true);
fs.mkdirs(oldLogDir);
// Case 1: 2 invalid files, which would be deleted directly // Case 1: 2 invalid files, which would be deleted directly
fs.createNewFile(new Path(oldLogDir, "a")); fs.createNewFile(new Path(OLD_WALS_DIR, "a"));
fs.createNewFile(new Path(oldLogDir, fakeMachineName + "." + "a")); fs.createNewFile(new Path(OLD_WALS_DIR, fakeMachineName + "." + "a"));
// Case 2: 5 Procedure WALs that are old which would be deleted // Case 2: 5 Procedure WALs that are old which would be deleted
for (int i = 1; i < 6; i++) { for (int i = 1; i <= 5; i++) {
Path fileName = new Path(oldProcedureWALDir, String.format("pv2-%020d.log", i)); final Path fileName =
new Path(OLD_PROCEDURE_WALS_DIR, String.format("pv2-%020d.log", i));
fs.createNewFile(fileName); fs.createNewFile(fileName);
} }
@ -141,56 +169,57 @@ public class TestLogsCleaner {
Thread.sleep(ttlProcedureWAL - ttlWAL); Thread.sleep(ttlProcedureWAL - ttlWAL);
// Case 3: old WALs which would be deletable // Case 3: old WALs which would be deletable
for (int i = 1; i < 31; i++) { for (int i = 1; i <= 30; i++) {
Path fileName = new Path(oldLogDir, fakeMachineName + "." + (now - i)); Path fileName = new Path(OLD_WALS_DIR, fakeMachineName + "." + (now - i));
fs.createNewFile(fileName); fs.createNewFile(fileName);
// Case 4: put 3 WALs in ZK indicating that they are scheduled for replication so these // Case 4: put 3 WALs in ZK indicating that they are scheduled for replication so these
// files would pass TimeToLiveLogCleaner but would be rejected by ReplicationLogCleaner // files would pass TimeToLiveLogCleaner but would be rejected by ReplicationLogCleaner
if (i % (30 / 3) == 1) { if (i % (30 / 3) == 0) {
queueStorage.addWAL(server.getServerName(), fakeMachineName, fileName.getName()); queueStorage.addWAL(server.getServerName(), fakeMachineName, fileName.getName());
LOG.info("Replication log file: " + fileName); LOG.info("Replication log file: " + fileName);
} }
} }
// Case 5: 5 Procedure WALs that are new, will stay // Case 5: 5 Procedure WALs that are new, will stay
for (int i = 6; i < 11; i++) { for (int i = 6; i <= 10; i++) {
Path fileName = new Path(oldProcedureWALDir, String.format("pv2-%020d.log", i)); Path fileName =
new Path(OLD_PROCEDURE_WALS_DIR, String.format("pv2-%020d.log", i));
fs.createNewFile(fileName); fs.createNewFile(fileName);
} }
// Sleep for sometime to get newer modification time // Sleep for sometime to get newer modification time
Thread.sleep(ttlWAL); Thread.sleep(ttlWAL);
fs.createNewFile(new Path(oldLogDir, fakeMachineName + "." + now)); fs.createNewFile(new Path(OLD_WALS_DIR, fakeMachineName + "." + now));
// Case 6: 1 newer WAL, not even deletable for TimeToLiveLogCleaner, // Case 6: 1 newer WAL, not even deletable for TimeToLiveLogCleaner,
// so we are not going down the chain // so we are not going down the chain
fs.createNewFile(new Path(oldLogDir, fakeMachineName + "." + (now + ttlWAL))); fs.createNewFile(new Path(OLD_WALS_DIR, fakeMachineName + "." + (now + ttlWAL)));
for (FileStatus stat : fs.listStatus(oldLogDir)) { FileStatus[] status = fs.listStatus(OLD_WALS_DIR);
LOG.info(stat.getPath().toString()); LOG.info("File status: {}", Arrays.toString(status));
}
// There should be 34 files and masterProcedureWALs directory // There should be 34 files and 1 masterProcedureWALs directory
assertEquals(35, fs.listStatus(oldLogDir).length); assertEquals(35, fs.listStatus(OLD_WALS_DIR).length);
// 10 procedure WALs // 10 procedure WALs
assertEquals(10, fs.listStatus(oldProcedureWALDir).length); assertEquals(10, fs.listStatus(OLD_PROCEDURE_WALS_DIR).length);
LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, oldLogDir); LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, OLD_WALS_DIR);
cleaner.chore(); cleaner.chore();
// In oldWALs we end up with the current WAL, a newer WAL, the 3 old WALs which // In oldWALs we end up with the current WAL, a newer WAL, the 3 old WALs which
// are scheduled for replication and masterProcedureWALs directory // are scheduled for replication and masterProcedureWALs directory
TEST_UTIL.waitFor(1000, TEST_UTIL.waitFor(1000, (Waiter.Predicate<Exception>) () -> 6 == fs
(Waiter.Predicate<Exception>) () -> 6 == fs.listStatus(oldLogDir).length); .listStatus(OLD_WALS_DIR).length);
// In masterProcedureWALs we end up with 5 newer Procedure WALs // In masterProcedureWALs we end up with 5 newer Procedure WALs
TEST_UTIL.waitFor(1000, TEST_UTIL.waitFor(1000, (Waiter.Predicate<Exception>) () -> 5 == fs
(Waiter.Predicate<Exception>) () -> 5 == fs.listStatus(oldProcedureWALDir).length); .listStatus(OLD_PROCEDURE_WALS_DIR).length);
for (FileStatus file : fs.listStatus(oldLogDir)) { if (LOG.isDebugEnabled()) {
LOG.debug("Kept log file in oldWALs: " + file.getPath().getName()); FileStatus[] statusOldWALs = fs.listStatus(OLD_WALS_DIR);
} FileStatus[] statusProcedureWALs = fs.listStatus(OLD_PROCEDURE_WALS_DIR);
for (FileStatus file : fs.listStatus(oldProcedureWALDir)) { LOG.debug("Kept log file for oldWALs: {}", Arrays.toString(statusOldWALs));
LOG.debug("Kept log file in masterProcedureWALs: " + file.getPath().getName()); LOG.debug("Kept log file for masterProcedureWALs: {}",
Arrays.toString(statusProcedureWALs));
} }
} }
@ -202,7 +231,7 @@ public class TestLogsCleaner {
Configuration conf = TEST_UTIL.getConfiguration(); Configuration conf = TEST_UTIL.getConfiguration();
ReplicationLogCleaner cleaner = new ReplicationLogCleaner(); ReplicationLogCleaner cleaner = new ReplicationLogCleaner();
List<FileStatus> dummyFiles = Lists.newArrayList( List<FileStatus> dummyFiles = Arrays.asList(
new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log1")), new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log1")),
new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log2")) new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log2"))
); );
@ -235,14 +264,13 @@ public class TestLogsCleaner {
/** /**
* When zk is working both files should be returned * When zk is working both files should be returned
* @throws Exception * @throws Exception from ZK watcher
*/ */
@Test(timeout=10000) @Test(timeout=10000)
public void testZooKeeperNormal() throws Exception { public void testZooKeeperNormal() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
ReplicationLogCleaner cleaner = new ReplicationLogCleaner(); ReplicationLogCleaner cleaner = new ReplicationLogCleaner();
List<FileStatus> dummyFiles = Lists.newArrayList( List<FileStatus> dummyFiles = Arrays.asList(
new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log1")), new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log1")),
new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log2")) new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log2"))
); );
@ -265,30 +293,18 @@ public class TestLogsCleaner {
@Test @Test
public void testOnConfigurationChange() throws Exception { public void testOnConfigurationChange() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
conf.setInt(LogCleaner.OLD_WALS_CLEANER_THREAD_SIZE,
LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE);
conf.setLong(LogCleaner.OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC);
conf.setLong(LogCleaner.OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC,
LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC);
// Prepare environments // Prepare environments
Server server = new DummyServer(); Server server = new DummyServer();
Path oldWALsDir = new Path(TEST_UTIL.getDefaultRootDirPath(),
HConstants.HREGION_OLDLOGDIR_NAME);
FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem(); FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, oldWALsDir); LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, OLD_WALS_DIR);
assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE, cleaner.getSizeOfCleaners()); assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE, cleaner.getSizeOfCleaners());
assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC, assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
cleaner.getCleanerThreadTimeoutMsec()); cleaner.getCleanerThreadTimeoutMsec());
assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC,
cleaner.getCleanerThreadCheckIntervalMsec());
// Create dir and files for test // Create dir and files for test
fs.delete(oldWALsDir, true);
fs.mkdirs(oldWALsDir);
int numOfFiles = 10; int numOfFiles = 10;
createFiles(fs, oldWALsDir, numOfFiles); createFiles(fs, OLD_WALS_DIR, numOfFiles);
FileStatus[] status = fs.listStatus(oldWALsDir); FileStatus[] status = fs.listStatus(OLD_WALS_DIR);
assertEquals(numOfFiles, status.length); assertEquals(numOfFiles, status.length);
// Start cleaner chore // Start cleaner chore
Thread thread = new Thread(() -> cleaner.chore()); Thread thread = new Thread(() -> cleaner.chore());
@ -297,34 +313,27 @@ public class TestLogsCleaner {
// change size of cleaners dynamically // change size of cleaners dynamically
int sizeToChange = 4; int sizeToChange = 4;
long threadTimeoutToChange = 30 * 1000L; long threadTimeoutToChange = 30 * 1000L;
long threadCheckIntervalToChange = 250L;
conf.setInt(LogCleaner.OLD_WALS_CLEANER_THREAD_SIZE, sizeToChange); conf.setInt(LogCleaner.OLD_WALS_CLEANER_THREAD_SIZE, sizeToChange);
conf.setLong(LogCleaner.OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC, threadTimeoutToChange); conf.setLong(LogCleaner.OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC, threadTimeoutToChange);
conf.setLong(LogCleaner.OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC,
threadCheckIntervalToChange);
cleaner.onConfigurationChange(conf); cleaner.onConfigurationChange(conf);
assertEquals(sizeToChange, cleaner.getSizeOfCleaners()); assertEquals(sizeToChange, cleaner.getSizeOfCleaners());
assertEquals(threadTimeoutToChange, cleaner.getCleanerThreadTimeoutMsec()); assertEquals(threadTimeoutToChange, cleaner.getCleanerThreadTimeoutMsec());
assertEquals(threadCheckIntervalToChange, cleaner.getCleanerThreadCheckIntervalMsec());
// Stop chore // Stop chore
thread.join(); thread.join();
status = fs.listStatus(oldWALsDir); status = fs.listStatus(OLD_WALS_DIR);
assertEquals(0, status.length); assertEquals(0, status.length);
} }
private void createFiles(FileSystem fs, Path parentDir, int numOfFiles) throws IOException { private void createFiles(FileSystem fs, Path parentDir, int numOfFiles) throws IOException {
Random random = new Random();
for (int i = 0; i < numOfFiles; i++) { for (int i = 0; i < numOfFiles; i++) {
int xMega = 1 + random.nextInt(3); // size of each file is between 1~3M // size of each file is 1M, 2M, or 3M
int xMega = 1 + ThreadLocalRandom.current().nextInt(1, 4);
try (FSDataOutputStream fsdos = fs.create(new Path(parentDir, "file-" + i))) { try (FSDataOutputStream fsdos = fs.create(new Path(parentDir, "file-" + i))) {
for (int m = 0; m < xMega; m++) { byte[] M = RandomUtils.nextBytes(Math.toIntExact(FileUtils.ONE_MB * xMega));
byte[] M = new byte[1024 * 1024];
random.nextBytes(M);
fsdos.write(M); fsdos.write(M);
} }
} }
} }
}
static class DummyServer implements Server { static class DummyServer implements Server {