HBASE-19344 improve asyncWAL by using Independent thread for netty #IO in FanOutOneBlockAsyncDFSOutput

This commit is contained in:
zhangduo 2017-11-30 22:02:10 +08:00
parent cc3f804b07
commit 49a9fe4883
9 changed files with 168 additions and 170 deletions

View File

@ -969,11 +969,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
try (TraceScope scope = TraceUtil.createTrace(implClassName + ".append")) { try (TraceScope scope = TraceUtil.createTrace(implClassName + ".append")) {
FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore); FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore);
entry.stampRegionSequenceId(we); entry.stampRegionSequenceId(we);
if (scope != null) { ringBuffer.get(txid).load(entry);
ringBuffer.get(txid).load(entry, scope.getSpan());
} else {
ringBuffer.get(txid).load(entry, null);
}
} finally { } finally {
ringBuffer.publish(txid); ringBuffer.publish(txid);
} }

View File

@ -19,6 +19,10 @@ package org.apache.hadoop.hbase.regionserver.wal;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.shouldRetryCreate; import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.shouldRetryCreate;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.Sequencer;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.lang.reflect.Field; import java.lang.reflect.Field;
@ -32,6 +36,9 @@ import java.util.SortedSet;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
@ -44,28 +51,25 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.ConnectionUtils; import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput; import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.NameNodeException; import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.NameNodeException;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider; import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter; import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.htrace.core.TraceScope;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel; import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoop; import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoop;
import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup;
import org.apache.hadoop.hbase.shaded.io.netty.util.concurrent.SingleThreadEventExecutor; import org.apache.hadoop.hbase.shaded.io.netty.util.concurrent.SingleThreadEventExecutor;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.htrace.core.Span;
import org.apache.htrace.core.TraceScope;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.Sequencer;
/** /**
* An asynchronous implementation of FSWAL. * An asynchronous implementation of FSWAL.
@ -81,7 +85,7 @@ import com.lmax.disruptor.Sequencer;
* </li> * </li>
* </ol> * </ol>
* </li> * </li>
* <li>In the consumer task(in the EventLoop thread) * <li>In the consumer task(executed in a single threaded thread pool)
* <ol> * <ol>
* <li>Poll the entry from {@link #waitingConsumePayloads} and insert it into * <li>Poll the entry from {@link #waitingConsumePayloads} and insert it into
* {@link #toWriteAppends}</li> * {@link #toWriteAppends}</li>
@ -117,14 +121,12 @@ import com.lmax.disruptor.Sequencer;
* signal the {@link #readyForRollingCond}.</li> * signal the {@link #readyForRollingCond}.</li>
* <li>Back to the log roller thread, now we can confirm that there are no out-going entries, i.e., * <li>Back to the log roller thread, now we can confirm that there are no out-going entries, i.e.,
* we reach a safe point. So it is safe to replace old writer with new writer now.</li> * we reach a safe point. So it is safe to replace old writer with new writer now.</li>
* <li>Set {@link #writerBroken} and {@link #waitingRoll} to false, cancel log roller exit checker * <li>Set {@link #writerBroken} and {@link #waitingRoll} to false.</li>
* if any(see the comments in the {@link #syncFailed(Throwable)} method to see why we need a checker
* here).</li>
* <li>Schedule the consumer task.</li> * <li>Schedule the consumer task.</li>
* <li>Schedule a background task to close the old writer.</li> * <li>Schedule a background task to close the old writer.</li>
* </ol> * </ol>
* For a broken writer roll request, the only difference is that we can bypass the wait for safe * For a broken writer roll request, the only difference is that we can bypass the wait for safe
* point stage. See the comments in the {@link #syncFailed(Throwable)} method for more details. * point stage.
*/ */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> { public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
@ -142,7 +144,13 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
public static final String ASYNC_WAL_CREATE_MAX_RETRIES = "hbase.wal.async.create.retries"; public static final String ASYNC_WAL_CREATE_MAX_RETRIES = "hbase.wal.async.create.retries";
public static final int DEFAULT_ASYNC_WAL_CREATE_MAX_RETRIES = 10; public static final int DEFAULT_ASYNC_WAL_CREATE_MAX_RETRIES = 10;
private final EventLoop eventLoop; public static final String ASYNC_WAL_USE_SHARED_EVENT_LOOP =
"hbase.wal.async.use-shared-event-loop";
public static final boolean DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP = true;
private final EventLoopGroup eventLoopGroup;
private final ExecutorService consumeExecutor;
private final Class<? extends Channel> channelClass; private final Class<? extends Channel> channelClass;
@ -153,8 +161,18 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
// check if there is already a consumer task in the event loop's task queue // check if there is already a consumer task in the event loop's task queue
private final Supplier<Boolean> hasConsumerTask; private final Supplier<Boolean> hasConsumerTask;
// new writer is created and we are waiting for old writer to be closed. private static final int MAX_EPOCH = 0x3FFFFFFF;
private volatile boolean waitingRoll; // the lowest bit is waitingRoll, which means new writer is created and we are waiting for old
// writer to be closed.
// the second lowest bit is writerBorken which means the current writer is broken and rollWriter
// is needed.
// all other bits are the epoch number of the current writer, this is used to detect whether the
// writer is still the one when you issue the sync.
// notice that, modification to this field is only allowed under the protection of consumeLock.
private volatile int epochAndState;
// used to guard the log roll request when we exceed the log roll size.
private boolean rollRequested;
private boolean readyForRolling; private boolean readyForRolling;
@ -166,9 +184,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
private final AtomicBoolean consumerScheduled = new AtomicBoolean(false); private final AtomicBoolean consumerScheduled = new AtomicBoolean(false);
// writer is broken and rollWriter is needed.
private volatile boolean writerBroken;
private final long batchSize; private final long batchSize;
private final int createMaxRetries; private final int createMaxRetries;
@ -194,30 +209,39 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
public AsyncFSWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir, public AsyncFSWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir,
Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists, Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
String prefix, String suffix, EventLoop eventLoop, Class<? extends Channel> channelClass) String prefix, String suffix, EventLoopGroup eventLoopGroup,
throws FailedLogCloseException, IOException { Class<? extends Channel> channelClass) throws FailedLogCloseException, IOException {
super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix); super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
this.eventLoop = eventLoop; this.eventLoopGroup = eventLoopGroup;
this.channelClass = channelClass; this.channelClass = channelClass;
Supplier<Boolean> hasConsumerTask; Supplier<Boolean> hasConsumerTask;
if (eventLoop instanceof SingleThreadEventExecutor) { if (conf.getBoolean(ASYNC_WAL_USE_SHARED_EVENT_LOOP, DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP)) {
this.consumeExecutor = eventLoopGroup.next();
if (consumeExecutor instanceof SingleThreadEventExecutor) {
try { try {
Field field = SingleThreadEventExecutor.class.getDeclaredField("taskQueue"); Field field = SingleThreadEventExecutor.class.getDeclaredField("taskQueue");
field.setAccessible(true); field.setAccessible(true);
Queue<?> queue = (Queue<?>) field.get(eventLoop); Queue<?> queue = (Queue<?>) field.get(consumeExecutor);
hasConsumerTask = () -> queue.peek() == consumer; hasConsumerTask = () -> queue.peek() == consumer;
} catch (Exception e) { } catch (Exception e) {
LOG.warn("Can not get task queue of " + eventLoop + ", this is not necessary, just give up", LOG.warn("Can not get task queue of " + consumeExecutor +
e); ", this is not necessary, just give up", e);
hasConsumerTask = () -> false; hasConsumerTask = () -> false;
} }
} else { } else {
hasConsumerTask = () -> false; hasConsumerTask = () -> false;
} }
} else {
ThreadPoolExecutor threadPool =
new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
new ThreadFactoryBuilder().setNameFormat("AsyncFSWAL-%d").setDaemon(true).build());
hasConsumerTask = () -> threadPool.getQueue().peek() == consumer;
this.consumeExecutor = threadPool;
}
this.hasConsumerTask = hasConsumerTask; this.hasConsumerTask = hasConsumerTask;
int preallocatedEventCount = int preallocatedEventCount =
this.conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16); conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16);
waitingConsumePayloads = waitingConsumePayloads =
RingBuffer.createMultiProducer(RingBufferTruck::new, preallocatedEventCount); RingBuffer.createMultiProducer(RingBufferTruck::new, preallocatedEventCount);
waitingConsumePayloadsGatingSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); waitingConsumePayloadsGatingSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
@ -233,19 +257,31 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
rollWriter(); rollWriter();
} }
private static boolean waitingRoll(int epochAndState) {
return (epochAndState & 1) != 0;
}
private static boolean writerBroken(int epochAndState) {
return ((epochAndState >>> 1) & 1) != 0;
}
private static int epoch(int epochAndState) {
return epochAndState >>> 2;
}
// return whether we have successfully set readyForRolling to true. // return whether we have successfully set readyForRolling to true.
private boolean trySetReadyForRolling() { private boolean trySetReadyForRolling() {
// Check without holding lock first. Usually we will just return here. // Check without holding lock first. Usually we will just return here.
// waitingRoll is volatile and unacedEntries is only accessed inside event loop so it is safe to // waitingRoll is volatile and unacedEntries is only accessed inside event loop so it is safe to
// check them outside the consumeLock. // check them outside the consumeLock.
if (!waitingRoll || !unackedAppends.isEmpty()) { if (!waitingRoll(epochAndState) || !unackedAppends.isEmpty()) {
return false; return false;
} }
consumeLock.lock(); consumeLock.lock();
try { try {
// 1. a roll is requested // 1. a roll is requested
// 2. all out-going entries have been acked(we have confirmed above). // 2. all out-going entries have been acked(we have confirmed above).
if (waitingRoll) { if (waitingRoll(epochAndState)) {
readyForRolling = true; readyForRolling = true;
readyForRollingCond.signalAll(); readyForRollingCond.signalAll();
return true; return true;
@ -257,26 +293,25 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
} }
} }
private void syncFailed(Throwable error) { private void syncFailed(long epochWhenSync, Throwable error) {
LOG.warn("sync failed", error); LOG.warn("sync failed", error);
// Here we depends on the implementation of FanOutOneBlockAsyncDFSOutput and netty. boolean shouldRequestLogRoll = true;
// When error occur, FanOutOneBlockAsyncDFSOutput will fail all pending flush requests. It
// is execute inside EventLoop. And in DefaultPromise in netty, it will notifyListener
// directly if it is already in the EventLoop thread. And in the listener method, it will
// call us. So here we know that all failed flush request will call us continuously, and
// before the last one finish, no other task can be executed in EventLoop. So here we are
// safe to use writerBroken as a guard.
// Do not forget to revisit this if we change the implementation of
// FanOutOneBlockAsyncDFSOutput!
consumeLock.lock(); consumeLock.lock();
try { try {
if (writerBroken) { int currentEpochAndState = epochAndState;
if (epoch(currentEpochAndState) != epochWhenSync || writerBroken(currentEpochAndState)) {
// this is not the previous writer which means we have already rolled the writer.
// or this is still the current writer, but we have already marked it as broken and request
// a roll.
return; return;
} }
writerBroken = true; this.epochAndState = currentEpochAndState | 0b10;
if (waitingRoll) { if (waitingRoll(currentEpochAndState)) {
readyForRolling = true; readyForRolling = true;
readyForRollingCond.signalAll(); readyForRollingCond.signalAll();
// this means we have already in the middle of a rollWriter so just tell the roller thread
// that you can continue without requesting an extra log roll.
shouldRequestLogRoll = false;
} }
} finally { } finally {
consumeLock.unlock(); consumeLock.unlock();
@ -285,9 +320,11 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
toWriteAppends.addFirst(iter.next()); toWriteAppends.addFirst(iter.next());
} }
highestUnsyncedTxid = highestSyncedTxid.get(); highestUnsyncedTxid = highestSyncedTxid.get();
if (shouldRequestLogRoll) {
// request a roll. // request a roll.
requestLogRoll(); requestLogRoll();
} }
}
private void syncCompleted(AsyncWriter writer, long processedTxid, long startTimeNs) { private void syncCompleted(AsyncWriter writer, long processedTxid, long startTimeNs) {
highestSyncedTxid.set(processedTxid); highestSyncedTxid.set(processedTxid);
@ -299,30 +336,16 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
} }
} }
postSync(System.nanoTime() - startTimeNs, finishSync(true)); postSync(System.nanoTime() - startTimeNs, finishSync(true));
// Ideally, we should set a flag to indicate that the log roll has already been requested for
// the current writer and give up here, and reset the flag when roll is finished. But we
// finish roll in the log roller thread so the flag need to be set by different thread which
// typically means we need to use a lock to protect it and do fencing. As the log roller will
// aggregate the roll requests of the same WAL, so it is safe to call requestLogRoll multiple
// times before the roll actual happens. But we need to stop if we set readyForRolling to true
// and wake up the log roller thread waiting in waitForSafePoint as the rollWriter call may
// return firstly and then we run the code below and request a roll on the new writer.
if (trySetReadyForRolling()) { if (trySetReadyForRolling()) {
// we have just finished a roll, then do not need to check for log rolling, the writer will be // we have just finished a roll, then do not need to check for log rolling, the writer will be
// closed soon. // closed soon.
return; return;
} }
if (writer.getLength() < logrollsize) { if (writer.getLength() < logrollsize || rollRequested) {
return; return;
} }
if (!rollWriterLock.tryLock()) { rollRequested = true;
return;
}
try {
requestLogRoll(); requestLogRoll();
} finally {
rollWriterLock.unlock();
}
} }
private void sync(AsyncWriter writer) { private void sync(AsyncWriter writer) {
@ -330,13 +353,14 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
long currentHighestProcessedAppendTxid = highestProcessedAppendTxid; long currentHighestProcessedAppendTxid = highestProcessedAppendTxid;
highestProcessedAppendTxidAtLastSync = currentHighestProcessedAppendTxid; highestProcessedAppendTxidAtLastSync = currentHighestProcessedAppendTxid;
final long startTimeNs = System.nanoTime(); final long startTimeNs = System.nanoTime();
writer.sync().whenComplete((result, error) -> { final long epoch = epochAndState >>> 2;
writer.sync().whenCompleteAsync((result, error) -> {
if (error != null) { if (error != null) {
syncFailed(error); syncFailed(epoch, error);
} else { } else {
syncCompleted(writer, currentHighestProcessedAppendTxid, startTimeNs); syncCompleted(writer, currentHighestProcessedAppendTxid, startTimeNs);
} }
}); }, consumeExecutor);
} }
private void addTimeAnnotation(SyncFuture future, String annotation) { private void addTimeAnnotation(SyncFuture future, String annotation) {
@ -410,26 +434,10 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
for (Iterator<FSWALEntry> iter = toWriteAppends.iterator(); iter.hasNext();) { for (Iterator<FSWALEntry> iter = toWriteAppends.iterator(); iter.hasNext();) {
FSWALEntry entry = iter.next(); FSWALEntry entry = iter.next();
boolean appended; boolean appended;
Span span = entry.detachSpan();
// the span maybe null if this is a retry after rolling.
if (span != null) {
//TODO handle htrace API change, see HBASE-18895
//TraceScope scope = Trace.continueSpan(span);
try { try {
appended = append(writer, entry); appended = append(writer, entry);
} catch (IOException e) { } catch (IOException e) {
throw new AssertionError("should not happen", e); throw new AssertionError("should not happen", e);
} finally {
//TODO handle htrace API change, see HBASE-18895
//assert scope == NullScope.INSTANCE || !scope.isDetached();
//scope.close(); // append scope is complete
}
} else {
try {
appended = append(writer, entry);
} catch (IOException e) {
throw new AssertionError("should not happen", e);
}
} }
newHighestProcessedAppendTxid = entry.getTxid(); newHighestProcessedAppendTxid = entry.getTxid();
iter.remove(); iter.remove();
@ -472,10 +480,11 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
private void consume() { private void consume() {
consumeLock.lock(); consumeLock.lock();
try { try {
if (writerBroken) { int currentEpochAndState = epochAndState;
if (writerBroken(currentEpochAndState)) {
return; return;
} }
if (waitingRoll) { if (waitingRoll(currentEpochAndState)) {
if (writer.getLength() > fileLengthAtLastSync) { if (writer.getLength() > fileLengthAtLastSync) {
// issue a sync // issue a sync
sync(writer); sync(writer);
@ -491,8 +500,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
consumeLock.unlock(); consumeLock.unlock();
} }
long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1; long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1;
for (long cursorBound = for (long cursorBound = waitingConsumePayloads.getCursor(); nextCursor <= cursorBound;
waitingConsumePayloads.getCursor(); nextCursor <= cursorBound; nextCursor++) { nextCursor++) {
if (!waitingConsumePayloads.isPublished(nextCursor)) { if (!waitingConsumePayloads.isPublished(nextCursor)) {
break; break;
} }
@ -540,11 +549,12 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
} }
} }
// reschedule if we still have something to write. // reschedule if we still have something to write.
eventLoop.execute(consumer); consumeExecutor.execute(consumer);
} }
private boolean shouldScheduleConsumer() { private boolean shouldScheduleConsumer() {
if (writerBroken || waitingRoll) { int currentEpochAndState = epochAndState;
if (writerBroken(currentEpochAndState) || waitingRoll(currentEpochAndState)) {
return false; return false;
} }
return consumerScheduled.compareAndSet(false, true); return consumerScheduled.compareAndSet(false, true);
@ -556,7 +566,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
long txid = long txid =
stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, waitingConsumePayloads); stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, waitingConsumePayloads);
if (shouldScheduleConsumer()) { if (shouldScheduleConsumer()) {
eventLoop.execute(consumer); consumeExecutor.execute(consumer);
} }
return txid; return txid;
} }
@ -574,7 +584,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
waitingConsumePayloads.publish(txid); waitingConsumePayloads.publish(txid);
} }
if (shouldScheduleConsumer()) { if (shouldScheduleConsumer()) {
eventLoop.execute(consumer); consumeExecutor.execute(consumer);
} }
blockOnSync(future); blockOnSync(future);
} }
@ -597,7 +607,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
waitingConsumePayloads.publish(sequence); waitingConsumePayloads.publish(sequence);
} }
if (shouldScheduleConsumer()) { if (shouldScheduleConsumer()) {
eventLoop.execute(consumer); consumeExecutor.execute(consumer);
} }
blockOnSync(future); blockOnSync(future);
} }
@ -608,7 +618,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
boolean overwrite = false; boolean overwrite = false;
for (int retry = 0;; retry++) { for (int retry = 0;; retry++) {
try { try {
return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, overwrite, eventLoop, return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, overwrite, eventLoopGroup,
channelClass); channelClass);
} catch (RemoteException e) { } catch (RemoteException e) {
LOG.warn("create wal log writer " + path + " failed, retry = " + retry, e); LOG.warn("create wal log writer " + path + " failed, retry = " + retry, e);
@ -643,20 +653,21 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
} }
} }
} }
throw new IOException("Failed to create wal log writer " + path + " after retrying " throw new IOException("Failed to create wal log writer " + path + " after retrying " +
+ createMaxRetries + " time(s)"); createMaxRetries + " time(s)");
} }
private void waitForSafePoint() { private void waitForSafePoint() {
consumeLock.lock(); consumeLock.lock();
try { try {
if (writerBroken || this.writer == null) { int currentEpochAndState = epochAndState;
if (writerBroken(currentEpochAndState) || this.writer == null) {
return; return;
} }
consumerScheduled.set(true); consumerScheduled.set(true);
waitingRoll = true; epochAndState = currentEpochAndState | 1;
readyForRolling = false; readyForRolling = false;
eventLoop.execute(consumer); consumeExecutor.execute(consumer);
while (!readyForRolling) { while (!readyForRolling) {
readyForRollingCond.awaitUninterruptibly(); readyForRollingCond.awaitUninterruptibly();
} }
@ -674,13 +685,17 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
if (nextWriter != null && nextWriter instanceof AsyncProtobufLogWriter) { if (nextWriter != null && nextWriter instanceof AsyncProtobufLogWriter) {
this.fsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput(); this.fsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput();
} }
this.fileLengthAtLastSync = 0L; this.fileLengthAtLastSync = nextWriter.getLength();
this.rollRequested = false;
this.highestProcessedAppendTxidAtLastSync = 0L; this.highestProcessedAppendTxidAtLastSync = 0L;
consumeLock.lock(); consumeLock.lock();
try { try {
consumerScheduled.set(true); consumerScheduled.set(true);
writerBroken = waitingRoll = false; int currentEpoch = epochAndState >>> 2;
eventLoop.execute(consumer); int nextEpoch = currentEpoch == MAX_EPOCH ? 0 : currentEpoch + 1;
// set a new epoch and also clear waitingRoll and writerBroken
this.epochAndState = nextEpoch << 2;
consumeExecutor.execute(consumer);
} finally { } finally {
consumeLock.unlock(); consumeLock.unlock();
} }
@ -710,6 +725,9 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
closeExecutor.shutdown(); closeExecutor.shutdown();
IOException error = new IOException("WAL has been closed"); IOException error = new IOException("WAL has been closed");
syncFutures.forEach(f -> f.done(f.getTxid(), error)); syncFutures.forEach(f -> f.done(f.getTxid(), error));
if (!(consumeExecutor instanceof EventLoop)) {
consumeExecutor.shutdown();
}
} }
@Override @Override

View File

@ -17,11 +17,6 @@
*/ */
package org.apache.hadoop.hbase.regionserver.wal; package org.apache.hadoop.hbase.regionserver.wal;
import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables;
import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoop;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.io.OutputStream; import java.io.OutputStream;
@ -35,15 +30,19 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.io.ByteBufferWriter; import org.apache.hadoop.hbase.io.ByteBufferWriter;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput; import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper; import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider; import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables;
import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
/** /**
* AsyncWriter for protobuf-based WAL. * AsyncWriter for protobuf-based WAL.
@ -54,7 +53,7 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
private static final Log LOG = LogFactory.getLog(AsyncProtobufLogWriter.class); private static final Log LOG = LogFactory.getLog(AsyncProtobufLogWriter.class);
private final EventLoop eventLoop; private final EventLoopGroup eventLoopGroup;
private final Class<? extends Channel> channelClass; private final Class<? extends Channel> channelClass;
@ -103,8 +102,9 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
private OutputStream asyncOutputWrapper; private OutputStream asyncOutputWrapper;
public AsyncProtobufLogWriter(EventLoop eventLoop, Class<? extends Channel> channelClass) { public AsyncProtobufLogWriter(EventLoopGroup eventLoopGroup,
this.eventLoop = eventLoop; Class<? extends Channel> channelClass) {
this.eventLoopGroup = eventLoopGroup;
this.channelClass = channelClass; this.channelClass = channelClass;
} }
@ -156,13 +156,13 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize, protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
short replication, long blockSize) throws IOException, StreamLacksCapabilityException { short replication, long blockSize) throws IOException, StreamLacksCapabilityException {
this.output = AsyncFSOutputHelper.createOutput(fs, path, overwritable, false, replication, this.output = AsyncFSOutputHelper.createOutput(fs, path, overwritable, false, replication,
blockSize, eventLoop, channelClass); blockSize, eventLoopGroup, channelClass);
this.asyncOutputWrapper = new OutputStreamWrapper(output); this.asyncOutputWrapper = new OutputStreamWrapper(output);
} }
private long write(Consumer<CompletableFuture<Long>> action) throws IOException { private long write(Consumer<CompletableFuture<Long>> action) throws IOException {
CompletableFuture<Long> future = new CompletableFuture<>(); CompletableFuture<Long> future = new CompletableFuture<>();
eventLoop.execute(() -> action.accept(future)); action.accept(future);
try { try {
return future.get().longValue(); return future.get().longValue();
} catch (InterruptedException e) { } catch (InterruptedException e) {

View File

@ -36,7 +36,6 @@ import org.apache.hadoop.hbase.util.CollectionUtils;
import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.htrace.core.Span;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
@ -58,9 +57,6 @@ class FSWALEntry extends Entry {
private final transient RegionInfo regionInfo; private final transient RegionInfo regionInfo;
private final transient Set<byte[]> familyNames; private final transient Set<byte[]> familyNames;
// The tracing span for this entry when writing WAL.
private transient Span span;
FSWALEntry(final long txid, final WALKey key, final WALEdit edit, FSWALEntry(final long txid, final WALKey key, final WALEdit edit,
final RegionInfo regionInfo, final boolean inMemstore) { final RegionInfo regionInfo, final boolean inMemstore) {
super(key, edit); super(key, edit);
@ -130,14 +126,4 @@ class FSWALEntry extends Entry {
Set<byte[]> getFamilyNames() { Set<byte[]> getFamilyNames() {
return familyNames; return familyNames;
} }
void attachSpan(Span span) {
this.span = span;
}
Span detachSpan() {
Span span = this.span;
this.span = null;
return span;
}
} }

View File

@ -18,7 +18,6 @@
*/ */
package org.apache.hadoop.hbase.regionserver.wal; package org.apache.hadoop.hbase.regionserver.wal;
import org.apache.htrace.core.Span;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
/** /**
@ -43,10 +42,9 @@ final class RingBufferTruck {
private FSWALEntry entry; private FSWALEntry entry;
/** /**
* Load the truck with a {@link FSWALEntry} and associated {@link Span}. * Load the truck with a {@link FSWALEntry}.
*/ */
void load(FSWALEntry entry, Span span) { void load(FSWALEntry entry) {
entry.attachSpan(span);
this.entry = entry; this.entry = entry;
this.type = Type.APPEND; this.type = Type.APPEND;
} }

View File

@ -21,20 +21,21 @@ import java.io.IOException;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.io.crypto.Encryptor; import org.apache.hadoop.hbase.io.crypto.Encryptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel; import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoop; import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class SecureAsyncProtobufLogWriter extends AsyncProtobufLogWriter { public class SecureAsyncProtobufLogWriter extends AsyncProtobufLogWriter {
private Encryptor encryptor = null; private Encryptor encryptor = null;
public SecureAsyncProtobufLogWriter(EventLoop eventLoop, Class<? extends Channel> channelClass) { public SecureAsyncProtobufLogWriter(EventLoopGroup eventLoopGroup,
super(eventLoop, channelClass); Class<? extends Channel> channelClass) {
super(eventLoopGroup, channelClass);
} }
@Override @Override

View File

@ -34,7 +34,6 @@ import org.apache.yetus.audience.InterfaceStability;
import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables; import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables;
import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel; import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoop;
import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup; import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup;
import org.apache.hadoop.hbase.shaded.io.netty.channel.nio.NioEventLoopGroup; import org.apache.hadoop.hbase.shaded.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.nio.NioSocketChannel; import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.nio.NioSocketChannel;
@ -69,7 +68,7 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> {
getWALDirectoryName(factory.factoryId), getWALDirectoryName(factory.factoryId),
getWALArchiveDirectoryName(conf, factory.factoryId), conf, listeners, true, logPrefix, getWALArchiveDirectoryName(conf, factory.factoryId), conf, listeners, true, logPrefix,
META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null, META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null,
eventLoopGroup.next(), channelClass); eventLoopGroup, channelClass);
} }
@Override @Override
@ -90,14 +89,14 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> {
* public because of AsyncFSWAL. Should be package-private * public because of AsyncFSWAL. Should be package-private
*/ */
public static AsyncWriter createAsyncWriter(Configuration conf, FileSystem fs, Path path, public static AsyncWriter createAsyncWriter(Configuration conf, FileSystem fs, Path path,
boolean overwritable, EventLoop eventLoop, Class<? extends Channel> channelClass) boolean overwritable, EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass)
throws IOException { throws IOException {
// Configuration already does caching for the Class lookup. // Configuration already does caching for the Class lookup.
Class<? extends AsyncWriter> logWriterClass = conf.getClass( Class<? extends AsyncWriter> logWriterClass = conf.getClass(
"hbase.regionserver.hlog.async.writer.impl", AsyncProtobufLogWriter.class, AsyncWriter.class); "hbase.regionserver.hlog.async.writer.impl", AsyncProtobufLogWriter.class, AsyncWriter.class);
try { try {
AsyncWriter writer = logWriterClass.getConstructor(EventLoop.class, Class.class) AsyncWriter writer = logWriterClass.getConstructor(EventLoopGroup.class, Class.class)
.newInstance(eventLoop, channelClass); .newInstance(eventLoopGroup, channelClass);
writer.init(fs, path, conf, overwritable); writer.init(fs, path, conf, overwritable);
return writer; return writer;
} catch (Exception e) { } catch (Exception e) {

View File

@ -63,7 +63,7 @@ public class TestAsyncFSWAL extends AbstractTestFSWAL {
Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists, Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
String prefix, String suffix) throws IOException { String prefix, String suffix) throws IOException {
return new AsyncFSWAL(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, return new AsyncFSWAL(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix,
suffix, GROUP.next(), CHANNEL_CLASS); suffix, GROUP, CHANNEL_CLASS);
} }
@Override @Override
@ -72,7 +72,7 @@ public class TestAsyncFSWAL extends AbstractTestFSWAL {
boolean failIfWALExists, String prefix, String suffix, final Runnable action) boolean failIfWALExists, String prefix, String suffix, final Runnable action)
throws IOException { throws IOException {
return new AsyncFSWAL(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, return new AsyncFSWAL(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix,
suffix, GROUP.next(), CHANNEL_CLASS) { suffix, GROUP, CHANNEL_CLASS) {
@Override @Override
void atHeadOfRingBufferEventHandlerAppend() { void atHeadOfRingBufferEventHandlerAppend() {

View File

@ -62,6 +62,6 @@ public class TestAsyncWALReplay extends AbstractTestWALReplay {
@Override @Override
protected WAL createWAL(Configuration c, Path hbaseRootDir, String logName) throws IOException { protected WAL createWAL(Configuration c, Path hbaseRootDir, String logName) throws IOException {
return new AsyncFSWAL(FileSystem.get(c), hbaseRootDir, logName, return new AsyncFSWAL(FileSystem.get(c), hbaseRootDir, logName,
HConstants.HREGION_OLDLOGDIR_NAME, c, null, true, null, null, GROUP.next(), CHANNEL_CLASS); HConstants.HREGION_OLDLOGDIR_NAME, c, null, true, null, null, GROUP, CHANNEL_CLASS);
} }
} }