From 211f231b4aef477a35a4d8e6d47be07fc2e65c98 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Thu, 14 Dec 2017 09:41:12 +0800 Subject: [PATCH] HBASE-19503 Fix TestWALOpenAfterDNRollingStart for AsyncFSWAL --- .../asyncfs/FanOutOneBlockAsyncDFSOutput.java | 3 +- .../hadoop/hbase/regionserver/LogRoller.java | 17 +- .../hbase/regionserver/wal/AbstractFSWAL.java | 155 ++++++++++++++---- .../hbase/regionserver/wal/AsyncFSWAL.java | 15 +- .../hadoop/hbase/regionserver/wal/FSHLog.java | 91 +--------- .../hbase/regionserver/wal/ReaderBase.java | 4 +- .../wal/TestWALOpenAfterDNRollingStart.java | 116 ++++++++----- 7 files changed, 222 insertions(+), 179 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java index 91086d77196..b874aa73cd1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java @@ -380,7 +380,8 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { @Override public DatanodeInfo[] getPipeline() { - return locations; + State state = this.state; + return state == State.STREAMING || state == State.CLOSING ? locations : new DatanodeInfo[0]; } private void flushBuffer(CompletableFuture future, ByteBuf dataBuf, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java index 809a457313d..451b8869850 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java @@ -27,16 +27,16 @@ import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Server; -import org.apache.hadoop.hbase.regionserver.wal.FSHLog; +import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException; -import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.HasThread; +import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.ipc.RemoteException; +import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; @@ -122,14 +122,11 @@ public class LogRoller extends HasThread implements Closeable { try { for (Entry entry : walNeedsRoll.entrySet()) { WAL wal = entry.getKey(); - boolean neeRollAlready = entry.getValue(); - if(wal instanceof FSHLog && !neeRollAlready) { - FSHLog hlog = (FSHLog)wal; - if ((now - hlog.getLastTimeCheckLowReplication()) - > this.checkLowReplicationInterval) { - hlog.checkLogRoll(); - } + boolean needRollAlready = entry.getValue(); + if (needRollAlready || !(wal instanceof AbstractFSWAL)) { + continue; } + ((AbstractFSWAL) wal).checkLogLowReplication(checkLowReplicationInterval); } } catch (Throwable e) { LOG.warn("Failed checking low replication", e); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index 246221b8c45..992903656be 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -23,6 +23,7 @@ import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DE import com.lmax.disruptor.RingBuffer; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.InterruptedIOException; import java.lang.management.MemoryType; @@ -55,6 +56,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.PrivateCellUtil; import org.apache.hadoop.hbase.client.RegionInfo; @@ -67,13 +69,16 @@ import org.apache.hadoop.hbase.util.CollectionUtils; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.DrainBarrier; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALFactory; -import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.wal.WALKeyImpl; +import org.apache.hadoop.hbase.wal.WALPrettyPrinter; import org.apache.hadoop.hbase.wal.WALProvider.WriterBase; +import org.apache.hadoop.hbase.wal.WALSplitter; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.util.StringUtils; import org.apache.htrace.core.TraceScope; @@ -222,6 +227,9 @@ public abstract class AbstractFSWAL implements WAL { */ volatile W writer; + // Last time to check low replication on hlog's pipeline + private long lastTimeCheckLowReplication = EnvironmentEdgeManager.currentTime(); + protected volatile boolean closed = false; protected final AtomicBoolean shutdown = new AtomicBoolean(false); @@ -230,7 +238,7 @@ public abstract class AbstractFSWAL implements WAL { * an IllegalArgumentException if used to compare paths from different wals. */ final Comparator LOG_NAME_COMPARATOR = - (o1, o2) -> Long.compare(getFileNumFromFileName(o1), getFileNumFromFileName(o2)); + (o1, o2) -> Long.compare(getFileNumFromFileName(o1), getFileNumFromFileName(o2)); private static final class WalProps { @@ -256,8 +264,8 @@ public abstract class AbstractFSWAL implements WAL { * Map of WAL log file to properties. The map is sorted by the log file creation timestamp * (contained in the log file name). */ - protected ConcurrentNavigableMap walFile2Props = new ConcurrentSkipListMap<>( - LOG_NAME_COMPARATOR); + protected ConcurrentNavigableMap walFile2Props = + new ConcurrentSkipListMap<>(LOG_NAME_COMPARATOR); /** * Map of {@link SyncFuture}s keyed by Handler objects. Used so we reuse SyncFutures. @@ -311,8 +319,8 @@ public abstract class AbstractFSWAL implements WAL { // be stuck and make no progress if the buffer is filled with appends only and there is no // sync. If no sync, then the handlers will be outstanding just waiting on sync completion // before they return. 
- int preallocatedEventCount = this.conf.getInt("hbase.regionserver.wal.disruptor.event.count", - 1024 * 16); + int preallocatedEventCount = + this.conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16); checkArgument(preallocatedEventCount >= 0, "hbase.regionserver.wal.disruptor.event.count must > 0"); int floor = Integer.highestOneBit(preallocatedEventCount); @@ -346,12 +354,12 @@ public abstract class AbstractFSWAL implements WAL { } // If prefix is null||empty then just name it wal - this.walFilePrefix = prefix == null || prefix.isEmpty() ? "wal" - : URLEncoder.encode(prefix, "UTF8"); + this.walFilePrefix = + prefix == null || prefix.isEmpty() ? "wal" : URLEncoder.encode(prefix, "UTF8"); // we only correctly differentiate suffices when numeric ones start with '.' if (suffix != null && !(suffix.isEmpty()) && !(suffix.startsWith(WAL_FILE_NAME_DELIMITER))) { - throw new IllegalArgumentException("WAL suffix must start with '" + WAL_FILE_NAME_DELIMITER - + "' but instead was '" + suffix + "'"); + throw new IllegalArgumentException("WAL suffix must start with '" + WAL_FILE_NAME_DELIMITER + + "' but instead was '" + suffix + "'"); } // Now that it exists, set the storage policy for the entire directory of wal files related to // this FSHLog instance @@ -398,8 +406,8 @@ public abstract class AbstractFSWAL implements WAL { // (it costs a little x'ing bocks) final long blocksize = this.conf.getLong("hbase.regionserver.hlog.blocksize", CommonFSUtils.getDefaultBlockSize(this.fs, this.walDir)); - this.logrollsize = (long) (blocksize - * conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f)); + this.logrollsize = + (long) (blocksize * conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f)); boolean maxLogsDefined = conf.get("hbase.regionserver.maxlogs") != null; if (maxLogsDefined) { @@ -408,9 +416,9 @@ public abstract class AbstractFSWAL implements WAL { this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", Math.max(32, calculateMaxLogFiles(conf, logrollsize))); - LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) + ", rollsize=" - + StringUtils.byteDesc(this.logrollsize) + ", prefix=" + this.walFilePrefix + ", suffix=" - + walFileSuffix + ", logDir=" + this.walDir + ", archiveDir=" + this.walArchiveDir); + LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) + ", rollsize=" + + StringUtils.byteDesc(this.logrollsize) + ", prefix=" + this.walFilePrefix + ", suffix=" + + walFileSuffix + ", logDir=" + this.walDir + ", archiveDir=" + this.walArchiveDir); this.slowSyncNs = TimeUnit.MILLISECONDS .toNanos(conf.getInt("hbase.regionserver.hlog.slowsync.ms", DEFAULT_SLOW_SYNC_TIME_MS)); this.walSyncTimeoutNs = TimeUnit.MILLISECONDS @@ -588,8 +596,8 @@ public abstract class AbstractFSWAL implements WAL { int logCount = getNumRolledLogFiles(); if (logCount > this.maxLogs && logCount > 0) { Map.Entry firstWALEntry = this.walFile2Props.firstEntry(); - regions = this.sequenceIdAccounting - .findLower(firstWALEntry.getValue().encodedName2HighestSequenceId); + regions = + this.sequenceIdAccounting.findLower(firstWALEntry.getValue().encodedName2HighestSequenceId); } if (regions != null) { StringBuilder sb = new StringBuilder(); @@ -599,8 +607,8 @@ public abstract class AbstractFSWAL implements WAL { } sb.append(Bytes.toStringBinary(regions[i])); } - LOG.info("Too many WALs; count=" + logCount + ", max=" + this.maxLogs + "; forcing flush of " - + regions.length + " regions(s): " + sb.toString()); + LOG.info("Too many WALs; count=" + 
logCount + ", max=" + this.maxLogs + + "; forcing flush of " + regions.length + " regions(s): " + sb.toString()); } return regions; } @@ -688,8 +696,8 @@ public abstract class AbstractFSWAL implements WAL { this.walFile2Props.put(oldPath, new WalProps(this.sequenceIdAccounting.resetHighest(), oldFileLen)); this.totalLogSize.addAndGet(oldFileLen); - LOG.info("Rolled WAL " + CommonFSUtils.getPath(oldPath) + " with entries=" + oldNumEntries - + ", filesize=" + StringUtils.byteDesc(oldFileLen) + "; new WAL " + newPathString); + LOG.info("Rolled WAL " + CommonFSUtils.getPath(oldPath) + " with entries=" + oldNumEntries + + ", filesize=" + StringUtils.byteDesc(oldFileLen) + "; new WAL " + newPathString); } else { LOG.info("New WAL " + newPathString); } @@ -755,8 +763,8 @@ public abstract class AbstractFSWAL implements WAL { newPath = replaceWriter(oldPath, newPath, nextWriter); tellListenersAboutPostLogRoll(oldPath, newPath); if (LOG.isDebugEnabled()) { - LOG.debug("Create new " + implClassName + " writer with pipeline: " - + Arrays.toString(getPipeline())); + LOG.debug("Create new " + implClassName + " writer with pipeline: " + + Arrays.toString(getPipeline())); } // Can we delete any of the old log files? if (getNumRolledLogFiles() > 0) { @@ -766,8 +774,9 @@ public abstract class AbstractFSWAL implements WAL { } catch (CommonFSUtils.StreamLacksCapabilityException exception) { // If the underlying FileSystem can't do what we ask, treat as IO failure so // we'll abort. - throw new IOException("Underlying FileSystem can't meet stream requirements. See RS log " + - "for details.", exception); + throw new IOException( + "Underlying FileSystem can't meet stream requirements. See RS log " + "for details.", + exception); } finally { closeBarrier.endOp(); } @@ -843,8 +852,8 @@ public abstract class AbstractFSWAL implements WAL { } } } - LOG.debug("Moved " + files.length + " WAL file(s) to " + - CommonFSUtils.getPath(this.walArchiveDir)); + LOG.debug( + "Moved " + files.length + " WAL file(s) to " + CommonFSUtils.getPath(this.walArchiveDir)); } LOG.info("Closed WAL: " + toString()); } @@ -979,8 +988,7 @@ public abstract class AbstractFSWAL implements WAL { @Override public String toString() { - return implClassName + " " + walFilePrefix + ":" + walFileSuffix + "(num " - + filenum + ")"; + return implClassName + " " + walFilePrefix + ":" + walFileSuffix + "(num " + filenum + ")"; } /** @@ -1023,8 +1031,8 @@ public abstract class AbstractFSWAL implements WAL { protected abstract void doAppend(W writer, FSWALEntry entry) throws IOException; - protected abstract W createWriterInstance(Path path) throws IOException, - CommonFSUtils.StreamLacksCapabilityException; + protected abstract W createWriterInstance(Path path) + throws IOException, CommonFSUtils.StreamLacksCapabilityException; /** * @return old wal file size @@ -1034,6 +1042,27 @@ public abstract class AbstractFSWAL implements WAL { protected abstract void doShutdown() throws IOException; + protected abstract boolean doCheckLogLowReplication(); + + public void checkLogLowReplication(long checkInterval) { + long now = EnvironmentEdgeManager.currentTime(); + if (now - lastTimeCheckLowReplication < checkInterval) { + return; + } + // Will return immediately if we are in the middle of a WAL log roll currently. 
+ if (!rollWriterLock.tryLock()) { + return; + } + try { + lastTimeCheckLowReplication = now; + if (doCheckLogLowReplication()) { + requestLogRoll(true); + } + } finally { + rollWriterLock.unlock(); + } + } + /** * This method gets the pipeline for the current WAL. */ @@ -1045,4 +1074,68 @@ public abstract class AbstractFSWAL implements WAL { */ @VisibleForTesting abstract int getLogReplication(); + + private static void split(final Configuration conf, final Path p) throws IOException { + FileSystem fs = FSUtils.getWALFileSystem(conf); + if (!fs.exists(p)) { + throw new FileNotFoundException(p.toString()); + } + if (!fs.getFileStatus(p).isDirectory()) { + throw new IOException(p + " is not a directory"); + } + + final Path baseDir = FSUtils.getWALRootDir(conf); + Path archiveDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME); + if (conf.getBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR, + AbstractFSWALProvider.DEFAULT_SEPARATE_OLDLOGDIR)) { + archiveDir = new Path(archiveDir, p.getName()); + } + WALSplitter.split(baseDir, p, archiveDir, fs, conf, WALFactory.getInstance(conf)); + } + + private static void usage() { + System.err.println("Usage: AbstractFSWAL "); + System.err.println("Arguments:"); + System.err.println(" --dump Dump textual representation of passed one or more files"); + System.err.println(" For example: " + + "AbstractFSWAL --dump hdfs://example.com:9000/hbase/WALs/MACHINE/LOGFILE"); + System.err.println(" --split Split the passed directory of WAL logs"); + System.err.println( + " For example: AbstractFSWAL --split hdfs://example.com:9000/hbase/WALs/DIR"); + } + + /** + * Pass one or more log file names and it will either dump out a text version on + * stdout or split the specified log files. + */ + public static void main(String[] args) throws IOException { + if (args.length < 2) { + usage(); + System.exit(-1); + } + // either dump using the WALPrettyPrinter or split, depending on args + if (args[0].compareTo("--dump") == 0) { + WALPrettyPrinter.run(Arrays.copyOfRange(args, 1, args.length)); + } else if (args[0].compareTo("--perf") == 0) { + LOG.fatal("Please use the WALPerformanceEvaluation tool instead. 
i.e.:"); + LOG.fatal( + "\thbase org.apache.hadoop.hbase.wal.WALPerformanceEvaluation --iterations " + args[1]); + System.exit(-1); + } else if (args[0].compareTo("--split") == 0) { + Configuration conf = HBaseConfiguration.create(); + for (int i = 1; i < args.length; i++) { + try { + Path logPath = new Path(args[i]); + FSUtils.setFsDefault(conf, logPath); + split(conf, logPath); + } catch (IOException t) { + t.printStackTrace(System.err); + System.exit(-1); + } + } + } else { + usage(); + System.exit(-1); + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java index 832eefd559e..4ccfdf34a92 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java @@ -58,7 +58,6 @@ import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.Nam import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.wal.AsyncFSWALProvider; import org.apache.hadoop.hbase.wal.WALEdit; -import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; @@ -536,7 +535,7 @@ public class AsyncFSWAL extends AbstractFSWAL { if (waitingConsumePayloadsGatingSequence.get() == waitingConsumePayloads.getCursor()) { // we will give up consuming so if there are some unsynced data we need to issue a sync. if (writer.getLength() > fileLengthAtLastSync && !syncFutures.isEmpty() && - syncFutures.last().getTxid() > highestProcessedAppendTxidAtLastSync) { + syncFutures.last().getTxid() > highestProcessedAppendTxidAtLastSync) { // no new data in the ringbuffer and we have at least one sync request sync(writer); } @@ -564,8 +563,8 @@ public class AsyncFSWAL extends AbstractFSWAL { @Override public long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore) throws IOException { - long txid = stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, - waitingConsumePayloads); + long txid = + stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, waitingConsumePayloads); if (shouldScheduleConsumer()) { consumeExecutor.execute(consumer); } @@ -746,4 +745,12 @@ public class AsyncFSWAL extends AbstractFSWAL { int getLogReplication() { return getPipeline().length; } + + @Override + protected boolean doCheckLogLowReplication() { + // not like FSHLog, AsyncFSOutput will fail immediately if there are errors writing to DNs, so + // typically there is no 'low replication' state, only a 'broken' state. 
+ AsyncFSOutput output = this.fsOut; + return output != null && output.getPipeline().length == 0; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index 3da37d3031e..fd9d6c122ac 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -25,7 +25,6 @@ import com.lmax.disruptor.TimeoutException; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; -import java.io.FileNotFoundException; import java.io.IOException; import java.io.OutputStream; import java.util.Arrays; @@ -42,25 +41,18 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.HasThread; import org.apache.hadoop.hbase.util.Threads; -import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.FSHLogProvider; import org.apache.hadoop.hbase.wal.WALEdit; -import org.apache.hadoop.hbase.wal.WALFactory; -import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.wal.WALKeyImpl; -import org.apache.hadoop.hbase.wal.WALPrettyPrinter; import org.apache.hadoop.hbase.wal.WALProvider.Writer; -import org.apache.hadoop.hbase.wal.WALSplitter; import org.apache.hadoop.hdfs.DFSOutputStream; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; @@ -156,10 +148,6 @@ public class FSHLog extends AbstractFSWAL { private final AtomicInteger closeErrorCount = new AtomicInteger(); - // Last time to check low replication on hlog's pipeline - private volatile long lastTimeCheckLowReplication = EnvironmentEdgeManager.currentTime(); - - /** * Exception handler to pass the disruptor ringbuffer. Same as native implementation only it logs * using our logger instead of java native logger. @@ -627,14 +615,14 @@ public class FSHLog extends AbstractFSWAL { /** * Schedule a log roll if needed. */ - public void checkLogRoll() { + private void checkLogRoll() { // Will return immediately if we are in the middle of a WAL log roll currently. if (!rollWriterLock.tryLock()) { return; } boolean lowReplication; try { - lowReplication = checkLowReplication(); + lowReplication = doCheckLogLowReplication(); } finally { rollWriterLock.unlock(); } @@ -646,9 +634,8 @@ public class FSHLog extends AbstractFSWAL { /** * @return true if number of replicas for the WAL is lower than threshold */ - private boolean checkLowReplication() { + protected boolean doCheckLogLowReplication() { boolean logRollNeeded = false; - this.lastTimeCheckLowReplication = EnvironmentEdgeManager.currentTime(); // if the number of replicas in HDFS has fallen below the configured // value, then roll logs. 
try { @@ -767,24 +754,6 @@ public class FSHLog extends AbstractFSWAL { .align(ClassSize.OBJECT + (5 * ClassSize.REFERENCE) + ClassSize.ATOMIC_INTEGER + Bytes.SIZEOF_INT + (3 * Bytes.SIZEOF_LONG)); - private static void split(final Configuration conf, final Path p) throws IOException { - FileSystem fs = FSUtils.getWALFileSystem(conf); - if (!fs.exists(p)) { - throw new FileNotFoundException(p.toString()); - } - if (!fs.getFileStatus(p).isDirectory()) { - throw new IOException(p + " is not a directory"); - } - - final Path baseDir = FSUtils.getWALRootDir(conf); - Path archiveDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME); - if (conf.getBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR, - AbstractFSWALProvider.DEFAULT_SEPARATE_OLDLOGDIR)) { - archiveDir = new Path(archiveDir, p.getName()); - } - WALSplitter.split(baseDir, p, archiveDir, fs, conf, WALFactory.getInstance(conf)); - } - /** * This class is used coordinating two threads holding one thread at a 'safe point' while the * orchestrating thread does some work that requires the first thread paused: e.g. holding the WAL @@ -1118,52 +1087,6 @@ public class FSHLog extends AbstractFSWAL { } } - private static void usage() { - System.err.println("Usage: FSHLog "); - System.err.println("Arguments:"); - System.err.println(" --dump Dump textual representation of passed one or more files"); - System.err.println(" For example: " - + "FSHLog --dump hdfs://example.com:9000/hbase/WALs/MACHINE/LOGFILE"); - System.err.println(" --split Split the passed directory of WAL logs"); - System.err.println( - " For example: " + "FSHLog --split hdfs://example.com:9000/hbase/WALs/DIR"); - } - - /** - * Pass one or more log file names and it will either dump out a text version on - * stdout or split the specified log files. - */ - public static void main(String[] args) throws IOException { - if (args.length < 2) { - usage(); - System.exit(-1); - } - // either dump using the WALPrettyPrinter or split, depending on args - if (args[0].compareTo("--dump") == 0) { - WALPrettyPrinter.run(Arrays.copyOfRange(args, 1, args.length)); - } else if (args[0].compareTo("--perf") == 0) { - LOG.fatal("Please use the WALPerformanceEvaluation tool instead. i.e.:"); - LOG.fatal( - "\thbase org.apache.hadoop.hbase.wal.WALPerformanceEvaluation --iterations " + args[1]); - System.exit(-1); - } else if (args[0].compareTo("--split") == 0) { - Configuration conf = HBaseConfiguration.create(); - for (int i = 1; i < args.length; i++) { - try { - Path logPath = new Path(args[i]); - FSUtils.setFsDefault(conf, logPath); - split(conf, logPath); - } catch (IOException t) { - t.printStackTrace(System.err); - System.exit(-1); - } - } - } else { - usage(); - System.exit(-1); - } - } - /** * This method gets the pipeline for the current WAL. 
*/ @@ -1176,12 +1099,4 @@ } return new DatanodeInfo[0]; } - - /** - * - * @return last time on checking low replication - */ - public long getLastTimeCheckLowReplication() { - return this.lastTimeCheckLowReplication; - } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java index 2a01b14c981..9a6bfd39583 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java @@ -29,13 +29,11 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.TableName; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.io.util.LRUDictionary; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.WAL.Entry; -import org.apache.hadoop.hbase.wal.WALEdit; -import org.apache.hadoop.hbase.wal.WALKeyImpl; +import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX}) public abstract class ReaderBase implements AbstractFSWALProvider.Reader { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALOpenAfterDNRollingStart.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALOpenAfterDNRollingStart.java index cf2c5d7e326..a612aff0350 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALOpenAfterDNRollingStart.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALOpenAfterDNRollingStart.java @@ -18,77 +18,109 @@ */ package org.apache.hadoop.hbase.wal; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.regionserver.HRegionServer; -import org.apache.hadoop.hbase.regionserver.wal.FSHLog; -import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.util.FSUtils; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; - -@Category(MediumTests.class) +@RunWith(Parameterized.class) +@Category({ RegionServerTests.class, LargeTests.class }) public class TestWALOpenAfterDNRollingStart { - final Log LOG = LogFactory.getLog(getClass()); - private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - private static long DataNodeRestartInterval; + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + // Sleep time before restarting the next dn; we need to wait for the current dn to finish starting up + private static long
DN_RESTART_INTERVAL = 15000; + + // interval of checking low replication. It must be smaller than + // DN_RESTART_INTERVAL + // so a low replication case will be detected and the wal will be rolled + private static long CHECK_LOW_REPLICATION_INTERVAL = 10000; + + @Parameter + public String walProvider; + + @Parameters(name = "{index}: wal={0}") + public static List data() { + return Arrays.asList(new Object[] { "asyncfs" }, new Object[] { "filesystem" }); + } @BeforeClass public static void setUpBeforeClass() throws Exception { - // Sleep time before restart next dn, we need to wait the current dn to finish start up - DataNodeRestartInterval = 15000; - // interval of checking low replication. The sleep time must smaller than DataNodeRestartInterval - // so a low replication case will be detected and the wal will be rolled - long checkLowReplicationInterval = 10000; - //don't let hdfs client to choose a new replica when dn down - TEST_UTIL.getConfiguration().setBoolean("dfs.client.block.write.replace-datanode-on-failure.enable", - false); + // don't let the hdfs client choose a new replica when a dn is down + TEST_UTIL.getConfiguration() + .setBoolean("dfs.client.block.write.replace-datanode-on-failure.enable", false); TEST_UTIL.getConfiguration().setLong("hbase.regionserver.hlog.check.lowreplication.interval", - checkLowReplicationInterval); + CHECK_LOW_REPLICATION_INTERVAL); TEST_UTIL.startMiniDFSCluster(3); - TEST_UTIL.startMiniCluster(1); + TEST_UTIL.startMiniZKCluster(); + } @Before + public void setUp() throws IOException, InterruptedException { + TEST_UTIL.getConfiguration().set("hbase.wal.provider", walProvider); + TEST_UTIL.startMiniHBaseCluster(1, 1); + } + + @After + public void tearDown() throws Exception { + TEST_UTIL.shutdownMiniHBaseCluster(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); } /** - * see HBASE-18132 - * This is a test case of failing open a wal(for replication for example) after all datanode - * restarted (rolling upgrade, for example). - * Before this patch, low replication detection is only used when syncing wal. - * But if the wal haven't had any entry whiten, it will never know all the replica of the wal - * is broken(because of dn restarting). And this wal can never be open + * see HBASE-18132. This is a test case of failing to open a wal (for replication, for example) + * after all datanodes have restarted (a rolling upgrade, for example). Before this patch, low + * replication detection was only done when syncing the wal. But if the wal had not had any entry + * written yet, we would never notice that all the replicas of the wal were broken (because of + * the dn restarts), and the wal could never be opened. + * @throws Exception */ - @Test(timeout = 300000) + @Test public void test() throws Exception { HRegionServer server = TEST_UTIL.getHBaseCluster().getRegionServer(0); - FSHLog hlog = (FSHLog)server.getWAL(null); - Path currentFile = hlog.getCurrentFileName(); - //restart every dn to simulate a dn rolling upgrade - for(int i = 0; i < TEST_UTIL.getDFSCluster().getDataNodes().size(); i++) { - //This is NOT a bug, when restart dn in miniDFSCluster, it will remove the stopped dn from - //the dn list and then add to the tail of this list, we need to always restart the first one - //to simulate rolling upgrade of every dn.
+ AbstractFSWAL wal = (AbstractFSWAL) server.getWAL(null); + Path currentFile = wal.getCurrentFileName(); + // restart every dn to simulate a dn rolling upgrade + for (int i = 0, n = TEST_UTIL.getDFSCluster().getDataNodes().size(); i < n; i++) { + // This is NOT a bug: when we restart a dn in miniDFSCluster, it will remove the stopped dn + // from the dn list and then add it to the tail of this list, so we need to always restart + // the first one to simulate a rolling upgrade of every dn. TEST_UTIL.getDFSCluster().restartDataNode(0); - //sleep enough time so log roller can detect the pipeline break and roll log - Thread.sleep(DataNodeRestartInterval); + // sleep long enough so the log roller can detect the pipeline break and roll the log + Thread.sleep(DN_RESTART_INTERVAL); } - if(!server.getFileSystem().exists(currentFile)) { + if (!server.getFileSystem().exists(currentFile)) { Path walRootDir = FSUtils.getWALRootDir(TEST_UTIL.getConfiguration()); final Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME); currentFile = new Path(oldLogDir, currentFile.getName()); } - //if the log is not rolled, then we can never open this wal forever. - WAL.Reader reader = WALFactory - .createReader(TEST_UTIL.getTestFileSystem(), currentFile, TEST_UTIL.getConfiguration()); - + // if the log has not been rolled, we will never be able to open this wal. + try (WAL.Reader reader = WALFactory.createReader(TEST_UTIL.getTestFileSystem(), currentFile, + TEST_UTIL.getConfiguration())) { + reader.next(); + } } - - }
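
Note for reviewers: the patch turns the low replication check into a template method on AbstractFSWAL. LogRoller periodically calls checkLogLowReplication(interval); the base class rate-limits the check via lastTimeCheckLowReplication, skips it entirely while a roll holds rollWriterLock, and asks the concrete WAL (FSHLog or AsyncFSWAL) whether its pipeline is unhealthy via doCheckLogLowReplication(). The stand-alone sketch below mirrors only that control flow; SketchWAL and its helpers are illustrative stand-ins, not the real HBase types.

import java.util.concurrent.locks.ReentrantLock;

// Illustrative stand-in for AbstractFSWAL's new template method; only the
// control flow here mirrors the patch, the names are hypothetical.
abstract class SketchWAL {
  private final ReentrantLock rollWriterLock = new ReentrantLock();
  private long lastTimeCheckLowReplication = System.currentTimeMillis();
  private volatile boolean rollRequested = false;

  // Each WAL implementation decides what a bad pipeline looks like:
  // FSHLog compares the replica count against a threshold, while
  // AsyncFSWAL simply treats an empty pipeline as broken.
  protected abstract boolean doCheckLogLowReplication();

  // Called periodically (the patch calls this from LogRoller); rate-limited
  // and skipped entirely while a log roll holds the lock.
  public void checkLogLowReplication(long checkIntervalMs) {
    long now = System.currentTimeMillis();
    if (now - lastTimeCheckLowReplication < checkIntervalMs) {
      return; // checked recently enough
    }
    if (!rollWriterLock.tryLock()) {
      return; // a roll is already in progress, no point checking now
    }
    try {
      lastTimeCheckLowReplication = now;
      if (doCheckLogLowReplication()) {
        requestLogRoll();
      }
    } finally {
      rollWriterLock.unlock();
    }
  }

  protected void requestLogRoll() {
    rollRequested = true; // the real WAL notifies its roll listeners instead
  }

  boolean isRollRequested() {
    return rollRequested;
  }

  public static void main(String[] args) {
    SketchWAL wal = new SketchWAL() {
      @Override
      protected boolean doCheckLogLowReplication() {
        return true; // pretend every replica just went away
      }
    };
    wal.checkLogLowReplication(0); // interval 0 so the check runs immediately
    System.out.println("roll requested: " + wal.isRollRequested());
  }
}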
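The FanOutOneBlockAsyncDFSOutput change is what makes the AsyncFSWAL side of the check work: getPipeline() now reports datanode locations only while the output is in STREAMING or CLOSING state, so once the stream breaks the pipeline length drops to 0 and doCheckLogLowReplication() requests a roll even if no entry was ever written. A compact sketch of that guard, with simplified stand-in types (String instead of DatanodeInfo):

import java.util.Arrays;

// Hypothetical, simplified model of the getPipeline() guard added to
// FanOutOneBlockAsyncDFSOutput: locations are only reported while the output
// is actually streaming (or closing); any other state yields an empty
// pipeline, which the WAL-side check reads as "broken, roll the log".
public class PipelineGuardSketch {

  enum State { STREAMING, CLOSING, BROKEN, CLOSED }

  private volatile State state = State.STREAMING;
  private final String[] locations = { "dn1", "dn2", "dn3" };

  public String[] getPipeline() {
    State state = this.state; // read the volatile field once
    return state == State.STREAMING || state == State.CLOSING ? locations : new String[0];
  }

  public static void main(String[] args) {
    PipelineGuardSketch out = new PipelineGuardSketch();
    System.out.println(Arrays.toString(out.getPipeline())); // [dn1, dn2, dn3]
    out.state = State.BROKEN; // e.g. every datanode in the pipeline restarted
    System.out.println(out.getPipeline().length); // 0 -> triggers a log roll
  }
}

Before this change getPipeline() returned locations unconditionally, so a wal with no entries written never looked unhealthy; that is the HBASE-18132 scenario the parameterized test above now exercises for both the asyncfs and filesystem providers.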