diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java index db098e26b47..a7338c06986 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java @@ -21,10 +21,11 @@ import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS import java.io.IOException; import java.util.ArrayList; -import java.util.LinkedList; import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -36,7 +37,9 @@ import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; /** * This Chore, every time it runs, will attempt to delete the WALs and Procedure WALs in the old @@ -45,7 +48,7 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti */ @InterfaceAudience.Private public class LogCleaner extends CleanerChore { - private static final Logger LOG = LoggerFactory.getLogger(LogCleaner.class.getName()); + private static final Logger LOG = LoggerFactory.getLogger(LogCleaner.class); public static final String OLD_WALS_CLEANER_THREAD_SIZE = "hbase.oldwals.cleaner.thread.size"; public static final int DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE = 2; @@ -55,16 +58,9 @@ public class LogCleaner extends CleanerChore { @VisibleForTesting static final long DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC = 60 * 1000L; - public static final String OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC = - "hbase.oldwals.cleaner.thread.check.interval.msec"; - @VisibleForTesting - static final long DEFAULT_OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC = 500L; - - private final LinkedBlockingQueue pendingDelete; private List oldWALsCleaner; private long cleanerThreadTimeoutMsec; - private long cleanerThreadCheckIntervalMsec; /** * @param period the period of time to sleep between each run @@ -81,8 +77,6 @@ public class LogCleaner extends CleanerChore { this.oldWALsCleaner = createOldWalsCleaner(size); this.cleanerThreadTimeoutMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC, DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC); - this.cleanerThreadCheckIntervalMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC, - DEFAULT_OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC); } @Override @@ -97,35 +91,33 @@ public class LogCleaner extends CleanerChore { int newSize = conf.getInt(OLD_WALS_CLEANER_THREAD_SIZE, DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE); if (newSize == oldWALsCleaner.size()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Size from configuration is the same as previous which is " + - newSize + ", no need to update."); - } + LOG.debug("Size from configuration is the same as previous which " + + "is {}, no need to update.", newSize); return; } interruptOldWALsCleaner(); oldWALsCleaner = createOldWalsCleaner(newSize); cleanerThreadTimeoutMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC, DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC); - cleanerThreadCheckIntervalMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC, - DEFAULT_OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC); } @Override protected int deleteFiles(Iterable filesToDelete) { - List results = new LinkedList<>(); - for (FileStatus toDelete : filesToDelete) { - CleanerContext context = CleanerContext.createCleanerContext(toDelete, - cleanerThreadTimeoutMsec); - if (context != null) { - pendingDelete.add(context); - results.add(context); + List results = new ArrayList<>(); + for (FileStatus file : filesToDelete) { + LOG.trace("Scheduling file {} for deletion", file); + if (file != null) { + results.add(new CleanerContext(file)); } } + LOG.debug("Old WAL files pending deletion: {}", results); + pendingDelete.addAll(results); + int deletedFiles = 0; for (CleanerContext res : results) { - deletedFiles += res.getResult(cleanerThreadCheckIntervalMsec) ? 1 : 0; + LOG.trace("Awaiting the results for deletion of old WAL file: {}", res); + deletedFiles += res.getResult(this.cleanerThreadTimeoutMsec) ? 1 : 0; } return deletedFiles; } @@ -146,13 +138,8 @@ public class LogCleaner extends CleanerChore { return cleanerThreadTimeoutMsec; } - @VisibleForTesting - long getCleanerThreadCheckIntervalMsec() { - return cleanerThreadCheckIntervalMsec; - } - private List createOldWalsCleaner(int size) { - LOG.info("Creating OldWALs cleaners with size=" + size); + LOG.info("Creating {} OldWALs cleaner threads", size); List oldWALsCleaner = new ArrayList<>(size); for (int i = 0; i < size; i++) { @@ -167,6 +154,7 @@ public class LogCleaner extends CleanerChore { private void interruptOldWALsCleaner() { for (Thread cleaner : oldWALsCleaner) { + LOG.trace("Interrupting thread: {}", cleaner); cleaner.interrupt(); } oldWALsCleaner.clear(); @@ -174,95 +162,78 @@ public class LogCleaner extends CleanerChore { private void deleteFile() { while (true) { - CleanerContext context = null; - boolean succeed = false; - boolean interrupted = false; try { - context = pendingDelete.take(); - if (context != null) { - FileStatus toClean = context.getTargetToClean(); - succeed = this.fs.delete(toClean.getPath(), false); + final CleanerContext context = pendingDelete.take(); + Preconditions.checkNotNull(context); + FileStatus oldWalFile = context.getTargetToClean(); + try { + LOG.debug("Attempting to delete old WAL file: {}", oldWalFile); + boolean succeed = this.fs.delete(oldWalFile.getPath(), false); + context.setResult(succeed); + } catch (IOException e) { + // fs.delete() fails. + LOG.warn("Failed to clean old WAL file", e); + context.setResult(false); } } catch (InterruptedException ite) { - // It's most likely from configuration changing request - if (context != null) { - LOG.warn("Interrupted while cleaning oldWALs " + - context.getTargetToClean() + ", try to clean it next round."); - } - interrupted = true; - } catch (IOException e) { - // fs.delete() fails. - LOG.warn("Failed to clean oldwals with exception: " + e); - succeed = false; - } finally { - if (context != null) { - context.setResult(succeed); - } - if (interrupted) { - // Restore interrupt status - Thread.currentThread().interrupt(); - break; - } + // It is most likely from configuration changing request + LOG.warn("Interrupted while cleaning old WALs, will " + + "try to clean it next round. Exiting."); + // Restore interrupt status + Thread.currentThread().interrupt(); + return; } - } - if (LOG.isDebugEnabled()) { - LOG.debug("Exiting cleaner."); + LOG.debug("Exiting"); } } @Override public synchronized void cancel(boolean mayInterruptIfRunning) { + LOG.debug("Cancelling LogCleaner"); super.cancel(mayInterruptIfRunning); - for (Thread t : oldWALsCleaner) { - t.interrupt(); - } + interruptOldWALsCleaner(); } private static final class CleanerContext { final FileStatus target; - volatile boolean result; - volatile boolean setFromCleaner = false; - long timeoutMsec; + final AtomicBoolean result; + final CountDownLatch remainingResults; - static CleanerContext createCleanerContext(FileStatus status, long timeoutMsec) { - return status != null ? new CleanerContext(status, timeoutMsec) : null; - } - - private CleanerContext(FileStatus status, long timeoutMsec) { + private CleanerContext(FileStatus status) { this.target = status; - this.result = false; - this.timeoutMsec = timeoutMsec; + this.result = new AtomicBoolean(false); + this.remainingResults = new CountDownLatch(1); } - synchronized void setResult(boolean res) { - this.result = res; - this.setFromCleaner = true; - notify(); + void setResult(boolean res) { + this.result.set(res); + this.remainingResults.countDown(); } - synchronized boolean getResult(long waitIfNotFinished) { - long totalTimeMsec = 0; + boolean getResult(long waitIfNotFinished) { try { - while (!setFromCleaner) { - long startTimeNanos = System.nanoTime(); - wait(waitIfNotFinished); - totalTimeMsec += TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTimeNanos, - TimeUnit.NANOSECONDS); - if (totalTimeMsec >= timeoutMsec) { - LOG.warn("Spend too much time " + totalTimeMsec + " ms to delete oldwals " + target); - return result; - } + boolean completed = this.remainingResults.await(waitIfNotFinished, + TimeUnit.MILLISECONDS); + if (!completed) { + LOG.warn("Spend too much time [{}ms] to delete old WAL file: {}", + waitIfNotFinished, target); + return false; } } catch (InterruptedException e) { - LOG.warn("Interrupted while waiting deletion of " + target); - return result; + LOG.warn("Interrupted while awaiting deletion of WAL file: {}", target); + return false; } - return result; + return result.get(); } FileStatus getTargetToClean() { return target; } + + @Override + public String toString() { + return "CleanerContext [target=" + target + ", result=" + result + "]"; + } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java index d1dad624d64..96374771e75 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java @@ -20,14 +20,21 @@ package org.apache.hadoop.hbase.master.cleaner; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.spy; import java.io.IOException; import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.Iterator; import java.util.List; -import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.RandomUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; @@ -46,6 +53,7 @@ import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner; @@ -55,15 +63,16 @@ import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.zookeeper.KeeperException; import org.junit.AfterClass; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hbase.thirdparty.com.google.common.collect.Lists; - @Category({MasterTests.class, MediumTests.class}) public class TestLogsCleaner { @@ -74,6 +83,14 @@ public class TestLogsCleaner { private static final Logger LOG = LoggerFactory.getLogger(TestLogsCleaner.class); private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static final Path OLD_WALS_DIR = + new Path(TEST_UTIL.getDataTestDir(), HConstants.HREGION_OLDLOGDIR_NAME); + + private static final Path OLD_PROCEDURE_WALS_DIR = + new Path(OLD_WALS_DIR, "masterProcedureWALs"); + + private static Configuration conf; + @BeforeClass public static void setUpBeforeClass() throws Exception { TEST_UTIL.startMiniZKCluster(); @@ -87,6 +104,18 @@ public class TestLogsCleaner { TEST_UTIL.shutdownMiniDFSCluster(); } + @Before + public void beforeTest() throws IOException { + conf = TEST_UTIL.getConfiguration(); + + FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem(); + + fs.delete(OLD_WALS_DIR, true); + + // root directory + fs.mkdirs(OLD_WALS_DIR); + } + /** * This tests verifies LogCleaner works correctly with WALs and Procedure WALs located * in the same oldWALs directory. @@ -106,7 +135,6 @@ public class TestLogsCleaner { */ @Test public void testLogCleaning() throws Exception { - Configuration conf = TEST_UTIL.getConfiguration(); // set TTLs long ttlWAL = 2000; long ttlProcedureWAL = 4000; @@ -117,23 +145,23 @@ public class TestLogsCleaner { Server server = new DummyServer(); ReplicationQueueStorage queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf); - final Path oldLogDir = new Path(TEST_UTIL.getDataTestDir(), HConstants.HREGION_OLDLOGDIR_NAME); - final Path oldProcedureWALDir = new Path(oldLogDir, "masterProcedureWALs"); - String fakeMachineName = URLEncoder.encode(server.getServerName().toString(), "UTF8"); + + String fakeMachineName = URLEncoder.encode( + server.getServerName().toString(), StandardCharsets.UTF_8.name()); final FileSystem fs = FileSystem.get(conf); + fs.mkdirs(OLD_PROCEDURE_WALS_DIR); - long now = System.currentTimeMillis(); - fs.delete(oldLogDir, true); - fs.mkdirs(oldLogDir); + final long now = System.currentTimeMillis(); // Case 1: 2 invalid files, which would be deleted directly - fs.createNewFile(new Path(oldLogDir, "a")); - fs.createNewFile(new Path(oldLogDir, fakeMachineName + "." + "a")); + fs.createNewFile(new Path(OLD_WALS_DIR, "a")); + fs.createNewFile(new Path(OLD_WALS_DIR, fakeMachineName + "." + "a")); // Case 2: 5 Procedure WALs that are old which would be deleted - for (int i = 1; i < 6; i++) { - Path fileName = new Path(oldProcedureWALDir, String.format("pv2-%020d.log", i)); + for (int i = 1; i <= 5; i++) { + final Path fileName = + new Path(OLD_PROCEDURE_WALS_DIR, String.format("pv2-%020d.log", i)); fs.createNewFile(fileName); } @@ -141,56 +169,57 @@ public class TestLogsCleaner { Thread.sleep(ttlProcedureWAL - ttlWAL); // Case 3: old WALs which would be deletable - for (int i = 1; i < 31; i++) { - Path fileName = new Path(oldLogDir, fakeMachineName + "." + (now - i)); + for (int i = 1; i <= 30; i++) { + Path fileName = new Path(OLD_WALS_DIR, fakeMachineName + "." + (now - i)); fs.createNewFile(fileName); // Case 4: put 3 WALs in ZK indicating that they are scheduled for replication so these // files would pass TimeToLiveLogCleaner but would be rejected by ReplicationLogCleaner - if (i % (30 / 3) == 1) { + if (i % (30 / 3) == 0) { queueStorage.addWAL(server.getServerName(), fakeMachineName, fileName.getName()); LOG.info("Replication log file: " + fileName); } } // Case 5: 5 Procedure WALs that are new, will stay - for (int i = 6; i < 11; i++) { - Path fileName = new Path(oldProcedureWALDir, String.format("pv2-%020d.log", i)); + for (int i = 6; i <= 10; i++) { + Path fileName = + new Path(OLD_PROCEDURE_WALS_DIR, String.format("pv2-%020d.log", i)); fs.createNewFile(fileName); } // Sleep for sometime to get newer modification time Thread.sleep(ttlWAL); - fs.createNewFile(new Path(oldLogDir, fakeMachineName + "." + now)); + fs.createNewFile(new Path(OLD_WALS_DIR, fakeMachineName + "." + now)); // Case 6: 1 newer WAL, not even deletable for TimeToLiveLogCleaner, // so we are not going down the chain - fs.createNewFile(new Path(oldLogDir, fakeMachineName + "." + (now + ttlWAL))); + fs.createNewFile(new Path(OLD_WALS_DIR, fakeMachineName + "." + (now + ttlWAL))); - for (FileStatus stat : fs.listStatus(oldLogDir)) { - LOG.info(stat.getPath().toString()); - } + FileStatus[] status = fs.listStatus(OLD_WALS_DIR); + LOG.info("File status: {}", Arrays.toString(status)); - // There should be 34 files and masterProcedureWALs directory - assertEquals(35, fs.listStatus(oldLogDir).length); + // There should be 34 files and 1 masterProcedureWALs directory + assertEquals(35, fs.listStatus(OLD_WALS_DIR).length); // 10 procedure WALs - assertEquals(10, fs.listStatus(oldProcedureWALDir).length); + assertEquals(10, fs.listStatus(OLD_PROCEDURE_WALS_DIR).length); - LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, oldLogDir); + LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, OLD_WALS_DIR); cleaner.chore(); // In oldWALs we end up with the current WAL, a newer WAL, the 3 old WALs which // are scheduled for replication and masterProcedureWALs directory - TEST_UTIL.waitFor(1000, - (Waiter.Predicate) () -> 6 == fs.listStatus(oldLogDir).length); + TEST_UTIL.waitFor(1000, (Waiter.Predicate) () -> 6 == fs + .listStatus(OLD_WALS_DIR).length); // In masterProcedureWALs we end up with 5 newer Procedure WALs - TEST_UTIL.waitFor(1000, - (Waiter.Predicate) () -> 5 == fs.listStatus(oldProcedureWALDir).length); + TEST_UTIL.waitFor(1000, (Waiter.Predicate) () -> 5 == fs + .listStatus(OLD_PROCEDURE_WALS_DIR).length); - for (FileStatus file : fs.listStatus(oldLogDir)) { - LOG.debug("Kept log file in oldWALs: " + file.getPath().getName()); - } - for (FileStatus file : fs.listStatus(oldProcedureWALDir)) { - LOG.debug("Kept log file in masterProcedureWALs: " + file.getPath().getName()); + if (LOG.isDebugEnabled()) { + FileStatus[] statusOldWALs = fs.listStatus(OLD_WALS_DIR); + FileStatus[] statusProcedureWALs = fs.listStatus(OLD_PROCEDURE_WALS_DIR); + LOG.debug("Kept log file for oldWALs: {}", Arrays.toString(statusOldWALs)); + LOG.debug("Kept log file for masterProcedureWALs: {}", + Arrays.toString(statusProcedureWALs)); } } @@ -202,7 +231,7 @@ public class TestLogsCleaner { Configuration conf = TEST_UTIL.getConfiguration(); ReplicationLogCleaner cleaner = new ReplicationLogCleaner(); - List dummyFiles = Lists.newArrayList( + List dummyFiles = Arrays.asList( new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log1")), new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log2")) ); @@ -235,18 +264,17 @@ public class TestLogsCleaner { /** * When zk is working both files should be returned - * @throws Exception + * @throws Exception from ZK watcher */ @Test(timeout=10000) public void testZooKeeperNormal() throws Exception { - Configuration conf = TEST_UTIL.getConfiguration(); ReplicationLogCleaner cleaner = new ReplicationLogCleaner(); - List dummyFiles = Lists.newArrayList( + List dummyFiles = Arrays.asList( new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log1")), new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log2")) ); - + ZKWatcher zkw = new ZKWatcher(conf, "testZooKeeperAbort-normal", null); try { cleaner.setConf(conf, zkw); @@ -265,30 +293,18 @@ public class TestLogsCleaner { @Test public void testOnConfigurationChange() throws Exception { - Configuration conf = TEST_UTIL.getConfiguration(); - conf.setInt(LogCleaner.OLD_WALS_CLEANER_THREAD_SIZE, - LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE); - conf.setLong(LogCleaner.OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC, - LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC); - conf.setLong(LogCleaner.OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC, - LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC); // Prepare environments Server server = new DummyServer(); - Path oldWALsDir = new Path(TEST_UTIL.getDefaultRootDirPath(), - HConstants.HREGION_OLDLOGDIR_NAME); + FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem(); - LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, oldWALsDir); + LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, OLD_WALS_DIR); assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE, cleaner.getSizeOfCleaners()); assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC, cleaner.getCleanerThreadTimeoutMsec()); - assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC, - cleaner.getCleanerThreadCheckIntervalMsec()); // Create dir and files for test - fs.delete(oldWALsDir, true); - fs.mkdirs(oldWALsDir); int numOfFiles = 10; - createFiles(fs, oldWALsDir, numOfFiles); - FileStatus[] status = fs.listStatus(oldWALsDir); + createFiles(fs, OLD_WALS_DIR, numOfFiles); + FileStatus[] status = fs.listStatus(OLD_WALS_DIR); assertEquals(numOfFiles, status.length); // Start cleaner chore Thread thread = new Thread(() -> cleaner.chore()); @@ -297,31 +313,24 @@ public class TestLogsCleaner { // change size of cleaners dynamically int sizeToChange = 4; long threadTimeoutToChange = 30 * 1000L; - long threadCheckIntervalToChange = 250L; conf.setInt(LogCleaner.OLD_WALS_CLEANER_THREAD_SIZE, sizeToChange); conf.setLong(LogCleaner.OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC, threadTimeoutToChange); - conf.setLong(LogCleaner.OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC, - threadCheckIntervalToChange); cleaner.onConfigurationChange(conf); assertEquals(sizeToChange, cleaner.getSizeOfCleaners()); assertEquals(threadTimeoutToChange, cleaner.getCleanerThreadTimeoutMsec()); - assertEquals(threadCheckIntervalToChange, cleaner.getCleanerThreadCheckIntervalMsec()); // Stop chore thread.join(); - status = fs.listStatus(oldWALsDir); + status = fs.listStatus(OLD_WALS_DIR); assertEquals(0, status.length); } private void createFiles(FileSystem fs, Path parentDir, int numOfFiles) throws IOException { - Random random = new Random(); for (int i = 0; i < numOfFiles; i++) { - int xMega = 1 + random.nextInt(3); // size of each file is between 1~3M + // size of each file is 1M, 2M, or 3M + int xMega = 1 + ThreadLocalRandom.current().nextInt(1, 4); try (FSDataOutputStream fsdos = fs.create(new Path(parentDir, "file-" + i))) { - for (int m = 0; m < xMega; m++) { - byte[] M = new byte[1024 * 1024]; - random.nextBytes(M); - fsdos.write(M); - } + byte[] M = RandomUtils.nextBytes(Math.toIntExact(FileUtils.ONE_MB * xMega)); + fsdos.write(M); } } }