HBASE-17085 AsyncFSWAL may issue unnecessary AsyncDFSOutput.sync

This commit is contained in:
zhangduo 2016-11-17 14:54:25 +08:00
parent 80acc2dca5
commit ab55bad899
4 changed files with 70 additions and 90 deletions

View File

@ -37,6 +37,7 @@ import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@ -57,6 +58,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CollectionUtils;
import org.apache.hadoop.hbase.util.DrainBarrier;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
@ -163,9 +165,9 @@ public abstract class AbstractFSWAL<W> implements WAL {
/** The barrier used to ensure that close() waits for all log rolls and flushes to finish. */
protected final DrainBarrier closeBarrier = new DrainBarrier();
protected final int slowSyncNs;
protected final long slowSyncNs;
private final long walSyncTimeout;
private final long walSyncTimeoutNs;
// If > than this size, roll the log.
protected final long logrollsize;
@ -221,12 +223,8 @@ public abstract class AbstractFSWAL<W> implements WAL {
* WAL Comparator; it compares the timestamp (log filenum), present in the log file name. Throws
* an IllegalArgumentException if used to compare paths from different wals.
*/
final Comparator<Path> LOG_NAME_COMPARATOR = new Comparator<Path>() {
@Override
public int compare(Path o1, Path o2) {
return Long.compare(getFileNumFromFileName(o1), getFileNumFromFileName(o2));
}
};
final Comparator<Path> LOG_NAME_COMPARATOR =
(o1, o2) -> Long.compare(getFileNumFromFileName(o1), getFileNumFromFileName(o2));
private static final class WalProps {
@ -258,7 +256,7 @@ public abstract class AbstractFSWAL<W> implements WAL {
/**
* Map of {@link SyncFuture}s keyed by Handler objects. Used so we reuse SyncFutures.
* <p>
* TODO: Reus FSWALEntry's rather than create them anew each time as we do SyncFutures here.
* TODO: Reuse FSWALEntry's rather than create them anew each time as we do SyncFutures here.
* <p>
* TODO: Add a FSWalEntry and SyncFuture as thread locals on handlers rather than have them get
* them from this Map?
@ -400,10 +398,10 @@ public abstract class AbstractFSWAL<W> implements WAL {
LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) + ", rollsize="
+ StringUtils.byteDesc(this.logrollsize) + ", prefix=" + this.walFilePrefix + ", suffix="
+ walFileSuffix + ", logDir=" + this.walDir + ", archiveDir=" + this.walArchiveDir);
this.slowSyncNs =
1000000 * conf.getInt("hbase.regionserver.hlog.slowsync.ms", DEFAULT_SLOW_SYNC_TIME_MS);
this.walSyncTimeout = conf.getLong("hbase.regionserver.hlog.sync.timeout",
DEFAULT_WAL_SYNC_TIMEOUT_MS);
this.slowSyncNs = TimeUnit.MILLISECONDS
.toNanos(conf.getInt("hbase.regionserver.hlog.slowsync.ms", DEFAULT_SLOW_SYNC_TIME_MS));
this.walSyncTimeoutNs = TimeUnit.MILLISECONDS
.toNanos(conf.getLong("hbase.regionserver.hlog.sync.timeout", DEFAULT_WAL_SYNC_TIMEOUT_MS));
int maxHandlersCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, 200);
// Presize our map of SyncFutures by handler objects.
this.syncFuturesByHandler = new ConcurrentHashMap<Thread, SyncFuture>(maxHandlersCount);
@ -682,7 +680,7 @@ public abstract class AbstractFSWAL<W> implements WAL {
protected Span blockOnSync(final SyncFuture syncFuture) throws IOException {
// Now we have published the ringbuffer, halt the current thread until we get an answer back.
try {
syncFuture.get(walSyncTimeout);
syncFuture.get(walSyncTimeoutNs);
return syncFuture.getSpan();
} catch (TimeoutIOException tioe) {
// SyncFuture reuse by thread, if TimeoutIOException happens, ringbuffer
@ -840,15 +838,10 @@ public abstract class AbstractFSWAL<W> implements WAL {
sequenceIdAccounting.updateStore(encodedRegionName, familyName, sequenceid, onlyIfGreater);
}
protected SyncFuture getSyncFuture(final long sequence, Span span) {
SyncFuture syncFuture = this.syncFuturesByHandler.get(Thread.currentThread());
if (syncFuture == null) {
syncFuture = new SyncFuture(sequence, span);
this.syncFuturesByHandler.put(Thread.currentThread(), syncFuture);
} else {
syncFuture.reset(sequence, span);
}
return syncFuture;
protected SyncFuture getSyncFuture(long sequence, Span span) {
return CollectionUtils
.computeIfAbsent(syncFuturesByHandler, Thread.currentThread(), SyncFuture::new)
.reset(sequence, span);
}
protected void requestLogRoll(boolean tooFewReplicas) {

View File

@ -35,8 +35,9 @@ import java.util.Comparator;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
@ -109,8 +110,8 @@ import org.apache.htrace.TraceScope;
* {@link #readyForRolling} to false, and then wait on {@link #readyForRolling}(see
* {@link #waitForSafePoint()}).</li>
* <li>In the consumer thread, we will stop polling entries from {@link #waitingConsumePayloads} if
* {@link #waitingRoll} is true, and also stop writing the entries in {@link #toWriteAppends}
* out.</li>
* {@link #waitingRoll} is true, and also stop writing the entries in {@link #toWriteAppends} out.
* </li>
* <li>If there are unflush data in the writer, sync them.</li>
* <li>When all out-going sync request is finished, i.e, the {@link #unackedAppends} is empty,
* signal the {@link #readyForRollingCond}.</li>
@ -179,8 +180,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
private final Deque<FSWALEntry> unackedAppends = new ArrayDeque<>();
private final PriorityQueue<SyncFuture> syncFutures =
new PriorityQueue<SyncFuture>(11, SEQ_COMPARATOR);
private final SortedSet<SyncFuture> syncFutures = new TreeSet<SyncFuture>(SEQ_COMPARATOR);
// the highest txid of WAL entries being processed
private long highestProcessedAppendTxid;
@ -188,6 +188,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
// file length when we issue last sync request on the writer
private long fileLengthAtLastSync;
private long highestProcessedAppendTxidAtLastSync;
public AsyncFSWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir,
Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
String prefix, String suffix, EventLoop eventLoop)
@ -286,7 +288,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
private void syncCompleted(AsyncWriter writer, long processedTxid, long startTimeNs) {
highestSyncedTxid.set(processedTxid);
int syncCount = finishSync(true);
for (Iterator<FSWALEntry> iter = unackedAppends.iterator(); iter.hasNext();) {
if (iter.next().getTxid() <= processedTxid) {
iter.remove();
@ -294,7 +295,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
break;
}
}
postSync(System.nanoTime() - startTimeNs, syncCount);
postSync(System.nanoTime() - startTimeNs, finishSync(true));
// Ideally, we should set a flag to indicate that the log roll has already been requested for
// the current writer and give up here, and reset the flag when roll is finished. But we
// finish roll in the log roller thread so the flag need to be set by different thread which
@ -321,14 +322,16 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
}
}
private void sync(final AsyncWriter writer, final long processedTxid) {
private void sync(AsyncWriter writer) {
fileLengthAtLastSync = writer.getLength();
long currentHighestProcessedAppendTxid = highestProcessedAppendTxid;
highestProcessedAppendTxidAtLastSync = currentHighestProcessedAppendTxid;
final long startTimeNs = System.nanoTime();
writer.sync().whenComplete((result, error) -> {
if (error != null) {
syncFailed(error);
} else {
syncCompleted(writer, processedTxid, startTimeNs);
syncCompleted(writer, currentHighestProcessedAppendTxid, startTimeNs);
}
});
}
@ -341,10 +344,11 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
private int finishSyncLowerThanTxid(long txid, boolean addSyncTrace) {
int finished = 0;
for (SyncFuture sync; (sync = syncFutures.peek()) != null;) {
for (Iterator<SyncFuture> iter = syncFutures.iterator(); iter.hasNext();) {
SyncFuture sync = iter.next();
if (sync.getTxid() <= txid) {
sync.done(txid, null);
syncFutures.remove();
iter.remove();
finished++;
if (addSyncTrace) {
addTimeAnnotation(sync, "writer synced");
@ -356,12 +360,13 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
return finished;
}
// try advancing the highestSyncedTxid as much as possible
private int finishSync(boolean addSyncTrace) {
long doneTxid = highestSyncedTxid.get();
if (doneTxid >= highestProcessedAppendTxid) {
if (unackedAppends.isEmpty()) {
// All outstanding appends have been acked.
if (toWriteAppends.isEmpty()) {
// all outstanding appends have been acked, just finish all syncs.
long maxSyncTxid = doneTxid;
// Also no appends that wait to be written out, then just finished all pending syncs.
long maxSyncTxid = highestSyncedTxid.get();
for (SyncFuture sync : syncFutures) {
maxSyncTxid = Math.max(maxSyncTxid, sync.getTxid());
sync.done(maxSyncTxid, null);
@ -379,10 +384,16 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
// highestProcessedAppendTxid and lowestUnprocessedAppendTxid can be finished.
long lowestUnprocessedAppendTxid = toWriteAppends.peek().getTxid();
assert lowestUnprocessedAppendTxid > highestProcessedAppendTxid;
highestSyncedTxid.set(lowestUnprocessedAppendTxid - 1);
return finishSyncLowerThanTxid(lowestUnprocessedAppendTxid - 1, addSyncTrace);
long doneTxid = lowestUnprocessedAppendTxid - 1;
highestSyncedTxid.set(doneTxid);
return finishSyncLowerThanTxid(doneTxid, addSyncTrace);
}
} else {
// There are still unacked appends. So let's move the highestSyncedTxid to the txid of the
// first unacked append minus 1.
long lowestUnackedAppendTxid = unackedAppends.peek().getTxid();
long doneTxid = Math.max(lowestUnackedAppendTxid - 1, highestSyncedTxid.get());
highestSyncedTxid.set(doneTxid);
return finishSyncLowerThanTxid(doneTxid, addSyncTrace);
}
}
@ -392,7 +403,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
// maybe a sync request is not queued when we issue a sync, so check here to see if we could
// finish some.
finishSync(false);
long newHighestProcessedTxid = -1L;
long newHighestProcessedAppendTxid = -1L;
for (Iterator<FSWALEntry> iter = toWriteAppends.iterator(); iter.hasNext();) {
FSWALEntry entry = iter.next();
boolean appended;
@ -415,7 +426,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
throw new AssertionError("should not happen", e);
}
}
newHighestProcessedTxid = entry.getTxid();
newHighestProcessedAppendTxid = entry.getTxid();
iter.remove();
if (appended) {
unackedAppends.addLast(entry);
@ -426,42 +437,32 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
}
// if we have a newer transaction id, update it.
// otherwise, use the previous transaction id.
if (newHighestProcessedTxid > 0) {
highestProcessedAppendTxid = newHighestProcessedTxid;
if (newHighestProcessedAppendTxid > 0) {
highestProcessedAppendTxid = newHighestProcessedAppendTxid;
} else {
newHighestProcessedTxid = highestProcessedAppendTxid;
newHighestProcessedAppendTxid = highestProcessedAppendTxid;
}
if (writer.getLength() - fileLengthAtLastSync >= batchSize) {
// sync because buffer size limit.
sync(writer, newHighestProcessedTxid);
sync(writer);
return;
}
if (writer.getLength() == fileLengthAtLastSync) {
// we haven't written anything out, just advance the highestSyncedSequence since we may only
// stamped some region sequence id.
highestSyncedTxid.set(newHighestProcessedTxid);
finishSync(false);
trySetReadyForRolling();
if (unackedAppends.isEmpty()) {
highestSyncedTxid.set(highestProcessedAppendTxid);
finishSync(false);
trySetReadyForRolling();
}
return;
}
// we have some unsynced data but haven't reached the batch size yet
if (!syncFutures.isEmpty()) {
if (!syncFutures.isEmpty()
&& syncFutures.last().getTxid() > highestProcessedAppendTxidAtLastSync) {
// we have at least one sync request
sync(writer, newHighestProcessedTxid);
return;
}
// usually waitingRoll is false so we check it without lock first.
if (waitingRoll) {
consumeLock.lock();
try {
if (waitingRoll) {
// there is a roll request
sync(writer, newHighestProcessedTxid);
}
} finally {
consumeLock.unlock();
}
sync(writer);
}
}
@ -474,7 +475,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
if (waitingRoll) {
if (writer.getLength() > fileLengthAtLastSync) {
// issue a sync
sync(writer, highestProcessedAppendTxid);
sync(writer);
} else {
if (unackedAppends.isEmpty()) {
readyForRolling = true;
@ -673,6 +674,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
this.fsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput();
}
this.fileLengthAtLastSync = 0L;
this.highestProcessedAppendTxidAtLastSync = 0L;
consumeLock.lock();
try {
consumerScheduled.set(true);

View File

@ -18,10 +18,10 @@
package org.apache.hadoop.hbase.regionserver.wal;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.htrace.Span;
/**
@ -73,13 +73,6 @@ class SyncFuture {
*/
private Span span;
SyncFuture(long txid, Span span) {
this.t = Thread.currentThread();
this.txid = txid;
this.span = span;
this.doneTxid = NOT_DONE;
}
/**
* Call this method to clear old usage and get it ready for new deploy.
* @param txid the new transaction id
@ -157,15 +150,15 @@ class SyncFuture {
throw new UnsupportedOperationException();
}
synchronized long get(long timeout) throws InterruptedException,
synchronized long get(long timeoutNs) throws InterruptedException,
ExecutionException, TimeoutIOException {
final long done = EnvironmentEdgeManager.currentTime() + timeout;
final long done = System.nanoTime() + timeoutNs;
while (!isDone()) {
wait(1000);
if (EnvironmentEdgeManager.currentTime() >= done) {
throw new TimeoutIOException("Failed to get sync result after "
+ timeout + " ms for txid=" + this.txid
+ ", WAL system stuck?");
if (System.nanoTime() >= done) {
throw new TimeoutIOException(
"Failed to get sync result after " + TimeUnit.NANOSECONDS.toMillis(timeoutNs)
+ " ms for txid=" + this.txid + ", WAL system stuck?");
}
}
if (this.throwable != null) {

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.regionserver.wal;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
@ -29,21 +28,14 @@ import org.junit.experimental.categories.Category;
@Category({ RegionServerTests.class, SmallTests.class })
public class TestSyncFuture {
@Test(timeout = 60000)
@Test(expected = TimeoutIOException.class)
public void testGet() throws Exception {
long timeout = 5000;
long txid = 100000;
SyncFuture syncFulture = new SyncFuture(txid, null);
SyncFuture syncFulture = new SyncFuture().reset(txid, null);
syncFulture.done(txid, null);
assertEquals(txid, syncFulture.get(timeout));
syncFulture.reset(txid, null);
try {
syncFulture.get(timeout);
fail("Should have timed out but not");
} catch (TimeoutIOException e) {
// test passed
}
syncFulture.reset(txid, null).get(timeout);
}
}