HBASE-19344 improve asyncWAL by using Independent thread for netty #IO in FanOutOneBlockAsyncDFSOutput

This commit is contained in:
zhangduo 2017-11-30 22:02:10 +08:00
parent 2bda22a64e
commit df3668818d
9 changed files with 168 additions and 170 deletions

View File

@ -969,11 +969,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
try (TraceScope scope = TraceUtil.createTrace(implClassName + ".append")) {
FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore);
entry.stampRegionSequenceId(we);
if (scope != null) {
ringBuffer.get(txid).load(entry, scope.getSpan());
} else {
ringBuffer.get(txid).load(entry, null);
}
ringBuffer.get(txid).load(entry);
} finally {
ringBuffer.publish(txid);
}

View File

@ -19,6 +19,10 @@ package org.apache.hadoop.hbase.regionserver.wal;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.shouldRetryCreate;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.Sequencer;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.Field;
@ -32,6 +36,9 @@ import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
@ -44,28 +51,25 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.NameNodeException;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.htrace.core.TraceScope;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoop;
import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup;
import org.apache.hadoop.hbase.shaded.io.netty.util.concurrent.SingleThreadEventExecutor;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.htrace.core.Span;
import org.apache.htrace.core.TraceScope;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.Sequencer;
/**
* An asynchronous implementation of FSWAL.
@ -81,7 +85,7 @@ import com.lmax.disruptor.Sequencer;
* </li>
* </ol>
* </li>
* <li>In the consumer task(in the EventLoop thread)
* <li>In the consumer task(executed in a single threaded thread pool)
* <ol>
* <li>Poll the entry from {@link #waitingConsumePayloads} and insert it into
* {@link #toWriteAppends}</li>
@ -117,14 +121,12 @@ import com.lmax.disruptor.Sequencer;
* signal the {@link #readyForRollingCond}.</li>
* <li>Back to the log roller thread, now we can confirm that there are no out-going entries, i.e.,
* we reach a safe point. So it is safe to replace old writer with new writer now.</li>
* <li>Set {@link #writerBroken} and {@link #waitingRoll} to false, cancel log roller exit checker
* if any(see the comments in the {@link #syncFailed(Throwable)} method to see why we need a checker
* here).</li>
* <li>Set {@link #writerBroken} and {@link #waitingRoll} to false.</li>
* <li>Schedule the consumer task.</li>
* <li>Schedule a background task to close the old writer.</li>
* </ol>
* For a broken writer roll request, the only difference is that we can bypass the wait for safe
* point stage. See the comments in the {@link #syncFailed(Throwable)} method for more details.
* point stage.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
@ -142,7 +144,13 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
public static final String ASYNC_WAL_CREATE_MAX_RETRIES = "hbase.wal.async.create.retries";
public static final int DEFAULT_ASYNC_WAL_CREATE_MAX_RETRIES = 10;
private final EventLoop eventLoop;
public static final String ASYNC_WAL_USE_SHARED_EVENT_LOOP =
"hbase.wal.async.use-shared-event-loop";
public static final boolean DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP = true;
private final EventLoopGroup eventLoopGroup;
private final ExecutorService consumeExecutor;
private final Class<? extends Channel> channelClass;
@ -153,8 +161,18 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
// check if there is already a consumer task in the event loop's task queue
private final Supplier<Boolean> hasConsumerTask;
// new writer is created and we are waiting for old writer to be closed.
private volatile boolean waitingRoll;
private static final int MAX_EPOCH = 0x3FFFFFFF;
// the lowest bit is waitingRoll, which means new writer is created and we are waiting for old
// writer to be closed.
// the second lowest bit is writerBorken which means the current writer is broken and rollWriter
// is needed.
// all other bits are the epoch number of the current writer, this is used to detect whether the
// writer is still the one when you issue the sync.
// notice that, modification to this field is only allowed under the protection of consumeLock.
private volatile int epochAndState;
// used to guard the log roll request when we exceed the log roll size.
private boolean rollRequested;
private boolean readyForRolling;
@ -166,9 +184,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
private final AtomicBoolean consumerScheduled = new AtomicBoolean(false);
// writer is broken and rollWriter is needed.
private volatile boolean writerBroken;
private final long batchSize;
private final int createMaxRetries;
@ -194,32 +209,41 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
public AsyncFSWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir,
Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
String prefix, String suffix, EventLoop eventLoop, Class<? extends Channel> channelClass)
throws FailedLogCloseException, IOException {
String prefix, String suffix, EventLoopGroup eventLoopGroup,
Class<? extends Channel> channelClass) throws FailedLogCloseException, IOException {
super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
this.eventLoop = eventLoop;
this.eventLoopGroup = eventLoopGroup;
this.channelClass = channelClass;
Supplier<Boolean> hasConsumerTask;
if (eventLoop instanceof SingleThreadEventExecutor) {
try {
Field field = SingleThreadEventExecutor.class.getDeclaredField("taskQueue");
field.setAccessible(true);
Queue<?> queue = (Queue<?>) field.get(eventLoop);
hasConsumerTask = () -> queue.peek() == consumer;
} catch (Exception e) {
LOG.warn("Can not get task queue of " + eventLoop + ", this is not necessary, just give up",
e);
if (conf.getBoolean(ASYNC_WAL_USE_SHARED_EVENT_LOOP, DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP)) {
this.consumeExecutor = eventLoopGroup.next();
if (consumeExecutor instanceof SingleThreadEventExecutor) {
try {
Field field = SingleThreadEventExecutor.class.getDeclaredField("taskQueue");
field.setAccessible(true);
Queue<?> queue = (Queue<?>) field.get(consumeExecutor);
hasConsumerTask = () -> queue.peek() == consumer;
} catch (Exception e) {
LOG.warn("Can not get task queue of " + consumeExecutor +
", this is not necessary, just give up", e);
hasConsumerTask = () -> false;
}
} else {
hasConsumerTask = () -> false;
}
} else {
hasConsumerTask = () -> false;
ThreadPoolExecutor threadPool =
new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
new ThreadFactoryBuilder().setNameFormat("AsyncFSWAL-%d").setDaemon(true).build());
hasConsumerTask = () -> threadPool.getQueue().peek() == consumer;
this.consumeExecutor = threadPool;
}
this.hasConsumerTask = hasConsumerTask;
int preallocatedEventCount =
this.conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16);
conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16);
waitingConsumePayloads =
RingBuffer.createMultiProducer(RingBufferTruck::new, preallocatedEventCount);
RingBuffer.createMultiProducer(RingBufferTruck::new, preallocatedEventCount);
waitingConsumePayloadsGatingSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
waitingConsumePayloads.addGatingSequences(waitingConsumePayloadsGatingSequence);
@ -229,23 +253,35 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
batchSize = conf.getLong(WAL_BATCH_SIZE, DEFAULT_WAL_BATCH_SIZE);
createMaxRetries =
conf.getInt(ASYNC_WAL_CREATE_MAX_RETRIES, DEFAULT_ASYNC_WAL_CREATE_MAX_RETRIES);
conf.getInt(ASYNC_WAL_CREATE_MAX_RETRIES, DEFAULT_ASYNC_WAL_CREATE_MAX_RETRIES);
rollWriter();
}
private static boolean waitingRoll(int epochAndState) {
return (epochAndState & 1) != 0;
}
private static boolean writerBroken(int epochAndState) {
return ((epochAndState >>> 1) & 1) != 0;
}
private static int epoch(int epochAndState) {
return epochAndState >>> 2;
}
// return whether we have successfully set readyForRolling to true.
private boolean trySetReadyForRolling() {
// Check without holding lock first. Usually we will just return here.
// waitingRoll is volatile and unacedEntries is only accessed inside event loop so it is safe to
// check them outside the consumeLock.
if (!waitingRoll || !unackedAppends.isEmpty()) {
if (!waitingRoll(epochAndState) || !unackedAppends.isEmpty()) {
return false;
}
consumeLock.lock();
try {
// 1. a roll is requested
// 2. all out-going entries have been acked(we have confirmed above).
if (waitingRoll) {
if (waitingRoll(epochAndState)) {
readyForRolling = true;
readyForRollingCond.signalAll();
return true;
@ -257,26 +293,25 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
}
}
private void syncFailed(Throwable error) {
private void syncFailed(long epochWhenSync, Throwable error) {
LOG.warn("sync failed", error);
// Here we depends on the implementation of FanOutOneBlockAsyncDFSOutput and netty.
// When error occur, FanOutOneBlockAsyncDFSOutput will fail all pending flush requests. It
// is execute inside EventLoop. And in DefaultPromise in netty, it will notifyListener
// directly if it is already in the EventLoop thread. And in the listener method, it will
// call us. So here we know that all failed flush request will call us continuously, and
// before the last one finish, no other task can be executed in EventLoop. So here we are
// safe to use writerBroken as a guard.
// Do not forget to revisit this if we change the implementation of
// FanOutOneBlockAsyncDFSOutput!
boolean shouldRequestLogRoll = true;
consumeLock.lock();
try {
if (writerBroken) {
int currentEpochAndState = epochAndState;
if (epoch(currentEpochAndState) != epochWhenSync || writerBroken(currentEpochAndState)) {
// this is not the previous writer which means we have already rolled the writer.
// or this is still the current writer, but we have already marked it as broken and request
// a roll.
return;
}
writerBroken = true;
if (waitingRoll) {
this.epochAndState = currentEpochAndState | 0b10;
if (waitingRoll(currentEpochAndState)) {
readyForRolling = true;
readyForRollingCond.signalAll();
// this means we have already in the middle of a rollWriter so just tell the roller thread
// that you can continue without requesting an extra log roll.
shouldRequestLogRoll = false;
}
} finally {
consumeLock.unlock();
@ -285,8 +320,10 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
toWriteAppends.addFirst(iter.next());
}
highestUnsyncedTxid = highestSyncedTxid.get();
// request a roll.
requestLogRoll();
if (shouldRequestLogRoll) {
// request a roll.
requestLogRoll();
}
}
private void syncCompleted(AsyncWriter writer, long processedTxid, long startTimeNs) {
@ -299,30 +336,16 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
}
}
postSync(System.nanoTime() - startTimeNs, finishSync(true));
// Ideally, we should set a flag to indicate that the log roll has already been requested for
// the current writer and give up here, and reset the flag when roll is finished. But we
// finish roll in the log roller thread so the flag need to be set by different thread which
// typically means we need to use a lock to protect it and do fencing. As the log roller will
// aggregate the roll requests of the same WAL, so it is safe to call requestLogRoll multiple
// times before the roll actual happens. But we need to stop if we set readyForRolling to true
// and wake up the log roller thread waiting in waitForSafePoint as the rollWriter call may
// return firstly and then we run the code below and request a roll on the new writer.
if (trySetReadyForRolling()) {
// we have just finished a roll, then do not need to check for log rolling, the writer will be
// closed soon.
return;
}
if (writer.getLength() < logrollsize) {
if (writer.getLength() < logrollsize || rollRequested) {
return;
}
if (!rollWriterLock.tryLock()) {
return;
}
try {
requestLogRoll();
} finally {
rollWriterLock.unlock();
}
rollRequested = true;
requestLogRoll();
}
private void sync(AsyncWriter writer) {
@ -330,19 +353,20 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
long currentHighestProcessedAppendTxid = highestProcessedAppendTxid;
highestProcessedAppendTxidAtLastSync = currentHighestProcessedAppendTxid;
final long startTimeNs = System.nanoTime();
writer.sync().whenComplete((result, error) -> {
final long epoch = epochAndState >>> 2;
writer.sync().whenCompleteAsync((result, error) -> {
if (error != null) {
syncFailed(error);
syncFailed(epoch, error);
} else {
syncCompleted(writer, currentHighestProcessedAppendTxid, startTimeNs);
}
});
}, consumeExecutor);
}
private void addTimeAnnotation(SyncFuture future, String annotation) {
TraceUtil.addTimelineAnnotation(annotation);
//TODO handle htrace API change, see HBASE-18895
//future.setSpan(scope.getSpan());
// TODO handle htrace API change, see HBASE-18895
// future.setSpan(scope.getSpan());
}
private int finishSyncLowerThanTxid(long txid, boolean addSyncTrace) {
@ -410,26 +434,10 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
for (Iterator<FSWALEntry> iter = toWriteAppends.iterator(); iter.hasNext();) {
FSWALEntry entry = iter.next();
boolean appended;
Span span = entry.detachSpan();
// the span maybe null if this is a retry after rolling.
if (span != null) {
//TODO handle htrace API change, see HBASE-18895
//TraceScope scope = Trace.continueSpan(span);
try {
appended = append(writer, entry);
} catch (IOException e) {
throw new AssertionError("should not happen", e);
} finally {
//TODO handle htrace API change, see HBASE-18895
//assert scope == NullScope.INSTANCE || !scope.isDetached();
//scope.close(); // append scope is complete
}
} else {
try {
appended = append(writer, entry);
} catch (IOException e) {
throw new AssertionError("should not happen", e);
}
try {
appended = append(writer, entry);
} catch (IOException e) {
throw new AssertionError("should not happen", e);
}
newHighestProcessedAppendTxid = entry.getTxid();
iter.remove();
@ -472,10 +480,11 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
private void consume() {
consumeLock.lock();
try {
if (writerBroken) {
int currentEpochAndState = epochAndState;
if (writerBroken(currentEpochAndState)) {
return;
}
if (waitingRoll) {
if (waitingRoll(currentEpochAndState)) {
if (writer.getLength() > fileLengthAtLastSync) {
// issue a sync
sync(writer);
@ -491,8 +500,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
consumeLock.unlock();
}
long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1;
for (long cursorBound =
waitingConsumePayloads.getCursor(); nextCursor <= cursorBound; nextCursor++) {
for (long cursorBound = waitingConsumePayloads.getCursor(); nextCursor <= cursorBound;
nextCursor++) {
if (!waitingConsumePayloads.isPublished(nextCursor)) {
break;
}
@ -540,11 +549,12 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
}
}
// reschedule if we still have something to write.
eventLoop.execute(consumer);
consumeExecutor.execute(consumer);
}
private boolean shouldScheduleConsumer() {
if (writerBroken || waitingRoll) {
int currentEpochAndState = epochAndState;
if (writerBroken(currentEpochAndState) || waitingRoll(currentEpochAndState)) {
return false;
}
return consumerScheduled.compareAndSet(false, true);
@ -554,16 +564,16 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
public long append(RegionInfo hri, WALKey key, WALEdit edits, boolean inMemstore)
throws IOException {
long txid =
stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, waitingConsumePayloads);
stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, waitingConsumePayloads);
if (shouldScheduleConsumer()) {
eventLoop.execute(consumer);
consumeExecutor.execute(consumer);
}
return txid;
}
@Override
public void sync() throws IOException {
try (TraceScope scope = TraceUtil.createTrace("AsyncFSWAL.sync")){
try (TraceScope scope = TraceUtil.createTrace("AsyncFSWAL.sync")) {
long txid = waitingConsumePayloads.next();
SyncFuture future;
try {
@ -574,7 +584,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
waitingConsumePayloads.publish(txid);
}
if (shouldScheduleConsumer()) {
eventLoop.execute(consumer);
consumeExecutor.execute(consumer);
}
blockOnSync(future);
}
@ -597,7 +607,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
waitingConsumePayloads.publish(sequence);
}
if (shouldScheduleConsumer()) {
eventLoop.execute(consumer);
consumeExecutor.execute(consumer);
}
blockOnSync(future);
}
@ -608,7 +618,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
boolean overwrite = false;
for (int retry = 0;; retry++) {
try {
return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, overwrite, eventLoop,
return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, overwrite, eventLoopGroup,
channelClass);
} catch (RemoteException e) {
LOG.warn("create wal log writer " + path + " failed, retry = " + retry, e);
@ -643,20 +653,21 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
}
}
}
throw new IOException("Failed to create wal log writer " + path + " after retrying "
+ createMaxRetries + " time(s)");
throw new IOException("Failed to create wal log writer " + path + " after retrying " +
createMaxRetries + " time(s)");
}
private void waitForSafePoint() {
consumeLock.lock();
try {
if (writerBroken || this.writer == null) {
int currentEpochAndState = epochAndState;
if (writerBroken(currentEpochAndState) || this.writer == null) {
return;
}
consumerScheduled.set(true);
waitingRoll = true;
epochAndState = currentEpochAndState | 1;
readyForRolling = false;
eventLoop.execute(consumer);
consumeExecutor.execute(consumer);
while (!readyForRolling) {
readyForRollingCond.awaitUninterruptibly();
}
@ -674,13 +685,17 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
if (nextWriter != null && nextWriter instanceof AsyncProtobufLogWriter) {
this.fsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput();
}
this.fileLengthAtLastSync = 0L;
this.fileLengthAtLastSync = nextWriter.getLength();
this.rollRequested = false;
this.highestProcessedAppendTxidAtLastSync = 0L;
consumeLock.lock();
try {
consumerScheduled.set(true);
writerBroken = waitingRoll = false;
eventLoop.execute(consumer);
int currentEpoch = epochAndState >>> 2;
int nextEpoch = currentEpoch == MAX_EPOCH ? 0 : currentEpoch + 1;
// set a new epoch and also clear waitingRoll and writerBroken
this.epochAndState = nextEpoch << 2;
consumeExecutor.execute(consumer);
} finally {
consumeLock.unlock();
}
@ -710,6 +725,9 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
closeExecutor.shutdown();
IOException error = new IOException("WAL has been closed");
syncFutures.forEach(f -> f.done(f.getTxid(), error));
if (!(consumeExecutor instanceof EventLoop)) {
consumeExecutor.shutdown();
}
}
@Override

View File

@ -17,11 +17,6 @@
*/
package org.apache.hadoop.hbase.regionserver.wal;
import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables;
import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoop;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
@ -35,15 +30,19 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.io.ByteBufferWriter;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables;
import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
/**
* AsyncWriter for protobuf-based WAL.
@ -54,7 +53,7 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
private static final Log LOG = LogFactory.getLog(AsyncProtobufLogWriter.class);
private final EventLoop eventLoop;
private final EventLoopGroup eventLoopGroup;
private final Class<? extends Channel> channelClass;
@ -103,8 +102,9 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
private OutputStream asyncOutputWrapper;
public AsyncProtobufLogWriter(EventLoop eventLoop, Class<? extends Channel> channelClass) {
this.eventLoop = eventLoop;
public AsyncProtobufLogWriter(EventLoopGroup eventLoopGroup,
Class<? extends Channel> channelClass) {
this.eventLoopGroup = eventLoopGroup;
this.channelClass = channelClass;
}
@ -156,13 +156,13 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
short replication, long blockSize) throws IOException, StreamLacksCapabilityException {
this.output = AsyncFSOutputHelper.createOutput(fs, path, overwritable, false, replication,
blockSize, eventLoop, channelClass);
blockSize, eventLoopGroup, channelClass);
this.asyncOutputWrapper = new OutputStreamWrapper(output);
}
private long write(Consumer<CompletableFuture<Long>> action) throws IOException {
CompletableFuture<Long> future = new CompletableFuture<>();
eventLoop.execute(() -> action.accept(future));
action.accept(future);
try {
return future.get().longValue();
} catch (InterruptedException e) {

View File

@ -36,7 +36,6 @@ import org.apache.hadoop.hbase.util.CollectionUtils;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.htrace.core.Span;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
@ -58,9 +57,6 @@ class FSWALEntry extends Entry {
private final transient RegionInfo regionInfo;
private final transient Set<byte[]> familyNames;
// The tracing span for this entry when writing WAL.
private transient Span span;
FSWALEntry(final long txid, final WALKey key, final WALEdit edit,
final RegionInfo regionInfo, final boolean inMemstore) {
super(key, edit);
@ -130,14 +126,4 @@ class FSWALEntry extends Entry {
Set<byte[]> getFamilyNames() {
return familyNames;
}
void attachSpan(Span span) {
this.span = span;
}
Span detachSpan() {
Span span = this.span;
this.span = null;
return span;
}
}

View File

@ -18,7 +18,6 @@
*/
package org.apache.hadoop.hbase.regionserver.wal;
import org.apache.htrace.core.Span;
import org.apache.yetus.audience.InterfaceAudience;
/**
@ -43,10 +42,9 @@ final class RingBufferTruck {
private FSWALEntry entry;
/**
* Load the truck with a {@link FSWALEntry} and associated {@link Span}.
* Load the truck with a {@link FSWALEntry}.
*/
void load(FSWALEntry entry, Span span) {
entry.attachSpan(span);
void load(FSWALEntry entry) {
this.entry = entry;
this.type = Type.APPEND;
}

View File

@ -21,20 +21,21 @@ import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.io.crypto.Encryptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoop;
import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class SecureAsyncProtobufLogWriter extends AsyncProtobufLogWriter {
private Encryptor encryptor = null;
public SecureAsyncProtobufLogWriter(EventLoop eventLoop, Class<? extends Channel> channelClass) {
super(eventLoop, channelClass);
public SecureAsyncProtobufLogWriter(EventLoopGroup eventLoopGroup,
Class<? extends Channel> channelClass) {
super(eventLoopGroup, channelClass);
}
@Override

View File

@ -34,7 +34,6 @@ import org.apache.yetus.audience.InterfaceStability;
import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables;
import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoop;
import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup;
import org.apache.hadoop.hbase.shaded.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.nio.NioSocketChannel;
@ -69,7 +68,7 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> {
getWALDirectoryName(factory.factoryId),
getWALArchiveDirectoryName(conf, factory.factoryId), conf, listeners, true, logPrefix,
META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null,
eventLoopGroup.next(), channelClass);
eventLoopGroup, channelClass);
}
@Override
@ -90,23 +89,23 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> {
* public because of AsyncFSWAL. Should be package-private
*/
public static AsyncWriter createAsyncWriter(Configuration conf, FileSystem fs, Path path,
boolean overwritable, EventLoop eventLoop, Class<? extends Channel> channelClass)
boolean overwritable, EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass)
throws IOException {
// Configuration already does caching for the Class lookup.
Class<? extends AsyncWriter> logWriterClass = conf.getClass(
"hbase.regionserver.hlog.async.writer.impl", AsyncProtobufLogWriter.class, AsyncWriter.class);
try {
AsyncWriter writer = logWriterClass.getConstructor(EventLoop.class, Class.class)
.newInstance(eventLoop, channelClass);
AsyncWriter writer = logWriterClass.getConstructor(EventLoopGroup.class, Class.class)
.newInstance(eventLoopGroup, channelClass);
writer.init(fs, path, conf, overwritable);
return writer;
} catch (Exception e) {
if (e instanceof CommonFSUtils.StreamLacksCapabilityException) {
LOG.error("The RegionServer async write ahead log provider " +
"relies on the ability to call " + e.getMessage() + " for proper operation during " +
"component failures, but the current FileSystem does not support doing so. Please " +
"check the config value of '" + CommonFSUtils.HBASE_WAL_DIR + "' and ensure " +
"it points to a FileSystem mount that has suitable capabilities for output streams.");
"relies on the ability to call " + e.getMessage() + " for proper operation during " +
"component failures, but the current FileSystem does not support doing so. Please " +
"check the config value of '" + CommonFSUtils.HBASE_WAL_DIR + "' and ensure " +
"it points to a FileSystem mount that has suitable capabilities for output streams.");
} else {
LOG.debug("Error instantiating log writer.", e);
}

View File

@ -63,7 +63,7 @@ public class TestAsyncFSWAL extends AbstractTestFSWAL {
Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
String prefix, String suffix) throws IOException {
return new AsyncFSWAL(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix,
suffix, GROUP.next(), CHANNEL_CLASS);
suffix, GROUP, CHANNEL_CLASS);
}
@Override
@ -72,7 +72,7 @@ public class TestAsyncFSWAL extends AbstractTestFSWAL {
boolean failIfWALExists, String prefix, String suffix, final Runnable action)
throws IOException {
return new AsyncFSWAL(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix,
suffix, GROUP.next(), CHANNEL_CLASS) {
suffix, GROUP, CHANNEL_CLASS) {
@Override
void atHeadOfRingBufferEventHandlerAppend() {

View File

@ -62,6 +62,6 @@ public class TestAsyncWALReplay extends AbstractTestWALReplay {
@Override
protected WAL createWAL(Configuration c, Path hbaseRootDir, String logName) throws IOException {
return new AsyncFSWAL(FileSystem.get(c), hbaseRootDir, logName,
HConstants.HREGION_OLDLOGDIR_NAME, c, null, true, null, null, GROUP.next(), CHANNEL_CLASS);
HConstants.HREGION_OLDLOGDIR_NAME, c, null, true, null, null, GROUP, CHANNEL_CLASS);
}
}