HBASE-19503 Fix TestWALOpenAfterDNRollingStart for AsyncFSWAL

This commit is contained in:
zhangduo 2017-12-14 09:41:12 +08:00
parent 104afd74a6
commit ba5f9ac380
7 changed files with 222 additions and 179 deletions

View File

@ -380,7 +380,8 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
@Override
public DatanodeInfo[] getPipeline() {
return locations;
State state = this.state;
return state == State.STREAMING || state == State.CLOSING ? locations : new DatanodeInfo[0];
}
private void flushBuffer(CompletableFuture<Long> future, ByteBuf dataBuf,

View File

@ -27,16 +27,16 @@ import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
@ -122,14 +122,11 @@ public class LogRoller extends HasThread implements Closeable {
try {
for (Entry<WAL, Boolean> entry : walNeedsRoll.entrySet()) {
WAL wal = entry.getKey();
boolean neeRollAlready = entry.getValue();
if(wal instanceof FSHLog && !neeRollAlready) {
FSHLog hlog = (FSHLog)wal;
if ((now - hlog.getLastTimeCheckLowReplication())
> this.checkLowReplicationInterval) {
hlog.checkLogRoll();
}
boolean needRollAlready = entry.getValue();
if (needRollAlready || !(wal instanceof AbstractFSWAL)) {
continue;
}
((AbstractFSWAL<?>) wal).checkLogLowReplication(checkLowReplicationInterval);
}
} catch (Throwable e) {
LOG.warn("Failed checking low replication", e);

View File

@ -23,6 +23,7 @@ import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DE
import com.lmax.disruptor.RingBuffer;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.management.MemoryType;
@ -55,6 +56,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.client.RegionInfo;
@ -67,13 +69,16 @@ import org.apache.hadoop.hbase.util.CollectionUtils;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.DrainBarrier;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALPrettyPrinter;
import org.apache.hadoop.hbase.wal.WALProvider.WriterBase;
import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.util.StringUtils;
import org.apache.htrace.core.TraceScope;
@ -222,6 +227,9 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
*/
volatile W writer;
// Last time to check low replication on hlog's pipeline
private long lastTimeCheckLowReplication = EnvironmentEdgeManager.currentTime();
protected volatile boolean closed = false;
protected final AtomicBoolean shutdown = new AtomicBoolean(false);
@ -230,7 +238,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
* an IllegalArgumentException if used to compare paths from different wals.
*/
final Comparator<Path> LOG_NAME_COMPARATOR =
(o1, o2) -> Long.compare(getFileNumFromFileName(o1), getFileNumFromFileName(o2));
(o1, o2) -> Long.compare(getFileNumFromFileName(o1), getFileNumFromFileName(o2));
private static final class WalProps {
@ -256,8 +264,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
* Map of WAL log file to properties. The map is sorted by the log file creation timestamp
* (contained in the log file name).
*/
protected ConcurrentNavigableMap<Path, WalProps> walFile2Props = new ConcurrentSkipListMap<>(
LOG_NAME_COMPARATOR);
protected ConcurrentNavigableMap<Path, WalProps> walFile2Props =
new ConcurrentSkipListMap<>(LOG_NAME_COMPARATOR);
/**
* Map of {@link SyncFuture}s keyed by Handler objects. Used so we reuse SyncFutures.
@ -311,8 +319,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
// be stuck and make no progress if the buffer is filled with appends only and there is no
// sync. If no sync, then the handlers will be outstanding just waiting on sync completion
// before they return.
int preallocatedEventCount = this.conf.getInt("hbase.regionserver.wal.disruptor.event.count",
1024 * 16);
int preallocatedEventCount =
this.conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16);
checkArgument(preallocatedEventCount >= 0,
"hbase.regionserver.wal.disruptor.event.count must > 0");
int floor = Integer.highestOneBit(preallocatedEventCount);
@ -346,12 +354,12 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
}
// If prefix is null||empty then just name it wal
this.walFilePrefix = prefix == null || prefix.isEmpty() ? "wal"
: URLEncoder.encode(prefix, "UTF8");
this.walFilePrefix =
prefix == null || prefix.isEmpty() ? "wal" : URLEncoder.encode(prefix, "UTF8");
// we only correctly differentiate suffices when numeric ones start with '.'
if (suffix != null && !(suffix.isEmpty()) && !(suffix.startsWith(WAL_FILE_NAME_DELIMITER))) {
throw new IllegalArgumentException("WAL suffix must start with '" + WAL_FILE_NAME_DELIMITER
+ "' but instead was '" + suffix + "'");
throw new IllegalArgumentException("WAL suffix must start with '" + WAL_FILE_NAME_DELIMITER +
"' but instead was '" + suffix + "'");
}
// Now that it exists, set the storage policy for the entire directory of wal files related to
// this FSHLog instance
@ -398,8 +406,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
// (it costs a little x'ing bocks)
final long blocksize = this.conf.getLong("hbase.regionserver.hlog.blocksize",
CommonFSUtils.getDefaultBlockSize(this.fs, this.walDir));
this.logrollsize = (long) (blocksize
* conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f));
this.logrollsize =
(long) (blocksize * conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f));
boolean maxLogsDefined = conf.get("hbase.regionserver.maxlogs") != null;
if (maxLogsDefined) {
@ -408,9 +416,9 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
this.maxLogs = conf.getInt("hbase.regionserver.maxlogs",
Math.max(32, calculateMaxLogFiles(conf, logrollsize)));
LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) + ", rollsize="
+ StringUtils.byteDesc(this.logrollsize) + ", prefix=" + this.walFilePrefix + ", suffix="
+ walFileSuffix + ", logDir=" + this.walDir + ", archiveDir=" + this.walArchiveDir);
LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) + ", rollsize=" +
StringUtils.byteDesc(this.logrollsize) + ", prefix=" + this.walFilePrefix + ", suffix=" +
walFileSuffix + ", logDir=" + this.walDir + ", archiveDir=" + this.walArchiveDir);
this.slowSyncNs = TimeUnit.MILLISECONDS
.toNanos(conf.getInt("hbase.regionserver.hlog.slowsync.ms", DEFAULT_SLOW_SYNC_TIME_MS));
this.walSyncTimeoutNs = TimeUnit.MILLISECONDS
@ -588,8 +596,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
int logCount = getNumRolledLogFiles();
if (logCount > this.maxLogs && logCount > 0) {
Map.Entry<Path, WalProps> firstWALEntry = this.walFile2Props.firstEntry();
regions = this.sequenceIdAccounting
.findLower(firstWALEntry.getValue().encodedName2HighestSequenceId);
regions =
this.sequenceIdAccounting.findLower(firstWALEntry.getValue().encodedName2HighestSequenceId);
}
if (regions != null) {
StringBuilder sb = new StringBuilder();
@ -599,8 +607,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
}
sb.append(Bytes.toStringBinary(regions[i]));
}
LOG.info("Too many WALs; count=" + logCount + ", max=" + this.maxLogs + "; forcing flush of "
+ regions.length + " regions(s): " + sb.toString());
LOG.info("Too many WALs; count=" + logCount + ", max=" + this.maxLogs +
"; forcing flush of " + regions.length + " regions(s): " + sb.toString());
}
return regions;
}
@ -688,8 +696,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
this.walFile2Props.put(oldPath,
new WalProps(this.sequenceIdAccounting.resetHighest(), oldFileLen));
this.totalLogSize.addAndGet(oldFileLen);
LOG.info("Rolled WAL " + CommonFSUtils.getPath(oldPath) + " with entries=" + oldNumEntries
+ ", filesize=" + StringUtils.byteDesc(oldFileLen) + "; new WAL " + newPathString);
LOG.info("Rolled WAL " + CommonFSUtils.getPath(oldPath) + " with entries=" + oldNumEntries +
", filesize=" + StringUtils.byteDesc(oldFileLen) + "; new WAL " + newPathString);
} else {
LOG.info("New WAL " + newPathString);
}
@ -755,8 +763,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
newPath = replaceWriter(oldPath, newPath, nextWriter);
tellListenersAboutPostLogRoll(oldPath, newPath);
if (LOG.isDebugEnabled()) {
LOG.debug("Create new " + implClassName + " writer with pipeline: "
+ Arrays.toString(getPipeline()));
LOG.debug("Create new " + implClassName + " writer with pipeline: " +
Arrays.toString(getPipeline()));
}
// Can we delete any of the old log files?
if (getNumRolledLogFiles() > 0) {
@ -766,8 +774,9 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
} catch (CommonFSUtils.StreamLacksCapabilityException exception) {
// If the underlying FileSystem can't do what we ask, treat as IO failure so
// we'll abort.
throw new IOException("Underlying FileSystem can't meet stream requirements. See RS log " +
"for details.", exception);
throw new IOException(
"Underlying FileSystem can't meet stream requirements. See RS log " + "for details.",
exception);
} finally {
closeBarrier.endOp();
}
@ -843,8 +852,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
}
}
}
LOG.debug("Moved " + files.length + " WAL file(s) to " +
CommonFSUtils.getPath(this.walArchiveDir));
LOG.debug(
"Moved " + files.length + " WAL file(s) to " + CommonFSUtils.getPath(this.walArchiveDir));
}
LOG.info("Closed WAL: " + toString());
}
@ -979,8 +988,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
@Override
public String toString() {
return implClassName + " " + walFilePrefix + ":" + walFileSuffix + "(num "
+ filenum + ")";
return implClassName + " " + walFilePrefix + ":" + walFileSuffix + "(num " + filenum + ")";
}
/**
@ -1023,8 +1031,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
protected abstract void doAppend(W writer, FSWALEntry entry) throws IOException;
protected abstract W createWriterInstance(Path path) throws IOException,
CommonFSUtils.StreamLacksCapabilityException;
protected abstract W createWriterInstance(Path path)
throws IOException, CommonFSUtils.StreamLacksCapabilityException;
/**
* @return old wal file size
@ -1034,6 +1042,27 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
protected abstract void doShutdown() throws IOException;
protected abstract boolean doCheckLogLowReplication();
public void checkLogLowReplication(long checkInterval) {
long now = EnvironmentEdgeManager.currentTime();
if (now - lastTimeCheckLowReplication < checkInterval) {
return;
}
// Will return immediately if we are in the middle of a WAL log roll currently.
if (!rollWriterLock.tryLock()) {
return;
}
try {
lastTimeCheckLowReplication = now;
if (doCheckLogLowReplication()) {
requestLogRoll(true);
}
} finally {
rollWriterLock.unlock();
}
}
/**
* This method gets the pipeline for the current WAL.
*/
@ -1045,4 +1074,68 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
*/
@VisibleForTesting
abstract int getLogReplication();
private static void split(final Configuration conf, final Path p) throws IOException {
FileSystem fs = FSUtils.getWALFileSystem(conf);
if (!fs.exists(p)) {
throw new FileNotFoundException(p.toString());
}
if (!fs.getFileStatus(p).isDirectory()) {
throw new IOException(p + " is not a directory");
}
final Path baseDir = FSUtils.getWALRootDir(conf);
Path archiveDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME);
if (conf.getBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR,
AbstractFSWALProvider.DEFAULT_SEPARATE_OLDLOGDIR)) {
archiveDir = new Path(archiveDir, p.getName());
}
WALSplitter.split(baseDir, p, archiveDir, fs, conf, WALFactory.getInstance(conf));
}
private static void usage() {
System.err.println("Usage: AbstractFSWAL <ARGS>");
System.err.println("Arguments:");
System.err.println(" --dump Dump textual representation of passed one or more files");
System.err.println(" For example: " +
"AbstractFSWAL --dump hdfs://example.com:9000/hbase/WALs/MACHINE/LOGFILE");
System.err.println(" --split Split the passed directory of WAL logs");
System.err.println(
" For example: AbstractFSWAL --split hdfs://example.com:9000/hbase/WALs/DIR");
}
/**
* Pass one or more log file names and it will either dump out a text version on
* <code>stdout</code> or split the specified log files.
*/
public static void main(String[] args) throws IOException {
if (args.length < 2) {
usage();
System.exit(-1);
}
// either dump using the WALPrettyPrinter or split, depending on args
if (args[0].compareTo("--dump") == 0) {
WALPrettyPrinter.run(Arrays.copyOfRange(args, 1, args.length));
} else if (args[0].compareTo("--perf") == 0) {
LOG.fatal("Please use the WALPerformanceEvaluation tool instead. i.e.:");
LOG.fatal(
"\thbase org.apache.hadoop.hbase.wal.WALPerformanceEvaluation --iterations " + args[1]);
System.exit(-1);
} else if (args[0].compareTo("--split") == 0) {
Configuration conf = HBaseConfiguration.create();
for (int i = 1; i < args.length; i++) {
try {
Path logPath = new Path(args[i]);
FSUtils.setFsDefault(conf, logPath);
split(conf, logPath);
} catch (IOException t) {
t.printStackTrace(System.err);
System.exit(-1);
}
}
} else {
usage();
System.exit(-1);
}
}
}

View File

@ -58,7 +58,6 @@ import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.Nam
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@ -536,7 +535,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
if (waitingConsumePayloadsGatingSequence.get() == waitingConsumePayloads.getCursor()) {
// we will give up consuming so if there are some unsynced data we need to issue a sync.
if (writer.getLength() > fileLengthAtLastSync && !syncFutures.isEmpty() &&
syncFutures.last().getTxid() > highestProcessedAppendTxidAtLastSync) {
syncFutures.last().getTxid() > highestProcessedAppendTxidAtLastSync) {
// no new data in the ringbuffer and we have at least one sync request
sync(writer);
}
@ -564,8 +563,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
@Override
public long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore)
throws IOException {
long txid = stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore,
waitingConsumePayloads);
long txid =
stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, waitingConsumePayloads);
if (shouldScheduleConsumer()) {
consumeExecutor.execute(consumer);
}
@ -746,4 +745,12 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
int getLogReplication() {
return getPipeline().length;
}
@Override
protected boolean doCheckLogLowReplication() {
// not like FSHLog, AsyncFSOutput will fail immediately if there are errors writing to DNs, so
// typically there is no 'low replication' state, only a 'broken' state.
AsyncFSOutput output = this.fsOut;
return output != null && output.getPipeline().length == 0;
}
}

View File

@ -25,7 +25,6 @@ import com.lmax.disruptor.TimeoutException;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
@ -42,25 +41,18 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.FSHLogProvider;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALPrettyPrinter;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@ -156,10 +148,6 @@ public class FSHLog extends AbstractFSWAL<Writer> {
private final AtomicInteger closeErrorCount = new AtomicInteger();
// Last time to check low replication on hlog's pipeline
private volatile long lastTimeCheckLowReplication = EnvironmentEdgeManager.currentTime();
/**
* Exception handler to pass the disruptor ringbuffer. Same as native implementation only it logs
* using our logger instead of java native logger.
@ -627,14 +615,14 @@ public class FSHLog extends AbstractFSWAL<Writer> {
/**
* Schedule a log roll if needed.
*/
public void checkLogRoll() {
private void checkLogRoll() {
// Will return immediately if we are in the middle of a WAL log roll currently.
if (!rollWriterLock.tryLock()) {
return;
}
boolean lowReplication;
try {
lowReplication = checkLowReplication();
lowReplication = doCheckLogLowReplication();
} finally {
rollWriterLock.unlock();
}
@ -646,9 +634,8 @@ public class FSHLog extends AbstractFSWAL<Writer> {
/**
* @return true if number of replicas for the WAL is lower than threshold
*/
private boolean checkLowReplication() {
protected boolean doCheckLogLowReplication() {
boolean logRollNeeded = false;
this.lastTimeCheckLowReplication = EnvironmentEdgeManager.currentTime();
// if the number of replicas in HDFS has fallen below the configured
// value, then roll logs.
try {
@ -767,24 +754,6 @@ public class FSHLog extends AbstractFSWAL<Writer> {
.align(ClassSize.OBJECT + (5 * ClassSize.REFERENCE) + ClassSize.ATOMIC_INTEGER
+ Bytes.SIZEOF_INT + (3 * Bytes.SIZEOF_LONG));
private static void split(final Configuration conf, final Path p) throws IOException {
FileSystem fs = FSUtils.getWALFileSystem(conf);
if (!fs.exists(p)) {
throw new FileNotFoundException(p.toString());
}
if (!fs.getFileStatus(p).isDirectory()) {
throw new IOException(p + " is not a directory");
}
final Path baseDir = FSUtils.getWALRootDir(conf);
Path archiveDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME);
if (conf.getBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR,
AbstractFSWALProvider.DEFAULT_SEPARATE_OLDLOGDIR)) {
archiveDir = new Path(archiveDir, p.getName());
}
WALSplitter.split(baseDir, p, archiveDir, fs, conf, WALFactory.getInstance(conf));
}
/**
* This class is used coordinating two threads holding one thread at a 'safe point' while the
* orchestrating thread does some work that requires the first thread paused: e.g. holding the WAL
@ -1118,52 +1087,6 @@ public class FSHLog extends AbstractFSWAL<Writer> {
}
}
private static void usage() {
System.err.println("Usage: FSHLog <ARGS>");
System.err.println("Arguments:");
System.err.println(" --dump Dump textual representation of passed one or more files");
System.err.println(" For example: "
+ "FSHLog --dump hdfs://example.com:9000/hbase/WALs/MACHINE/LOGFILE");
System.err.println(" --split Split the passed directory of WAL logs");
System.err.println(
" For example: " + "FSHLog --split hdfs://example.com:9000/hbase/WALs/DIR");
}
/**
* Pass one or more log file names and it will either dump out a text version on
* <code>stdout</code> or split the specified log files.
*/
public static void main(String[] args) throws IOException {
if (args.length < 2) {
usage();
System.exit(-1);
}
// either dump using the WALPrettyPrinter or split, depending on args
if (args[0].compareTo("--dump") == 0) {
WALPrettyPrinter.run(Arrays.copyOfRange(args, 1, args.length));
} else if (args[0].compareTo("--perf") == 0) {
LOG.fatal("Please use the WALPerformanceEvaluation tool instead. i.e.:");
LOG.fatal(
"\thbase org.apache.hadoop.hbase.wal.WALPerformanceEvaluation --iterations " + args[1]);
System.exit(-1);
} else if (args[0].compareTo("--split") == 0) {
Configuration conf = HBaseConfiguration.create();
for (int i = 1; i < args.length; i++) {
try {
Path logPath = new Path(args[i]);
FSUtils.setFsDefault(conf, logPath);
split(conf, logPath);
} catch (IOException t) {
t.printStackTrace(System.err);
System.exit(-1);
}
}
} else {
usage();
System.exit(-1);
}
}
/**
* This method gets the pipeline for the current WAL.
*/
@ -1176,12 +1099,4 @@ public class FSHLog extends AbstractFSWAL<Writer> {
}
return new DatanodeInfo[0];
}
/**
*
* @return last time on checking low replication
*/
public long getLastTimeCheckLowReplication() {
return this.lastTimeCheckLowReplication;
}
}

View File

@ -29,13 +29,11 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
public abstract class ReaderBase implements AbstractFSWALProvider.Reader {

View File

@ -18,77 +18,109 @@
*/
package org.apache.hadoop.hbase.wal;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.FSUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
@Category(MediumTests.class)
@RunWith(Parameterized.class)
@Category({ RegionServerTests.class, LargeTests.class })
public class TestWALOpenAfterDNRollingStart {
final Log LOG = LogFactory.getLog(getClass());
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static long DataNodeRestartInterval;
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
// Sleep time before restart next dn, we need to wait the current dn to finish start up
private static long DN_RESTART_INTERVAL = 15000;
// interval of checking low replication. The sleep time must smaller than
// DataNodeRestartInterval
// so a low replication case will be detected and the wal will be rolled
private static long CHECK_LOW_REPLICATION_INTERVAL = 10000;
@Parameter
public String walProvider;
@Parameters(name = "{index}: wal={0}")
public static List<Object[]> data() {
return Arrays.asList(new Object[] { "asyncfs" }, new Object[] { "filesystem" });
}
@BeforeClass
public static void setUpBeforeClass() throws Exception {
// Sleep time before restart next dn, we need to wait the current dn to finish start up
DataNodeRestartInterval = 15000;
// interval of checking low replication. The sleep time must smaller than DataNodeRestartInterval
// so a low replication case will be detected and the wal will be rolled
long checkLowReplicationInterval = 10000;
//don't let hdfs client to choose a new replica when dn down
TEST_UTIL.getConfiguration().setBoolean("dfs.client.block.write.replace-datanode-on-failure.enable",
false);
// don't let hdfs client to choose a new replica when dn down
TEST_UTIL.getConfiguration()
.setBoolean("dfs.client.block.write.replace-datanode-on-failure.enable", false);
TEST_UTIL.getConfiguration().setLong("hbase.regionserver.hlog.check.lowreplication.interval",
checkLowReplicationInterval);
CHECK_LOW_REPLICATION_INTERVAL);
TEST_UTIL.startMiniDFSCluster(3);
TEST_UTIL.startMiniCluster(1);
TEST_UTIL.startMiniZKCluster();
}
@Before
public void setUp() throws IOException, InterruptedException {
TEST_UTIL.getConfiguration().set("hbase.wal.provider", walProvider);
TEST_UTIL.startMiniHBaseCluster(1, 1);
}
@After
public void tearDown() throws Exception {
TEST_UTIL.shutdownMiniHBaseCluster();
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
/**
* see HBASE-18132
* This is a test case of failing open a wal(for replication for example) after all datanode
* restarted (rolling upgrade, for example).
* Before this patch, low replication detection is only used when syncing wal.
* But if the wal haven't had any entry whiten, it will never know all the replica of the wal
* is broken(because of dn restarting). And this wal can never be open
* see HBASE-18132 This is a test case of failing open a wal(for replication for example) after
* all datanode restarted (rolling upgrade, for example). Before this patch, low replication
* detection is only used when syncing wal. But if the wal haven't had any entry whiten, it will
* never know all the replica of the wal is broken(because of dn restarting). And this wal can
* never be open
* @throws Exception
*/
@Test(timeout = 300000)
@Test
public void test() throws Exception {
HRegionServer server = TEST_UTIL.getHBaseCluster().getRegionServer(0);
FSHLog hlog = (FSHLog)server.getWAL(null);
Path currentFile = hlog.getCurrentFileName();
//restart every dn to simulate a dn rolling upgrade
for(int i = 0; i < TEST_UTIL.getDFSCluster().getDataNodes().size(); i++) {
//This is NOT a bug, when restart dn in miniDFSCluster, it will remove the stopped dn from
//the dn list and then add to the tail of this list, we need to always restart the first one
//to simulate rolling upgrade of every dn.
AbstractFSWAL<?> wal = (AbstractFSWAL<?>) server.getWAL(null);
Path currentFile = wal.getCurrentFileName();
// restart every dn to simulate a dn rolling upgrade
for (int i = 0, n = TEST_UTIL.getDFSCluster().getDataNodes().size(); i < n; i++) {
// This is NOT a bug, when restart dn in miniDFSCluster, it will remove the stopped dn from
// the dn list and then add to the tail of this list, we need to always restart the first one
// to simulate rolling upgrade of every dn.
TEST_UTIL.getDFSCluster().restartDataNode(0);
//sleep enough time so log roller can detect the pipeline break and roll log
Thread.sleep(DataNodeRestartInterval);
// sleep enough time so log roller can detect the pipeline break and roll log
Thread.sleep(DN_RESTART_INTERVAL);
}
if(!server.getFileSystem().exists(currentFile)) {
if (!server.getFileSystem().exists(currentFile)) {
Path walRootDir = FSUtils.getWALRootDir(TEST_UTIL.getConfiguration());
final Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
currentFile = new Path(oldLogDir, currentFile.getName());
}
//if the log is not rolled, then we can never open this wal forever.
WAL.Reader reader = WALFactory
.createReader(TEST_UTIL.getTestFileSystem(), currentFile, TEST_UTIL.getConfiguration());
// if the log is not rolled, then we can never open this wal forever.
try (WAL.Reader reader = WALFactory.createReader(TEST_UTIL.getTestFileSystem(), currentFile,
TEST_UTIL.getConfiguration())) {
reader.next();
}
}
}