HBASE-19503 Fix TestWALOpenAfterDNRollingStart for AsyncFSWAL

This commit is contained in:
zhangduo 2017-12-14 09:41:12 +08:00
parent b682ea7c8a
commit 211f231b4a
7 changed files with 222 additions and 179 deletions

View File

@ -380,7 +380,8 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
@Override @Override
public DatanodeInfo[] getPipeline() { public DatanodeInfo[] getPipeline() {
return locations; State state = this.state;
return state == State.STREAMING || state == State.CLOSING ? locations : new DatanodeInfo[0];
} }
private void flushBuffer(CompletableFuture<Long> future, ByteBuf dataBuf, private void flushBuffer(CompletableFuture<Long> future, ByteBuf dataBuf,

View File

@ -27,16 +27,16 @@ import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.regionserver.wal.FSHLog; import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException; import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.HasThread; import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
@ -122,14 +122,11 @@ public class LogRoller extends HasThread implements Closeable {
try { try {
for (Entry<WAL, Boolean> entry : walNeedsRoll.entrySet()) { for (Entry<WAL, Boolean> entry : walNeedsRoll.entrySet()) {
WAL wal = entry.getKey(); WAL wal = entry.getKey();
boolean neeRollAlready = entry.getValue(); boolean needRollAlready = entry.getValue();
if(wal instanceof FSHLog && !neeRollAlready) { if (needRollAlready || !(wal instanceof AbstractFSWAL)) {
FSHLog hlog = (FSHLog)wal; continue;
if ((now - hlog.getLastTimeCheckLowReplication())
> this.checkLowReplicationInterval) {
hlog.checkLogRoll();
}
} }
((AbstractFSWAL<?>) wal).checkLogLowReplication(checkLowReplicationInterval);
} }
} catch (Throwable e) { } catch (Throwable e) {
LOG.warn("Failed checking low replication", e); LOG.warn("Failed checking low replication", e);

View File

@ -23,6 +23,7 @@ import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DE
import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.RingBuffer;
import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.lang.management.MemoryType; import java.lang.management.MemoryType;
@ -55,6 +56,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.PrivateCellUtil; import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfo;
@ -67,13 +69,16 @@ import org.apache.hadoop.hbase.util.CollectionUtils;
import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.DrainBarrier; import org.apache.hadoop.hbase.util.DrainBarrier;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALPrettyPrinter;
import org.apache.hadoop.hbase.wal.WALProvider.WriterBase; import org.apache.hadoop.hbase.wal.WALProvider.WriterBase;
import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.htrace.core.TraceScope; import org.apache.htrace.core.TraceScope;
@ -222,6 +227,9 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
*/ */
volatile W writer; volatile W writer;
// Last time to check low replication on hlog's pipeline
private long lastTimeCheckLowReplication = EnvironmentEdgeManager.currentTime();
protected volatile boolean closed = false; protected volatile boolean closed = false;
protected final AtomicBoolean shutdown = new AtomicBoolean(false); protected final AtomicBoolean shutdown = new AtomicBoolean(false);
@ -230,7 +238,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
* an IllegalArgumentException if used to compare paths from different wals. * an IllegalArgumentException if used to compare paths from different wals.
*/ */
final Comparator<Path> LOG_NAME_COMPARATOR = final Comparator<Path> LOG_NAME_COMPARATOR =
(o1, o2) -> Long.compare(getFileNumFromFileName(o1), getFileNumFromFileName(o2)); (o1, o2) -> Long.compare(getFileNumFromFileName(o1), getFileNumFromFileName(o2));
private static final class WalProps { private static final class WalProps {
@ -256,8 +264,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
* Map of WAL log file to properties. The map is sorted by the log file creation timestamp * Map of WAL log file to properties. The map is sorted by the log file creation timestamp
* (contained in the log file name). * (contained in the log file name).
*/ */
protected ConcurrentNavigableMap<Path, WalProps> walFile2Props = new ConcurrentSkipListMap<>( protected ConcurrentNavigableMap<Path, WalProps> walFile2Props =
LOG_NAME_COMPARATOR); new ConcurrentSkipListMap<>(LOG_NAME_COMPARATOR);
/** /**
* Map of {@link SyncFuture}s keyed by Handler objects. Used so we reuse SyncFutures. * Map of {@link SyncFuture}s keyed by Handler objects. Used so we reuse SyncFutures.
@ -311,8 +319,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
// be stuck and make no progress if the buffer is filled with appends only and there is no // be stuck and make no progress if the buffer is filled with appends only and there is no
// sync. If no sync, then the handlers will be outstanding just waiting on sync completion // sync. If no sync, then the handlers will be outstanding just waiting on sync completion
// before they return. // before they return.
int preallocatedEventCount = this.conf.getInt("hbase.regionserver.wal.disruptor.event.count", int preallocatedEventCount =
1024 * 16); this.conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16);
checkArgument(preallocatedEventCount >= 0, checkArgument(preallocatedEventCount >= 0,
"hbase.regionserver.wal.disruptor.event.count must > 0"); "hbase.regionserver.wal.disruptor.event.count must > 0");
int floor = Integer.highestOneBit(preallocatedEventCount); int floor = Integer.highestOneBit(preallocatedEventCount);
@ -346,12 +354,12 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
} }
// If prefix is null||empty then just name it wal // If prefix is null||empty then just name it wal
this.walFilePrefix = prefix == null || prefix.isEmpty() ? "wal" this.walFilePrefix =
: URLEncoder.encode(prefix, "UTF8"); prefix == null || prefix.isEmpty() ? "wal" : URLEncoder.encode(prefix, "UTF8");
// we only correctly differentiate suffices when numeric ones start with '.' // we only correctly differentiate suffices when numeric ones start with '.'
if (suffix != null && !(suffix.isEmpty()) && !(suffix.startsWith(WAL_FILE_NAME_DELIMITER))) { if (suffix != null && !(suffix.isEmpty()) && !(suffix.startsWith(WAL_FILE_NAME_DELIMITER))) {
throw new IllegalArgumentException("WAL suffix must start with '" + WAL_FILE_NAME_DELIMITER throw new IllegalArgumentException("WAL suffix must start with '" + WAL_FILE_NAME_DELIMITER +
+ "' but instead was '" + suffix + "'"); "' but instead was '" + suffix + "'");
} }
// Now that it exists, set the storage policy for the entire directory of wal files related to // Now that it exists, set the storage policy for the entire directory of wal files related to
// this FSHLog instance // this FSHLog instance
@ -398,8 +406,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
// (it costs a little x'ing bocks) // (it costs a little x'ing bocks)
final long blocksize = this.conf.getLong("hbase.regionserver.hlog.blocksize", final long blocksize = this.conf.getLong("hbase.regionserver.hlog.blocksize",
CommonFSUtils.getDefaultBlockSize(this.fs, this.walDir)); CommonFSUtils.getDefaultBlockSize(this.fs, this.walDir));
this.logrollsize = (long) (blocksize this.logrollsize =
* conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f)); (long) (blocksize * conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f));
boolean maxLogsDefined = conf.get("hbase.regionserver.maxlogs") != null; boolean maxLogsDefined = conf.get("hbase.regionserver.maxlogs") != null;
if (maxLogsDefined) { if (maxLogsDefined) {
@ -408,9 +416,9 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", this.maxLogs = conf.getInt("hbase.regionserver.maxlogs",
Math.max(32, calculateMaxLogFiles(conf, logrollsize))); Math.max(32, calculateMaxLogFiles(conf, logrollsize)));
LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) + ", rollsize=" LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) + ", rollsize=" +
+ StringUtils.byteDesc(this.logrollsize) + ", prefix=" + this.walFilePrefix + ", suffix=" StringUtils.byteDesc(this.logrollsize) + ", prefix=" + this.walFilePrefix + ", suffix=" +
+ walFileSuffix + ", logDir=" + this.walDir + ", archiveDir=" + this.walArchiveDir); walFileSuffix + ", logDir=" + this.walDir + ", archiveDir=" + this.walArchiveDir);
this.slowSyncNs = TimeUnit.MILLISECONDS this.slowSyncNs = TimeUnit.MILLISECONDS
.toNanos(conf.getInt("hbase.regionserver.hlog.slowsync.ms", DEFAULT_SLOW_SYNC_TIME_MS)); .toNanos(conf.getInt("hbase.regionserver.hlog.slowsync.ms", DEFAULT_SLOW_SYNC_TIME_MS));
this.walSyncTimeoutNs = TimeUnit.MILLISECONDS this.walSyncTimeoutNs = TimeUnit.MILLISECONDS
@ -588,8 +596,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
int logCount = getNumRolledLogFiles(); int logCount = getNumRolledLogFiles();
if (logCount > this.maxLogs && logCount > 0) { if (logCount > this.maxLogs && logCount > 0) {
Map.Entry<Path, WalProps> firstWALEntry = this.walFile2Props.firstEntry(); Map.Entry<Path, WalProps> firstWALEntry = this.walFile2Props.firstEntry();
regions = this.sequenceIdAccounting regions =
.findLower(firstWALEntry.getValue().encodedName2HighestSequenceId); this.sequenceIdAccounting.findLower(firstWALEntry.getValue().encodedName2HighestSequenceId);
} }
if (regions != null) { if (regions != null) {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
@ -599,8 +607,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
} }
sb.append(Bytes.toStringBinary(regions[i])); sb.append(Bytes.toStringBinary(regions[i]));
} }
LOG.info("Too many WALs; count=" + logCount + ", max=" + this.maxLogs + "; forcing flush of " LOG.info("Too many WALs; count=" + logCount + ", max=" + this.maxLogs +
+ regions.length + " regions(s): " + sb.toString()); "; forcing flush of " + regions.length + " regions(s): " + sb.toString());
} }
return regions; return regions;
} }
@ -688,8 +696,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
this.walFile2Props.put(oldPath, this.walFile2Props.put(oldPath,
new WalProps(this.sequenceIdAccounting.resetHighest(), oldFileLen)); new WalProps(this.sequenceIdAccounting.resetHighest(), oldFileLen));
this.totalLogSize.addAndGet(oldFileLen); this.totalLogSize.addAndGet(oldFileLen);
LOG.info("Rolled WAL " + CommonFSUtils.getPath(oldPath) + " with entries=" + oldNumEntries LOG.info("Rolled WAL " + CommonFSUtils.getPath(oldPath) + " with entries=" + oldNumEntries +
+ ", filesize=" + StringUtils.byteDesc(oldFileLen) + "; new WAL " + newPathString); ", filesize=" + StringUtils.byteDesc(oldFileLen) + "; new WAL " + newPathString);
} else { } else {
LOG.info("New WAL " + newPathString); LOG.info("New WAL " + newPathString);
} }
@ -755,8 +763,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
newPath = replaceWriter(oldPath, newPath, nextWriter); newPath = replaceWriter(oldPath, newPath, nextWriter);
tellListenersAboutPostLogRoll(oldPath, newPath); tellListenersAboutPostLogRoll(oldPath, newPath);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Create new " + implClassName + " writer with pipeline: " LOG.debug("Create new " + implClassName + " writer with pipeline: " +
+ Arrays.toString(getPipeline())); Arrays.toString(getPipeline()));
} }
// Can we delete any of the old log files? // Can we delete any of the old log files?
if (getNumRolledLogFiles() > 0) { if (getNumRolledLogFiles() > 0) {
@ -766,8 +774,9 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
} catch (CommonFSUtils.StreamLacksCapabilityException exception) { } catch (CommonFSUtils.StreamLacksCapabilityException exception) {
// If the underlying FileSystem can't do what we ask, treat as IO failure so // If the underlying FileSystem can't do what we ask, treat as IO failure so
// we'll abort. // we'll abort.
throw new IOException("Underlying FileSystem can't meet stream requirements. See RS log " + throw new IOException(
"for details.", exception); "Underlying FileSystem can't meet stream requirements. See RS log " + "for details.",
exception);
} finally { } finally {
closeBarrier.endOp(); closeBarrier.endOp();
} }
@ -843,8 +852,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
} }
} }
} }
LOG.debug("Moved " + files.length + " WAL file(s) to " + LOG.debug(
CommonFSUtils.getPath(this.walArchiveDir)); "Moved " + files.length + " WAL file(s) to " + CommonFSUtils.getPath(this.walArchiveDir));
} }
LOG.info("Closed WAL: " + toString()); LOG.info("Closed WAL: " + toString());
} }
@ -979,8 +988,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
@Override @Override
public String toString() { public String toString() {
return implClassName + " " + walFilePrefix + ":" + walFileSuffix + "(num " return implClassName + " " + walFilePrefix + ":" + walFileSuffix + "(num " + filenum + ")";
+ filenum + ")";
} }
/** /**
@ -1023,8 +1031,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
protected abstract void doAppend(W writer, FSWALEntry entry) throws IOException; protected abstract void doAppend(W writer, FSWALEntry entry) throws IOException;
protected abstract W createWriterInstance(Path path) throws IOException, protected abstract W createWriterInstance(Path path)
CommonFSUtils.StreamLacksCapabilityException; throws IOException, CommonFSUtils.StreamLacksCapabilityException;
/** /**
* @return old wal file size * @return old wal file size
@ -1034,6 +1042,27 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
protected abstract void doShutdown() throws IOException; protected abstract void doShutdown() throws IOException;
protected abstract boolean doCheckLogLowReplication();
public void checkLogLowReplication(long checkInterval) {
long now = EnvironmentEdgeManager.currentTime();
if (now - lastTimeCheckLowReplication < checkInterval) {
return;
}
// Will return immediately if we are in the middle of a WAL log roll currently.
if (!rollWriterLock.tryLock()) {
return;
}
try {
lastTimeCheckLowReplication = now;
if (doCheckLogLowReplication()) {
requestLogRoll(true);
}
} finally {
rollWriterLock.unlock();
}
}
/** /**
* This method gets the pipeline for the current WAL. * This method gets the pipeline for the current WAL.
*/ */
@ -1045,4 +1074,68 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
*/ */
@VisibleForTesting @VisibleForTesting
abstract int getLogReplication(); abstract int getLogReplication();
private static void split(final Configuration conf, final Path p) throws IOException {
FileSystem fs = FSUtils.getWALFileSystem(conf);
if (!fs.exists(p)) {
throw new FileNotFoundException(p.toString());
}
if (!fs.getFileStatus(p).isDirectory()) {
throw new IOException(p + " is not a directory");
}
final Path baseDir = FSUtils.getWALRootDir(conf);
Path archiveDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME);
if (conf.getBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR,
AbstractFSWALProvider.DEFAULT_SEPARATE_OLDLOGDIR)) {
archiveDir = new Path(archiveDir, p.getName());
}
WALSplitter.split(baseDir, p, archiveDir, fs, conf, WALFactory.getInstance(conf));
}
private static void usage() {
System.err.println("Usage: AbstractFSWAL <ARGS>");
System.err.println("Arguments:");
System.err.println(" --dump Dump textual representation of passed one or more files");
System.err.println(" For example: " +
"AbstractFSWAL --dump hdfs://example.com:9000/hbase/WALs/MACHINE/LOGFILE");
System.err.println(" --split Split the passed directory of WAL logs");
System.err.println(
" For example: AbstractFSWAL --split hdfs://example.com:9000/hbase/WALs/DIR");
}
/**
* Pass one or more log file names and it will either dump out a text version on
* <code>stdout</code> or split the specified log files.
*/
public static void main(String[] args) throws IOException {
if (args.length < 2) {
usage();
System.exit(-1);
}
// either dump using the WALPrettyPrinter or split, depending on args
if (args[0].compareTo("--dump") == 0) {
WALPrettyPrinter.run(Arrays.copyOfRange(args, 1, args.length));
} else if (args[0].compareTo("--perf") == 0) {
LOG.fatal("Please use the WALPerformanceEvaluation tool instead. i.e.:");
LOG.fatal(
"\thbase org.apache.hadoop.hbase.wal.WALPerformanceEvaluation --iterations " + args[1]);
System.exit(-1);
} else if (args[0].compareTo("--split") == 0) {
Configuration conf = HBaseConfiguration.create();
for (int i = 1; i < args.length; i++) {
try {
Path logPath = new Path(args[i]);
FSUtils.setFsDefault(conf, logPath);
split(conf, logPath);
} catch (IOException t) {
t.printStackTrace(System.err);
System.exit(-1);
}
}
} else {
usage();
System.exit(-1);
}
}
} }

View File

@ -58,7 +58,6 @@ import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.Nam
import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider; import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter; import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@ -536,7 +535,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
if (waitingConsumePayloadsGatingSequence.get() == waitingConsumePayloads.getCursor()) { if (waitingConsumePayloadsGatingSequence.get() == waitingConsumePayloads.getCursor()) {
// we will give up consuming so if there are some unsynced data we need to issue a sync. // we will give up consuming so if there are some unsynced data we need to issue a sync.
if (writer.getLength() > fileLengthAtLastSync && !syncFutures.isEmpty() && if (writer.getLength() > fileLengthAtLastSync && !syncFutures.isEmpty() &&
syncFutures.last().getTxid() > highestProcessedAppendTxidAtLastSync) { syncFutures.last().getTxid() > highestProcessedAppendTxidAtLastSync) {
// no new data in the ringbuffer and we have at least one sync request // no new data in the ringbuffer and we have at least one sync request
sync(writer); sync(writer);
} }
@ -564,8 +563,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
@Override @Override
public long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore) public long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore)
throws IOException { throws IOException {
long txid = stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, long txid =
waitingConsumePayloads); stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, waitingConsumePayloads);
if (shouldScheduleConsumer()) { if (shouldScheduleConsumer()) {
consumeExecutor.execute(consumer); consumeExecutor.execute(consumer);
} }
@ -746,4 +745,12 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
int getLogReplication() { int getLogReplication() {
return getPipeline().length; return getPipeline().length;
} }
@Override
protected boolean doCheckLogLowReplication() {
// not like FSHLog, AsyncFSOutput will fail immediately if there are errors writing to DNs, so
// typically there is no 'low replication' state, only a 'broken' state.
AsyncFSOutput output = this.fsOut;
return output != null && output.getPipeline().length == 0;
}
} }

View File

@ -25,7 +25,6 @@ import com.lmax.disruptor.TimeoutException;
import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType; import com.lmax.disruptor.dsl.ProducerType;
import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.Arrays; import java.util.Arrays;
@ -42,25 +41,18 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HasThread; import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.FSHLogProvider; import org.apache.hadoop.hbase.wal.FSHLogProvider;
import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALPrettyPrinter;
import org.apache.hadoop.hbase.wal.WALProvider.Writer; import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hdfs.DFSOutputStream; import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@ -156,10 +148,6 @@ public class FSHLog extends AbstractFSWAL<Writer> {
private final AtomicInteger closeErrorCount = new AtomicInteger(); private final AtomicInteger closeErrorCount = new AtomicInteger();
// Last time to check low replication on hlog's pipeline
private volatile long lastTimeCheckLowReplication = EnvironmentEdgeManager.currentTime();
/** /**
* Exception handler to pass the disruptor ringbuffer. Same as native implementation only it logs * Exception handler to pass the disruptor ringbuffer. Same as native implementation only it logs
* using our logger instead of java native logger. * using our logger instead of java native logger.
@ -627,14 +615,14 @@ public class FSHLog extends AbstractFSWAL<Writer> {
/** /**
* Schedule a log roll if needed. * Schedule a log roll if needed.
*/ */
public void checkLogRoll() { private void checkLogRoll() {
// Will return immediately if we are in the middle of a WAL log roll currently. // Will return immediately if we are in the middle of a WAL log roll currently.
if (!rollWriterLock.tryLock()) { if (!rollWriterLock.tryLock()) {
return; return;
} }
boolean lowReplication; boolean lowReplication;
try { try {
lowReplication = checkLowReplication(); lowReplication = doCheckLogLowReplication();
} finally { } finally {
rollWriterLock.unlock(); rollWriterLock.unlock();
} }
@ -646,9 +634,8 @@ public class FSHLog extends AbstractFSWAL<Writer> {
/** /**
* @return true if number of replicas for the WAL is lower than threshold * @return true if number of replicas for the WAL is lower than threshold
*/ */
private boolean checkLowReplication() { protected boolean doCheckLogLowReplication() {
boolean logRollNeeded = false; boolean logRollNeeded = false;
this.lastTimeCheckLowReplication = EnvironmentEdgeManager.currentTime();
// if the number of replicas in HDFS has fallen below the configured // if the number of replicas in HDFS has fallen below the configured
// value, then roll logs. // value, then roll logs.
try { try {
@ -767,24 +754,6 @@ public class FSHLog extends AbstractFSWAL<Writer> {
.align(ClassSize.OBJECT + (5 * ClassSize.REFERENCE) + ClassSize.ATOMIC_INTEGER .align(ClassSize.OBJECT + (5 * ClassSize.REFERENCE) + ClassSize.ATOMIC_INTEGER
+ Bytes.SIZEOF_INT + (3 * Bytes.SIZEOF_LONG)); + Bytes.SIZEOF_INT + (3 * Bytes.SIZEOF_LONG));
private static void split(final Configuration conf, final Path p) throws IOException {
FileSystem fs = FSUtils.getWALFileSystem(conf);
if (!fs.exists(p)) {
throw new FileNotFoundException(p.toString());
}
if (!fs.getFileStatus(p).isDirectory()) {
throw new IOException(p + " is not a directory");
}
final Path baseDir = FSUtils.getWALRootDir(conf);
Path archiveDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME);
if (conf.getBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR,
AbstractFSWALProvider.DEFAULT_SEPARATE_OLDLOGDIR)) {
archiveDir = new Path(archiveDir, p.getName());
}
WALSplitter.split(baseDir, p, archiveDir, fs, conf, WALFactory.getInstance(conf));
}
/** /**
* This class is used coordinating two threads holding one thread at a 'safe point' while the * This class is used coordinating two threads holding one thread at a 'safe point' while the
* orchestrating thread does some work that requires the first thread paused: e.g. holding the WAL * orchestrating thread does some work that requires the first thread paused: e.g. holding the WAL
@ -1118,52 +1087,6 @@ public class FSHLog extends AbstractFSWAL<Writer> {
} }
} }
private static void usage() {
System.err.println("Usage: FSHLog <ARGS>");
System.err.println("Arguments:");
System.err.println(" --dump Dump textual representation of passed one or more files");
System.err.println(" For example: "
+ "FSHLog --dump hdfs://example.com:9000/hbase/WALs/MACHINE/LOGFILE");
System.err.println(" --split Split the passed directory of WAL logs");
System.err.println(
" For example: " + "FSHLog --split hdfs://example.com:9000/hbase/WALs/DIR");
}
/**
* Pass one or more log file names and it will either dump out a text version on
* <code>stdout</code> or split the specified log files.
*/
public static void main(String[] args) throws IOException {
if (args.length < 2) {
usage();
System.exit(-1);
}
// either dump using the WALPrettyPrinter or split, depending on args
if (args[0].compareTo("--dump") == 0) {
WALPrettyPrinter.run(Arrays.copyOfRange(args, 1, args.length));
} else if (args[0].compareTo("--perf") == 0) {
LOG.fatal("Please use the WALPerformanceEvaluation tool instead. i.e.:");
LOG.fatal(
"\thbase org.apache.hadoop.hbase.wal.WALPerformanceEvaluation --iterations " + args[1]);
System.exit(-1);
} else if (args[0].compareTo("--split") == 0) {
Configuration conf = HBaseConfiguration.create();
for (int i = 1; i < args.length; i++) {
try {
Path logPath = new Path(args[i]);
FSUtils.setFsDefault(conf, logPath);
split(conf, logPath);
} catch (IOException t) {
t.printStackTrace(System.err);
System.exit(-1);
}
}
} else {
usage();
System.exit(-1);
}
}
/** /**
* This method gets the pipeline for the current WAL. * This method gets the pipeline for the current WAL.
*/ */
@ -1176,12 +1099,4 @@ public class FSHLog extends AbstractFSWAL<Writer> {
} }
return new DatanodeInfo[0]; return new DatanodeInfo[0];
} }
/**
*
* @return last time on checking low replication
*/
public long getLastTimeCheckLowReplication() {
return this.lastTimeCheckLowReplication;
}
} }

View File

@ -29,13 +29,11 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.io.util.LRUDictionary; import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX}) @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
public abstract class ReaderBase implements AbstractFSWALProvider.Reader { public abstract class ReaderBase implements AbstractFSWALProvider.Reader {

View File

@ -18,77 +18,109 @@
*/ */
package org.apache.hadoop.hbase.wal; package org.apache.hadoop.hbase.wal;
import org.apache.commons.logging.Log; import java.io.IOException;
import org.apache.commons.logging.LogFactory; import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.wal.FSHLog; import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.FSUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
@Category(MediumTests.class) @Category({ RegionServerTests.class, LargeTests.class })
public class TestWALOpenAfterDNRollingStart { public class TestWALOpenAfterDNRollingStart {
final Log LOG = LogFactory.getLog(getClass());
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static long DataNodeRestartInterval; // Sleep time before restart next dn, we need to wait the current dn to finish start up
private static long DN_RESTART_INTERVAL = 15000;
// interval of checking low replication. The sleep time must smaller than
// DataNodeRestartInterval
// so a low replication case will be detected and the wal will be rolled
private static long CHECK_LOW_REPLICATION_INTERVAL = 10000;
@Parameter
public String walProvider;
@Parameters(name = "{index}: wal={0}")
public static List<Object[]> data() {
return Arrays.asList(new Object[] { "asyncfs" }, new Object[] { "filesystem" });
}
@BeforeClass @BeforeClass
public static void setUpBeforeClass() throws Exception { public static void setUpBeforeClass() throws Exception {
// Sleep time before restart next dn, we need to wait the current dn to finish start up // don't let hdfs client to choose a new replica when dn down
DataNodeRestartInterval = 15000; TEST_UTIL.getConfiguration()
// interval of checking low replication. The sleep time must smaller than DataNodeRestartInterval .setBoolean("dfs.client.block.write.replace-datanode-on-failure.enable", false);
// so a low replication case will be detected and the wal will be rolled
long checkLowReplicationInterval = 10000;
//don't let hdfs client to choose a new replica when dn down
TEST_UTIL.getConfiguration().setBoolean("dfs.client.block.write.replace-datanode-on-failure.enable",
false);
TEST_UTIL.getConfiguration().setLong("hbase.regionserver.hlog.check.lowreplication.interval", TEST_UTIL.getConfiguration().setLong("hbase.regionserver.hlog.check.lowreplication.interval",
checkLowReplicationInterval); CHECK_LOW_REPLICATION_INTERVAL);
TEST_UTIL.startMiniDFSCluster(3); TEST_UTIL.startMiniDFSCluster(3);
TEST_UTIL.startMiniCluster(1); TEST_UTIL.startMiniZKCluster();
}
@Before
public void setUp() throws IOException, InterruptedException {
TEST_UTIL.getConfiguration().set("hbase.wal.provider", walProvider);
TEST_UTIL.startMiniHBaseCluster(1, 1);
}
@After
public void tearDown() throws Exception {
TEST_UTIL.shutdownMiniHBaseCluster();
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
} }
/** /**
* see HBASE-18132 * see HBASE-18132 This is a test case of failing open a wal(for replication for example) after
* This is a test case of failing open a wal(for replication for example) after all datanode * all datanode restarted (rolling upgrade, for example). Before this patch, low replication
* restarted (rolling upgrade, for example). * detection is only used when syncing wal. But if the wal haven't had any entry whiten, it will
* Before this patch, low replication detection is only used when syncing wal. * never know all the replica of the wal is broken(because of dn restarting). And this wal can
* But if the wal haven't had any entry whiten, it will never know all the replica of the wal * never be open
* is broken(because of dn restarting). And this wal can never be open
* @throws Exception * @throws Exception
*/ */
@Test(timeout = 300000) @Test
public void test() throws Exception { public void test() throws Exception {
HRegionServer server = TEST_UTIL.getHBaseCluster().getRegionServer(0); HRegionServer server = TEST_UTIL.getHBaseCluster().getRegionServer(0);
FSHLog hlog = (FSHLog)server.getWAL(null); AbstractFSWAL<?> wal = (AbstractFSWAL<?>) server.getWAL(null);
Path currentFile = hlog.getCurrentFileName(); Path currentFile = wal.getCurrentFileName();
//restart every dn to simulate a dn rolling upgrade // restart every dn to simulate a dn rolling upgrade
for(int i = 0; i < TEST_UTIL.getDFSCluster().getDataNodes().size(); i++) { for (int i = 0, n = TEST_UTIL.getDFSCluster().getDataNodes().size(); i < n; i++) {
//This is NOT a bug, when restart dn in miniDFSCluster, it will remove the stopped dn from // This is NOT a bug, when restart dn in miniDFSCluster, it will remove the stopped dn from
//the dn list and then add to the tail of this list, we need to always restart the first one // the dn list and then add to the tail of this list, we need to always restart the first one
//to simulate rolling upgrade of every dn. // to simulate rolling upgrade of every dn.
TEST_UTIL.getDFSCluster().restartDataNode(0); TEST_UTIL.getDFSCluster().restartDataNode(0);
//sleep enough time so log roller can detect the pipeline break and roll log // sleep enough time so log roller can detect the pipeline break and roll log
Thread.sleep(DataNodeRestartInterval); Thread.sleep(DN_RESTART_INTERVAL);
} }
if(!server.getFileSystem().exists(currentFile)) { if (!server.getFileSystem().exists(currentFile)) {
Path walRootDir = FSUtils.getWALRootDir(TEST_UTIL.getConfiguration()); Path walRootDir = FSUtils.getWALRootDir(TEST_UTIL.getConfiguration());
final Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME); final Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
currentFile = new Path(oldLogDir, currentFile.getName()); currentFile = new Path(oldLogDir, currentFile.getName());
} }
//if the log is not rolled, then we can never open this wal forever. // if the log is not rolled, then we can never open this wal forever.
WAL.Reader reader = WALFactory try (WAL.Reader reader = WALFactory.createReader(TEST_UTIL.getTestFileSystem(), currentFile,
.createReader(TEST_UTIL.getTestFileSystem(), currentFile, TEST_UTIL.getConfiguration()); TEST_UTIL.getConfiguration())) {
reader.next();
}
} }
} }