diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java index 4d85b52ab90..8ebfd1acf55 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.SplitLogTask; import org.apache.hadoop.hbase.master.SplitLogManager; import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter; import org.apache.hadoop.hbase.util.CancelableProgressable; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; import org.apache.hadoop.hbase.zookeeper.ZKUtil; @@ -81,13 +82,16 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable { private volatile boolean exitWorker; private final Object grabTaskLock = new Object(); private boolean workerInGrabTask = false; - + private final int report_period; public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf, ServerName serverName, TaskExecutor splitTaskExecutor) { super(watcher); this.serverName = serverName; this.splitTaskExecutor = splitTaskExecutor; + report_period = conf.getInt("hbase.splitlog.report.period", + conf.getInt("hbase.splitlog.manager.timeout", + SplitLogManager.DEFAULT_TIMEOUT) / 2); } public SplitLogWorker(ZooKeeperWatcher watcher, final Configuration conf, @@ -274,15 +278,22 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable { status = splitTaskExecutor.exec(ZKSplitLog.getFileName(currentTask), new CancelableProgressable() { + private long last_report_at = 0; + @Override public boolean progress() { - if (!attemptToOwnTask(false)) { - LOG.warn("Failed to heartbeat the task" + currentTask); - return false; + long t = EnvironmentEdgeManager.currentTimeMillis(); + if ((t - last_report_at) > report_period) { + last_report_at = t; + if (!attemptToOwnTask(false)) { + LOG.warn("Failed to heartbeat the task" + currentTask); + return false; + } } return true; } }); + switch (status) { case DONE: endTask(new SplitLogTask.Done(this.serverName), SplitLogCounters.tot_wkr_task_done); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java index f96f7e0532a..a91284ab8d0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java @@ -32,6 +32,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader; import org.apache.hadoop.hbase.regionserver.wal.HLog.Writer; +import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; public class HLogFactory { @@ -70,6 +71,11 @@ public class HLogFactory { logReaderClass = null; } + public static HLog.Reader createReader(final FileSystem fs, + final Path path, Configuration conf) throws IOException { + return createReader(fs, path, conf, null); + } + /** * Create a reader for the WAL. If you are reading from a file that's being written to * and need to reopen it multiple times, use {@link HLog.Reader#reset()} instead of this method @@ -77,8 +83,8 @@ public class HLogFactory { * @return A WAL reader. Close when done with it. * @throws IOException */ - public static HLog.Reader createReader(final FileSystem fs, - final Path path, Configuration conf) throws IOException { + public static HLog.Reader createReader(final FileSystem fs, final Path path, + Configuration conf, CancelableProgressable reporter) throws IOException { if (logReaderClass == null) { logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl", SequenceFileLogReader.class, Reader.class); @@ -102,6 +108,9 @@ public class HLogFactory { if (++nbAttempt == 1) { LOG.warn("Lease should have recovered. This is not expected. Will retry", e); } + if (reporter != null && !reporter.progress()) { + throw new InterruptedIOException("Operation is cancelled"); + } if (nbAttempt > 2 && openTimeout < EnvironmentEdgeManager.currentTimeMillis()) { LOG.error("Can't open after " + nbAttempt + " attempts and " + (EnvironmentEdgeManager.currentTimeMillis() - startWaiting) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java index f0ba96ed3e1..032e2cf8f8b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java @@ -55,7 +55,6 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.RemoteExceptionHandler; import org.apache.hadoop.hbase.exceptions.OrphanHLogAfterSplitException; import org.apache.hadoop.hbase.io.HeapSize; -import org.apache.hadoop.hbase.master.SplitLogManager; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.regionserver.HRegion; @@ -113,9 +112,6 @@ public class HLogSplitter { private MonitoredTask status; - // Used in distributed log splitting - private DistributedLogSplittingHelper distributedLogSplittingHelper = null; - // For checking the latest flushed sequence id protected final LastSequenceId sequenceIdChecker; @@ -263,10 +259,6 @@ public class HLogSplitter { return outputSink.getOutputCounts(); } - void setDistributedLogSplittingHelper(DistributedLogSplittingHelper helper) { - this.distributedLogSplittingHelper = helper; - } - /** * Splits the HLog edits in the given list of logfiles (that are a mix of edits * on multiple regions) by region and then splits them per region directories, @@ -317,7 +309,7 @@ public class HLogSplitter { //meta only. However, there is a sequence number that can be obtained //only by parsing.. so we parse for all files currently //TODO: optimize this part somehow - in = getReader(fs, log, conf, skipErrors); + in = getReader(fs, log, conf, skipErrors, null); if (in != null) { parseHLog(in, logPath, entryBuffers, fs, conf, skipErrors); } @@ -420,53 +412,58 @@ public class HLogSplitter { CancelableProgressable reporter) throws IOException { boolean isCorrupted = false; Preconditions.checkState(status == null); - status = TaskMonitor.get().createStatus( - "Splitting log file " + logfile.getPath() + - "into a temporary staging area."); boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors", - HLog.SPLIT_SKIP_ERRORS_DEFAULT); + HLog.SPLIT_SKIP_ERRORS_DEFAULT); int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024); Path logPath = logfile.getPath(); - long logLength = logfile.getLen(); - LOG.info("Splitting hlog: " + logPath + ", length=" + logLength); - status.setStatus("Opening log file"); - Reader in = null; - try { - in = getReader(fs, logfile, conf, skipErrors); - } catch (CorruptedLogFileException e) { - LOG.warn("Could not get reader, corrupted log file " + logPath, e); - ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs); - isCorrupted = true; - } - if (in == null) { - status.markComplete("Was nothing to split in log file"); - LOG.warn("Nothing to split in log file " + logPath); - return true; - } - this.setDistributedLogSplittingHelper(new DistributedLogSplittingHelper(reporter)); - if (!reportProgressIfIsDistributedLogSplitting()) { - return false; - } + boolean outputSinkStarted = false; boolean progress_failed = false; - int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3); - int numOpenedFilesLastCheck = 0; - outputSink.startWriterThreads(); - // Report progress every so many edits and/or files opened (opening a file - // takes a bit of time). - Map lastFlushedSequenceIds = - new TreeMap(Bytes.BYTES_COMPARATOR); - Entry entry; int editsCount = 0; int editsSkipped = 0; + try { + status = TaskMonitor.get().createStatus( + "Splitting log file " + logfile.getPath() + + "into a temporary staging area."); + long logLength = logfile.getLen(); + LOG.info("Splitting hlog: " + logPath + ", length=" + logLength); + status.setStatus("Opening log file"); + if (reporter != null && !reporter.progress()) { + progress_failed = true; + return false; + } + Reader in = null; + try { + in = getReader(fs, logfile, conf, skipErrors, reporter); + } catch (CorruptedLogFileException e) { + LOG.warn("Could not get reader, corrupted log file " + logPath, e); + ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs); + isCorrupted = true; + } + if (in == null) { + status.markComplete("Was nothing to split in log file"); + LOG.warn("Nothing to split in log file " + logPath); + return true; + } + int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3); + int numOpenedFilesLastCheck = 0; + outputSink.setReporter(reporter); + outputSink.startWriterThreads(); + outputSinkStarted = true; + // Report progress every so many edits and/or files opened (opening a file + // takes a bit of time). + Map lastFlushedSequenceIds = + new TreeMap(Bytes.BYTES_COMPARATOR); + Entry entry; + while ((entry = getNextLogLine(in,logPath, skipErrors)) != null) { byte[] region = entry.getKey().getEncodedRegionName(); Long lastFlushedSequenceId = -1l; if (sequenceIdChecker != null) { lastFlushedSequenceId = lastFlushedSequenceIds.get(region); if (lastFlushedSequenceId == null) { - lastFlushedSequenceId = sequenceIdChecker.getLastSequenceId(region); - lastFlushedSequenceIds.put(region, lastFlushedSequenceId); + lastFlushedSequenceId = sequenceIdChecker.getLastSequenceId(region); + lastFlushedSequenceIds.put(region, lastFlushedSequenceId); } } if (lastFlushedSequenceId >= entry.getKey().getLogSeqNum()) { @@ -482,7 +479,8 @@ public class HLogSplitter { String countsStr = (editsCount - editsSkipped) + " edits, skipped " + editsSkipped + " edits."; status.setStatus("Split " + countsStr); - if (!reportProgressIfIsDistributedLogSplitting()) { + if (reporter != null && !reporter.progress()) { + progress_failed = true; return false; } } @@ -500,12 +498,13 @@ public class HLogSplitter { throw e; } finally { LOG.info("Finishing writing output logs and closing down."); - progress_failed = outputSink.finishWritingAndClose() == null; + if (outputSinkStarted) { + progress_failed = outputSink.finishWritingAndClose() == null; + } String msg = "Processed " + editsCount + " edits across " - + outputSink.getOutputCounts().size() + " regions; log file=" - + logPath + " is corrupted = " + isCorrupted + " progress failed = " - + progress_failed; - ; + + outputSink.getOutputCounts().size() + " regions; log file=" + + logPath + " is corrupted = " + isCorrupted + " progress failed = " + + progress_failed; LOG.info(msg); status.markComplete(msg); } @@ -620,6 +619,7 @@ public class HLogSplitter { * @return Path to file into which to dump split log edits. * @throws IOException */ + @SuppressWarnings("deprecation") static Path getRegionSplitEditsPath(final FileSystem fs, final Entry logEntry, final Path rootDir, boolean isCreate) throws IOException { @@ -724,7 +724,7 @@ public class HLogSplitter { * @throws CorruptedLogFileException */ protected Reader getReader(FileSystem fs, FileStatus file, Configuration conf, - boolean skipErrors) + boolean skipErrors, CancelableProgressable reporter) throws IOException, CorruptedLogFileException { Path path = file.getPath(); long length = file.getLen(); @@ -739,9 +739,9 @@ public class HLogSplitter { } try { - FSUtils.getInstance(fs, conf).recoverFileLease(fs, path, conf); + FSUtils.getInstance(fs, conf).recoverFileLease(fs, path, conf, reporter); try { - in = getReader(fs, path, conf); + in = getReader(fs, path, conf, reporter); } catch (EOFException e) { if (length <= 0) { // TODO should we ignore an empty, not-last log file if skip.errors @@ -757,8 +757,8 @@ public class HLogSplitter { } } } catch (IOException e) { - if (!skipErrors) { - throw e; + if (!skipErrors || e instanceof InterruptedIOException) { + throw e; // Don't mark the file corrupted if interrupted, or not skipErrors } CorruptedLogFileException t = new CorruptedLogFileException("skipErrors=true Could not open hlog " + @@ -826,9 +826,9 @@ public class HLogSplitter { /** * Create a new {@link Reader} for reading logs to split. */ - protected Reader getReader(FileSystem fs, Path curLogFile, Configuration conf) - throws IOException { - return HLogFactory.createReader(fs, curLogFile, conf); + protected Reader getReader(FileSystem fs, Path curLogFile, + Configuration conf, CancelableProgressable reporter) throws IOException { + return HLogFactory.createReader(fs, curLogFile, conf, reporter); } /** @@ -1108,56 +1108,6 @@ public class HLogSplitter { return ret; } - /*** - * @return false if it is a distributed log splitting and it failed to report - * progress - */ - private boolean reportProgressIfIsDistributedLogSplitting() { - if (this.distributedLogSplittingHelper != null) { - return distributedLogSplittingHelper.reportProgress(); - } else { - return true; - } - } - - /** - * A class used in distributed log splitting - * - */ - class DistributedLogSplittingHelper { - // Report progress, only used in distributed log splitting - private final CancelableProgressable splitReporter; - // How often to send a progress report (default 1/2 master timeout) - private final int report_period; - private long last_report_at = 0; - - public DistributedLogSplittingHelper(CancelableProgressable reporter) { - this.splitReporter = reporter; - report_period = conf.getInt("hbase.splitlog.report.period", - conf.getInt("hbase.splitlog.manager.timeout", - SplitLogManager.DEFAULT_TIMEOUT) / 2); - } - - /*** - * @return false if reporter failed progressing - */ - private boolean reportProgress() { - if (splitReporter == null) { - return true; - } else { - long t = EnvironmentEdgeManager.currentTimeMillis(); - if ((t - last_report_at) > report_period) { - last_report_at = t; - if (this.splitReporter.progress() == false) { - LOG.warn("Failed: reporter.progress asked us to terminate"); - return false; - } - } - return true; - } - } - } - /** * Class that manages the output streams from the log splitting process. */ @@ -1178,6 +1128,8 @@ public class HLogSplitter { private final int numThreads; + private CancelableProgressable reporter = null; + public OutputSink() { // More threads could potentially write faster at the expense // of causing more disk seeks as the logs are split. @@ -1188,6 +1140,10 @@ public class HLogSplitter { "hbase.regionserver.hlog.splitlog.writer.threads", 3); } + void setReporter(CancelableProgressable reporter) { + this.reporter = reporter; + } + /** * Start the threads that will pump data from the entryBuffers * to the output files. @@ -1213,7 +1169,7 @@ public class HLogSplitter { t.finish(); } for (WriterThread t : writerThreads) { - if (!progress_failed && !reportProgressIfIsDistributedLogSplitting()) { + if (!progress_failed && reporter != null && !reporter.progress()) { progress_failed = true; } try { @@ -1309,7 +1265,7 @@ public class HLogSplitter { for (int i = 0, n = logWriters.size(); i < n; i++) { Future future = completionService.take(); future.get(); - if (!progress_failed && !reportProgressIfIsDistributedLogSplitting()) { + if (!progress_failed && reporter != null && !reporter.progress()) { progress_failed = true; } } @@ -1437,8 +1393,6 @@ public class HLogSplitter { } } - - /** * Private data structure that wraps a Writer and its Path, * also collecting statistics about the data written to this diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java index 214df66ade5..2529905dcdb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java @@ -38,15 +38,15 @@ import java.io.InterruptedIOException; */ @InterfaceAudience.Private @InterfaceStability.Evolving -public class FSHDFSUtils extends FSUtils{ +public class FSHDFSUtils extends FSUtils { private static final Log LOG = LogFactory.getLog(FSHDFSUtils.class); /** * Recover the lease from HDFS, retrying multiple times. */ @Override - public void recoverFileLease(final FileSystem fs, final Path p, Configuration conf) - throws IOException { + public void recoverFileLease(final FileSystem fs, final Path p, + Configuration conf, CancelableProgressable reporter) throws IOException { // lease recovery not needed for local file system case. if (!(fs instanceof DistributedFileSystem)) { return; @@ -81,6 +81,9 @@ public class FSHDFSUtils extends FSUtils{ ", retrying.", e); } if (!recovered) { + if (reporter != null && !reporter.progress()) { + throw new InterruptedIOException("Operation is cancelled"); + } // try at least twice. if (nbAttempt > 2 && recoveryTimeout < EnvironmentEdgeManager.currentTimeMillis()) { LOG.error("Can't recoverLease after " + nbAttempt + " attempts and " + diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSMapRUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSMapRUtils.java index f6db7be7fff..69f42b2dd94 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSMapRUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSMapRUtils.java @@ -35,9 +35,9 @@ import org.apache.commons.logging.LogFactory; public class FSMapRUtils extends FSUtils { private static final Log LOG = LogFactory.getLog(FSMapRUtils.class); - public void recoverFileLease(final FileSystem fs, final Path p, - Configuration conf) throws IOException { - LOG.info("Recovering file " + p.toString() + + public void recoverFileLease(final FileSystem fs, final Path p, + Configuration conf, CancelableProgressable reporter) throws IOException { + LOG.info("Recovering file " + p.toString() + " by changing permission to readonly"); FsPermission roPerm = new FsPermission((short) 0444); fs.setPermission(p, roPerm); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java index 370203e6f89..fdaf9527e05 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java @@ -206,6 +206,7 @@ public abstract class FSUtils { * @return output stream to the created file * @throws IOException if the file cannot be created */ + @SuppressWarnings("deprecation") public static FSDataOutputStream create(FileSystem fs, Path path, FsPermission perm, boolean overwrite) throws IOException { LOG.debug("Creating file=" + path + " with permission=" + perm); @@ -763,6 +764,7 @@ public abstract class FSUtils { * @return true if exists * @throws IOException e */ + @SuppressWarnings("deprecation") public static boolean metaRegionExists(FileSystem fs, Path rootdir) throws IOException { Path rootRegionDir = @@ -1147,7 +1149,7 @@ public abstract class FSUtils { * @throws IOException */ public abstract void recoverFileLease(final FileSystem fs, final Path p, - Configuration conf) throws IOException; + Configuration conf, CancelableProgressable reporter) throws IOException; /** * @param fs diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java index ab3f407036f..84b5f6b157b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java @@ -753,7 +753,7 @@ public class RegionSplitter { } else { LOG.debug("_balancedSplit file found. Replay log to restore state..."); FSUtils.getInstance(fs, table.getConfiguration()) - .recoverFileLease(fs, splitFile, table.getConfiguration()); + .recoverFileLease(fs, splitFile, table.getConfiguration(), null); // parse split file and process remaining splits FSDataInputStream tmpIn = fs.open(splitFile); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java index 4f35286c08e..d1e8832924b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java @@ -159,6 +159,7 @@ public class TestDistributedLogSplitting { for (HRegionInfo hri : regions) { Path tdir = HTableDescriptor.getTableDir(rootdir, table); + @SuppressWarnings("deprecation") Path editsdir = HLogUtil.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir, hri.getEncodedName())); LOG.debug("checking edits dir " + editsdir); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java index 16378420082..3c24a22aad2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java @@ -459,7 +459,7 @@ public class TestHLog { public void run() { try { FSUtils.getInstance(fs, rlConf) - .recoverFileLease(recoveredFs, walPath, rlConf); + .recoverFileLease(recoveredFs, walPath, rlConf, null); } catch (IOException e) { exception = e; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java index 21b3023e254..28586791e5f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java @@ -36,6 +36,7 @@ import java.util.Map; import java.util.NavigableSet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; @@ -64,6 +65,7 @@ import org.apache.hadoop.hbase.LargeTests; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry; import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader; +import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.CorruptedLogFileException; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; @@ -918,6 +920,45 @@ public class TestHLogSplit { } } + @Test + public void testTerminationAskedByReporter() throws IOException, CorruptedLogFileException { + generateHLogs(1, 10, -1); + FileStatus logfile = fs.listStatus(HLOGDIR)[0]; + fs.initialize(fs.getUri(), conf); + + final AtomicInteger count = new AtomicInteger(); + + CancelableProgressable localReporter + = new CancelableProgressable() { + @Override + public boolean progress() { + count.getAndIncrement(); + return false; + } + }; + + FileSystem spiedFs = Mockito.spy(fs); + Mockito.doAnswer(new Answer() { + public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable { + Thread.sleep(1500); // Sleep a while and wait report status invoked + return (FSDataInputStream)invocation.callRealMethod(); + } + }).when(spiedFs).open(Mockito.any(), Mockito.anyInt()); + + try { + conf.setInt("hbase.splitlog.report.period", 1000); + HLogSplitter s = new HLogSplitter(conf, HBASEDIR, null, null, spiedFs, null); + boolean ret = s.splitLogFile(logfile, localReporter); + assertFalse("Log splitting should failed", ret); + assertTrue(count.get() > 0); + } catch (IOException e) { + fail("There shouldn't be any exception but: " + e.toString()); + } finally { + // reset it back to its default value + conf.setInt("hbase.splitlog.report.period", 59000); + } + } + /** * Test log split process with fake data and lots of edits to trigger threading * issues. @@ -1000,8 +1041,8 @@ public class TestHLogSplit { /* Produce a mock reader that generates fake entries */ - protected Reader getReader(FileSystem fs, Path curLogFile, Configuration conf) - throws IOException { + protected Reader getReader(FileSystem fs, Path curLogFile, + Configuration conf, CancelableProgressable reporter) throws IOException { Reader mockReader = Mockito.mock(Reader.class); Mockito.doAnswer(new Answer() { int index = 0;