diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index 0ef0cf7cec3..bd3766d99b1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.regionserver.wal; +import static com.google.common.base.Preconditions.*; import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER; import java.io.IOException; @@ -150,8 +151,7 @@ public abstract class AbstractFSWAL implements WAL { protected final Configuration conf; /** Listeners that are called on WAL events. */ - protected final List listeners = - new CopyOnWriteArrayList(); + protected final List listeners = new CopyOnWriteArrayList(); /** * Class that does accounting of sequenceids in WAL subsystem. Holds oldest outstanding sequence @@ -231,7 +231,8 @@ public abstract class AbstractFSWAL implements WAL { private static final class WalProps { /** - * Map the encoded region name to the highest sequence id. Contain all the regions it has entries of + * Map the encoded region name to the highest sequence id. Contain all the regions it has + * entries of */ public final Map encodedName2HighestSequenceId; @@ -275,17 +276,14 @@ public abstract class AbstractFSWAL implements WAL { * @return timestamp, as in the log file name. */ protected long getFileNumFromFileName(Path fileName) { - if (fileName == null) { - throw new IllegalArgumentException("file name can't be null"); - } + checkNotNull(fileName, "file name can't be null"); if (!ourFiles.accept(fileName)) { - throw new IllegalArgumentException("The log file " + fileName - + " doesn't belong to this WAL. (" + toString() + ")"); + throw new IllegalArgumentException( + "The log file " + fileName + " doesn't belong to this WAL. 
(" + toString() + ")"); } final String fileNameString = fileName.toString(); - String chompedPath = - fileNameString.substring(prefixPathStr.length(), - (fileNameString.length() - walFileSuffix.length())); + String chompedPath = fileNameString.substring(prefixPathStr.length(), + (fileNameString.length() - walFileSuffix.length())); return Long.parseLong(chompedPath); } @@ -294,6 +292,27 @@ public abstract class AbstractFSWAL implements WAL { return Math.round(mu.getMax() * memstoreSizeRatio * 2 / logRollSize); } + // Returns the configured event count rounded up to a power of 2 (capped at 1 << 30). + protected final int getPreallocatedEventCount() { + // Preallocate objects to use on the ring buffer. The way that appends and syncs work, we will + // be stuck and make no progress if the buffer is filled with appends only and there is no + // sync. If no sync, then the handlers will be outstanding just waiting on sync completion + // before they return. + int preallocatedEventCount = this.conf.getInt("hbase.regionserver.wal.disruptor.event.count", + 1024 * 16); + checkArgument(preallocatedEventCount > 0, + "hbase.regionserver.wal.disruptor.event.count must > 0"); + int floor = Integer.highestOneBit(preallocatedEventCount); + if (floor == preallocatedEventCount) { + return floor; + } + // max capacity is 1 << 30 + if (floor >= 1 << 29) { + return 1 << 30; + } + return floor << 1; + } + protected AbstractFSWAL(final FileSystem fs, final Path rootDir, final String logDir, final String archiveDir, final Configuration conf, final List listeners, final boolean failIfWALExists, final String prefix, final String suffix) @@ -314,8 +333,8 @@ public abstract class AbstractFSWAL implements WAL { } // If prefix is null||empty then just name it wal - this.walFilePrefix = - prefix == null || prefix.isEmpty() ? "wal" : URLEncoder.encode(prefix, "UTF8"); + this.walFilePrefix = prefix == null || prefix.isEmpty() ? "wal" + : URLEncoder.encode(prefix, "UTF8"); // we only correctly differentiate suffices when numeric ones start with '.'
if (suffix != null && !(suffix.isEmpty()) && !(suffix.startsWith(WAL_FILE_NAME_DELIMITER))) { throw new IllegalArgumentException("WAL suffix must start with '" + WAL_FILE_NAME_DELIMITER @@ -338,8 +357,8 @@ public abstract class AbstractFSWAL implements WAL { } if (walFileSuffix.isEmpty()) { // in the case of the null suffix, we need to ensure the filename ends with a timestamp. - return org.apache.commons.lang.StringUtils.isNumeric(fileNameString - .substring(prefixPathStr.length())); + return org.apache.commons.lang.StringUtils + .isNumeric(fileNameString.substring(prefixPathStr.length())); } else if (!fileNameString.endsWith(walFileSuffix)) { return false; } @@ -364,22 +383,19 @@ public abstract class AbstractFSWAL implements WAL { // Get size to roll log at. Roll at 95% of HDFS block size so we avoid crossing HDFS blocks // (it costs a little x'ing bocks) - final long blocksize = - this.conf.getLong("hbase.regionserver.hlog.blocksize", - FSUtils.getDefaultBlockSize(this.fs, this.walDir)); - this.logrollsize = - (long) (blocksize * conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f)); + final long blocksize = this.conf.getLong("hbase.regionserver.hlog.blocksize", + FSUtils.getDefaultBlockSize(this.fs, this.walDir)); + this.logrollsize = (long) (blocksize + * conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f)); - float memstoreRatio = - conf.getFloat(HeapMemorySizeUtil.MEMSTORE_SIZE_KEY, conf.getFloat( - HeapMemorySizeUtil.MEMSTORE_SIZE_OLD_KEY, HeapMemorySizeUtil.DEFAULT_MEMSTORE_SIZE)); + float memstoreRatio = conf.getFloat(HeapMemorySizeUtil.MEMSTORE_SIZE_KEY, conf.getFloat( + HeapMemorySizeUtil.MEMSTORE_SIZE_OLD_KEY, HeapMemorySizeUtil.DEFAULT_MEMSTORE_SIZE)); boolean maxLogsDefined = conf.get("hbase.regionserver.maxlogs") != null; if (maxLogsDefined) { LOG.warn("'hbase.regionserver.maxlogs' was deprecated."); } - this.maxLogs = - conf.getInt("hbase.regionserver.maxlogs", - Math.max(32, calculateMaxLogFiles(memstoreRatio, logrollsize))); + 
this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", + Math.max(32, calculateMaxLogFiles(memstoreRatio, logrollsize))); LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) + ", rollsize=" + StringUtils.byteDesc(this.logrollsize) + ", prefix=" + this.walFilePrefix + ", suffix=" @@ -646,7 +662,7 @@ public abstract class AbstractFSWAL implements WAL { TraceScope scope = Trace.startSpan("FSHFile.replaceWriter"); try { long oldFileLen = doReplaceWriter(oldPath, newPath, nextWriter); - int oldNumEntries = this.numEntries.get(); + int oldNumEntries = this.numEntries.getAndSet(0); final String newPathString = (null == newPath ? null : FSUtils.getPath(newPath)); if (oldPath != null) { this.walFile2Props.put(oldPath, @@ -811,20 +827,19 @@ public abstract class AbstractFSWAL implements WAL { } /** - * updates the sequence number of a specific store. - * depending on the flag: replaces current seq number if the given seq id is bigger, - * or even if it is lower than existing one - * @param encodedRegionName + * updates the sequence number of a specific store. 
depending on the flag: replaces current seq + * number if the given seq id is bigger, or even if it is lower than existing one + * @param encodedRegionName * @param familyName * @param sequenceid * @param onlyIfGreater */ - @Override public void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequenceid, + @Override + public void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequenceid, boolean onlyIfGreater) { - sequenceIdAccounting.updateStore(encodedRegionName,familyName,sequenceid,onlyIfGreater); + sequenceIdAccounting.updateStore(encodedRegionName, familyName, sequenceid, onlyIfGreater); } - protected SyncFuture getSyncFuture(final long sequence, Span span) { SyncFuture syncFuture = this.syncFuturesByHandler.get(Thread.currentThread()); if (syncFuture == null) { @@ -918,9 +933,8 @@ public abstract class AbstractFSWAL implements WAL { protected void postSync(final long timeInNanos, final int handlerSyncs) { if (timeInNanos > this.slowSyncNs) { - String msg = - new StringBuilder().append("Slow sync cost: ").append(timeInNanos / 1000000) - .append(" ms, current pipeline: ").append(Arrays.toString(getPipeline())).toString(); + String msg = new StringBuilder().append("Slow sync cost: ").append(timeInNanos / 1000000) + .append(" ms, current pipeline: ").append(Arrays.toString(getPipeline())).toString(); Trace.addTimelineAnnotation(msg); LOG.info(msg); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java index 279a6aeb418..d842f1b16b7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java @@ -17,27 +17,35 @@ */ package org.apache.hadoop.hbase.regionserver.wal; -import static org.apache.hadoop.hbase.HConstants.REGION_SERVER_HANDLER_COUNT; import static 
org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.shouldRetryCreate; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.lmax.disruptor.RingBuffer; +import com.lmax.disruptor.Sequence; +import com.lmax.disruptor.Sequencer; import io.netty.channel.EventLoop; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.Promise; import io.netty.util.concurrent.ScheduledFuture; +import io.netty.util.concurrent.SingleThreadEventExecutor; import java.io.IOException; import java.io.InterruptedIOException; +import java.lang.reflect.Field; import java.util.ArrayDeque; import java.util.Comparator; import java.util.Deque; import java.util.Iterator; import java.util.List; import java.util.PriorityQueue; +import java.util.Queue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Supplier; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -56,82 +64,79 @@ import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.ipc.RemoteException; import org.apache.htrace.NullScope; +import org.apache.htrace.Span; import org.apache.htrace.Trace; import org.apache.htrace.TraceScope; /** * An asynchronous implementation of FSWAL. *

- * Here 'waitingConsumePayloads' acts as the RingBuffer in FSHLog. We do not use RingBuffer here - * because RingBuffer need an exclusive thread to consume the entries in it, and here we want to run - * the append and sync operation inside EventLoop. We can not use EventLoop as the RingBuffer's - * executor otherwise the EventLoop can not process any other events such as socket read and write. + * Here 'waitingConsumePayloads' acts as the RingBuffer in FSHLog. *

* For append, we process it as follow: *

    *
  1. In the caller thread(typically, in the rpc handler thread): *
      - *
    1. Lock 'waitingConsumePayloads', bump nextTxid, and insert the entry to - * 'waitingConsumePayloads'.
    2. + *
    3. Insert the entry into 'waitingConsumePayloads'. Use ringbuffer sequence as txid.
    4. *
    5. Schedule the consumer task if needed. See {@link #shouldScheduleConsumer()} for more details. *
    6. *
    *
  2. *
  3. In the consumer task(in the EventLoop thread) *
      - *
    1. Poll the entry from 'waitingConsumePayloads' and insert it into 'waitingAppendEntries'
    2. - *
    3. Poll the entry from 'waitingAppendEntries', append it to the AsyncWriter, and insert it into - * 'unackedEntries'
    4. + *
    5. Poll the entry from {@link #waitingConsumePayloads} and insert it into + * {@link #toWriteAppends}
    6. + *
    7. Poll the entry from {@link #toWriteAppends}, append it to the AsyncWriter, and insert it into + * {@link #unackedAppends}
    8. *
    9. If the buffered size reaches {@link #batchSize}, or there is a sync request, then we call * sync on the AsyncWriter.
    10. - *
    11. In the callback methods(CompletionHandler): + *
    12. In the callback methods: *
        - *
      • If succeeded, poll the entry from 'unackedEntries' and drop it.
      • - *
      • If failed, add all the entries in 'unackedEntries' back to 'waitingAppendEntries' and wait - * for writing them again.
      • + *
      • If succeeded, poll the entry from {@link #unackedAppends} and drop it.
      • + *
      • If failed, add all the entries in {@link #unackedAppends} back to {@link #toWriteAppends} and + * wait for writing them again.
      • *
      *
    13. *
    *
  4. *
- * For sync, the processing stages are almost same except that if it is not assigned with a new - * 'txid', we just assign the previous 'txid' to it without bumping the 'nextTxid'. And different - * from FSHLog, we will open a new writer and rewrite unacked entries to the new writer and sync - * again if we hit a sync error. + * For sync, the processing stages are almost the same. Unlike FSHLog, we will open a new + * writer and rewrite unacked entries to the new writer and sync again if we hit a sync error. *

* Here we only describe the logic of doReplaceWriter. The main logic of rollWriter is same with * FSHLog.
* For a normal roll request(for example, we have reached the log roll size): *

    - *
  1. In the log roller thread, we add a roll payload to 'waitingConsumePayloads', and then wait on - * the rollPromise(see {@link #waitForSafePoint()}).
  2. - *
  3. In the consumer thread, we will stop polling entries from 'waitingConsumePayloads' if we hit - * a Payload which contains a roll request.
  4. - *
  5. Append all entries to current writer, issue a sync request if possible.
  6. - *
  7. If sync succeeded, check if we could finish a roll request. There 3 conditions: - *
      - *
    • 'rollPromise' is not null which means we have a pending roll request.
    • - *
    • 'waitingAppendEntries' is empty.
    • - *
    • 'unackedEntries' is empty.
    • - *
    - *
  8. + *
  9. In the log roller thread, we will set {@link #waitingRoll} to true and + * {@link #readyForRolling} to false, and then wait on {@link #readyForRolling}(see + * {@link #waitForSafePoint()}).
  10. + *
  11. In the consumer thread, we will stop polling entries from {@link #waitingConsumePayloads} if + * {@link #waitingRoll} is true, and also stop writing the entries in {@link #toWriteAppends} + * out.
  12. + *
  13. If there is unflushed data in the writer, sync it.
  14. + *
  15. When all out-going sync requests are finished, i.e., the {@link #unackedAppends} is empty, + * signal the {@link #readyForRollingCond}.
  16. *
  17. Back to the log roller thread, now we can confirm that there are no out-going entries, i.e., * we reach a safe point. So it is safe to replace old writer with new writer now.
  18. - *
  19. Acquire 'waitingConsumePayloads' lock, set 'writerBroken' and 'waitingRoll' to false, cancel - * log roller exit checker if any(see the comments in the 'failed' method of the sync - * CompletionHandler to see why we need a checker here).
  20. - *
  21. Schedule the consumer task if needed.
  22. + *
  23. Set {@link #writerBroken} and {@link #waitingRoll} to false, cancel log roller exit checker + * if any(see the comments in the {@link #syncFailed(Throwable)} method to see why we need a checker + * here).
  24. + *
  25. Schedule the consumer task.
  26. *
  27. Schedule a background task to close the old writer.
  28. *
* For a broken writer roll request, the only difference is that we can bypass the wait for safe - * point stage. See the comments in the 'failed' method of the sync CompletionHandler for more - * details. + * point stage. See the comments in the {@link #syncFailed(Throwable)} method for more details. */ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) public class AsyncFSWAL extends AbstractFSWAL { private static final Log LOG = LogFactory.getLog(AsyncFSWAL.class); + private static final Comparator SEQ_COMPARATOR = (o1, o2) -> { + int c = Long.compare(o1.getTxid(), o2.getTxid()); + return c != 0 ? c : Integer.compare(System.identityHashCode(o1), System.identityHashCode(o2)); + }; + public static final String WAL_BATCH_SIZE = "hbase.wal.batch.size"; public static final long DEFAULT_WAL_BATCH_SIZE = 64L * 1024; @@ -142,62 +147,30 @@ public class AsyncFSWAL extends AbstractFSWAL { "hbase.wal.async.logroller.exited.check.interval.ms"; public static final long DEFAULT_ASYNC_WAL_LOG_ROLLER_EXITED_CHECK_INTERVAL_MS = 1000; - /** - * Carry things that we want to pass to the consume task in event loop. Only one field can be - * non-null. - *

- * TODO: need to unify this and {@link RingBufferTruck}. There are mostly the same thing. - */ - private static final class Payload { - - // a wal entry which need to be appended - public final FSWALEntry entry; - - // indicate that we need to sync our wal writer. - public final SyncFuture sync; - - // incidate that we want to roll the writer. - public final Promise roll; - - public Payload(FSWALEntry entry) { - this.entry = entry; - this.sync = null; - this.roll = null; - } - - public Payload(SyncFuture sync) { - this.entry = null; - this.sync = sync; - this.roll = null; - } - - public Payload(Promise roll) { - this.entry = null; - this.sync = null; - this.roll = roll; - } - - @Override - public String toString() { - return "Payload [entry=" + entry + ", sync=" + sync + ", roll=" + roll + "]"; - } - } - private final EventLoop eventLoop; - private final Deque waitingConsumePayloads; + private final Lock consumeLock = new ReentrantLock(); - // like the ringbuffer sequence. Every FSWALEntry and SyncFuture will be assigned a txid and - // then added to waitingConsumePayloads. - private long nextTxid = 1L; + private final Runnable consumer = this::consume; - private boolean consumerScheduled; + // check if there is already a consumer task in the event loop's task queue + private final Supplier hasConsumerTask; // new writer is created and we are waiting for old writer to be closed. - private boolean waitingRoll; + private volatile boolean waitingRoll; + + private boolean readyForRolling; + + private final Condition readyForRollingCond = consumeLock.newCondition(); + + private final RingBuffer waitingConsumePayloads; + + private final Sequence waitingConsumePayloadsGatingSequence; + + private final AtomicBoolean consumerScheduled = new AtomicBoolean(false); // writer is broken and rollWriter is needed. 
- private boolean writerBroken; + private volatile boolean writerBroken; private final long batchSize; @@ -210,17 +183,15 @@ public class AsyncFSWAL extends AbstractFSWAL { private volatile AsyncFSOutput fsOut; - private final Deque waitingAppendEntries = new ArrayDeque(); + private final Deque toWriteAppends = new ArrayDeque<>(); - private final Deque unackedEntries = new ArrayDeque(); + private final Deque unackedAppends = new ArrayDeque<>(); private final PriorityQueue syncFutures = new PriorityQueue(11, SEQ_COMPARATOR); - private Promise rollPromise; - // the highest txid of WAL entries being processed - private long highestProcessedTxid; + private long highestProcessedAppendTxid; // file length when we issue last sync request on the writer private long fileLengthAtLastSync; @@ -237,6 +208,49 @@ public class AsyncFSWAL extends AbstractFSWAL { this.future = future; } + // See the comments in syncFailed why we need to do this. + private void cleanup() { + unackedAppends.clear(); + toWriteAppends.forEach(entry -> { + try { + entry.stampRegionSequenceId(); + } catch (IOException e) { + throw new AssertionError("should not happen", e); + } + }); + toWriteAppends.clear(); + IOException error = new IOException("sync failed but log roller exited"); + for (SyncFuture sync; (sync = syncFutures.peek()) != null;) { + sync.done(sync.getTxid(), error); + syncFutures.remove(); + } + long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1; + for (long cursorBound = + waitingConsumePayloads.getCursor(); nextCursor <= cursorBound; nextCursor++) { + if (!waitingConsumePayloads.isPublished(nextCursor)) { + break; + } + RingBufferTruck truck = waitingConsumePayloads.get(nextCursor); + switch (truck.type()) { + case APPEND: + try { + truck.unloadAppend().stampRegionSequenceId(); + } catch (IOException e) { + throw new AssertionError("should not happen", e); + } + break; + case SYNC: + SyncFuture sync = truck.unloadSync(); + sync.done(sync.getTxid(), error); + break; + 
default: + LOG.warn("RingBufferTruck with unexpected type: " + truck.type()); + break; + } + waitingConsumePayloadsGatingSequence.set(nextCursor); + } + } + @Override public void run() { if (!logRollerExited) { @@ -250,27 +264,7 @@ public class AsyncFSWAL extends AbstractFSWAL { return; } } - unackedEntries.clear(); - waitingAppendEntries.clear(); - IOException error = new IOException("sync failed but log roller exited"); - for (SyncFuture future; (future = syncFutures.peek()) != null;) { - future.done(highestProcessedTxid, error); - syncFutures.remove(); - } - synchronized (waitingConsumePayloads) { - for (Payload p : waitingConsumePayloads) { - if (p.entry != null) { - try { - p.entry.stampRegionSequenceId(); - } catch (IOException e) { - throw new AssertionError("should not happen", e); - } - } else if (p.sync != null) { - p.sync.done(nextTxid, error); - } - } - waitingConsumePayloads.clear(); - } + cleanup(); } public synchronized void cancel() { @@ -287,8 +281,34 @@ public class AsyncFSWAL extends AbstractFSWAL { throws FailedLogCloseException, IOException { super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix); this.eventLoop = eventLoop; - int maxHandlersCount = conf.getInt(REGION_SERVER_HANDLER_COUNT, 200); - waitingConsumePayloads = new ArrayDeque(maxHandlersCount * 3); + Supplier hasConsumerTask; + if (eventLoop instanceof SingleThreadEventExecutor) { + + try { + Field field = SingleThreadEventExecutor.class.getDeclaredField("taskQueue"); + field.setAccessible(true); + Queue queue = (Queue) field.get(eventLoop); + hasConsumerTask = () -> queue.peek() == consumer; + } catch (Exception e) { + LOG.warn("Can not get task queue of " + eventLoop + ", this is not necessary, just give up", + e); + hasConsumerTask = () -> false; + } + } else { + hasConsumerTask = () -> false; + } + this.hasConsumerTask = hasConsumerTask; + // RingBuffer only accepts a power-of-2 size; getPreallocatedEventCount() rounds the config up. + int preallocatedEventCount = getPreallocatedEventCount();
+ waitingConsumePayloads = + RingBuffer.createMultiProducer(RingBufferTruck::new, preallocatedEventCount); + waitingConsumePayloadsGatingSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); + waitingConsumePayloads.addGatingSequences(waitingConsumePayloadsGatingSequence); + + // inrease the ringbuffer sequence so our txid is start from 1 + waitingConsumePayloads.publish(waitingConsumePayloads.next()); + waitingConsumePayloadsGatingSequence.set(waitingConsumePayloads.getCursor()); + batchSize = conf.getLong(WAL_BATCH_SIZE, DEFAULT_WAL_BATCH_SIZE); createMaxRetries = conf.getInt(ASYNC_WAL_CREATE_MAX_RETRIES, DEFAULT_ASYNC_WAL_CREATE_MAX_RETRIES); @@ -297,13 +317,27 @@ public class AsyncFSWAL extends AbstractFSWAL { rollWriter(); } - private void tryFinishRoll() { - // 1. a roll is requested - // 2. we have written out all entries before the roll point. - // 3. all entries have been acked. - if (rollPromise != null && waitingAppendEntries.isEmpty() && unackedEntries.isEmpty()) { - rollPromise.trySuccess(null); - rollPromise = null; + // return whether we have successfully set readyForRolling to true. + private boolean trySetReadyForRolling() { + // Check without holding lock first. Usually we will just return here. + // waitingRoll is volatile and unacedEntries is only accessed inside event loop so it is safe to + // check them outside the consumeLock. + if (!waitingRoll || !unackedAppends.isEmpty()) { + return false; + } + consumeLock.lock(); + try { + // 1. a roll is requested + // 2. all out-going entries have been acked(we have confirmed above). + if (waitingRoll) { + readyForRolling = true; + readyForRollingCond.signalAll(); + return true; + } else { + return false; + } + } finally { + consumeLock.unlock(); } } @@ -318,7 +352,8 @@ public class AsyncFSWAL extends AbstractFSWAL { // safe to use writerBroken as a guard. // Do not forget to revisit this if we change the implementation of // FanOutOneBlockAsyncDFSOutput! 
- synchronized (waitingConsumePayloads) { + consumeLock.lock(); + try { if (writerBroken) { return; } @@ -331,31 +366,25 @@ public class AsyncFSWAL extends AbstractFSWAL { logRollerExitedChecker.setFuture(eventLoop.scheduleAtFixedRate(logRollerExitedChecker, logRollerExitedCheckIntervalMs, logRollerExitedCheckIntervalMs, TimeUnit.MILLISECONDS)); writerBroken = true; + if (waitingRoll) { + readyForRolling = true; + readyForRollingCond.signalAll(); + } + } finally { + consumeLock.unlock(); } - for (Iterator iter = unackedEntries.descendingIterator(); iter.hasNext();) { - waitingAppendEntries.addFirst(iter.next()); + for (Iterator iter = unackedAppends.descendingIterator(); iter.hasNext();) { + toWriteAppends.addFirst(iter.next()); } highestUnsyncedTxid = highestSyncedTxid.get(); - if (rollPromise != null) { - rollPromise.trySuccess(null); - rollPromise = null; - return; - } // request a roll. - if (!rollWriterLock.tryLock()) { - return; - } - try { - requestLogRoll(); - } finally { - rollWriterLock.unlock(); - } + requestLogRoll(); } private void syncCompleted(AsyncWriter writer, long processedTxid, long startTimeNs) { highestSyncedTxid.set(processedTxid); int syncCount = finishSync(true); - for (Iterator iter = unackedEntries.iterator(); iter.hasNext();) { + for (Iterator iter = unackedAppends.iterator(); iter.hasNext();) { if (iter.next().getTxid() <= processedTxid) { iter.remove(); } else { @@ -363,14 +392,27 @@ public class AsyncFSWAL extends AbstractFSWAL { } } postSync(System.nanoTime() - startTimeNs, syncCount); - tryFinishRoll(); + // Ideally, we should set a flag to indicate that the log roll has already been requested for + // the current writer and give up here, and reset the flag when roll is finished. But we + // finish roll in the log roller thread so the flag need to be set by different thread which + // typically means we need to use a lock to protect it and do fencing. 
As the log roller will + // aggregate the roll requests of the same WAL, so it is safe to call requestLogRoll multiple + // times before the roll actual happens. But we need to stop if we set readyForRolling to true + // and wake up the log roller thread waiting in waitForSafePoint as the rollWriter call may + // return firstly and then we run the code below and request a roll on the new writer. + if (trySetReadyForRolling()) { + // we have just finished a roll, then do not need to check for log rolling, the writer will be + // closed soon. + return; + } + if (writer.getLength() < logrollsize) { + return; + } if (!rollWriterLock.tryLock()) { return; } try { - if (writer.getLength() >= logrollsize) { - requestLogRoll(); - } + requestLogRoll(); } finally { rollWriterLock.unlock(); } @@ -394,15 +436,16 @@ public class AsyncFSWAL extends AbstractFSWAL { future.setSpan(scope.detach()); } - private int finishSync(boolean addSyncTrace) { - long doneTxid = highestSyncedTxid.get(); + private int finishSyncLowerThanTxid(long txid, boolean addSyncTrace) { int finished = 0; - for (SyncFuture future; (future = syncFutures.peek()) != null;) { - if (future.getTxid() <= doneTxid) { - future.done(doneTxid, null); + for (SyncFuture sync; (sync = syncFutures.peek()) != null;) { + if (sync.getTxid() <= txid) { + sync.done(txid, null); syncFutures.remove(); finished++; - addTimeAnnotation(future, "writer synced"); + if (addSyncTrace) { + addTimeAnnotation(sync, "writer synced"); + } } else { break; } @@ -410,24 +453,69 @@ public class AsyncFSWAL extends AbstractFSWAL { return finished; } - private void consume() { + private int finishSync(boolean addSyncTrace) { + long doneTxid = highestSyncedTxid.get(); + if (doneTxid >= highestProcessedAppendTxid) { + if (toWriteAppends.isEmpty()) { + // all outstanding appends have been acked, just finish all syncs. 
+ long maxSyncTxid = doneTxid; + for (SyncFuture sync : syncFutures) { + maxSyncTxid = Math.max(maxSyncTxid, sync.getTxid()); + sync.done(maxSyncTxid, null); + if (addSyncTrace) { + addTimeAnnotation(sync, "writer synced"); + } + } + highestSyncedTxid.set(maxSyncTxid); + int finished = syncFutures.size(); + syncFutures.clear(); + return finished; + } else { + // There is no append between highestProcessedAppendTxid and lowestUnprocessedAppendTxid, so + // if highestSyncedTxid >= highestProcessedAppendTxid, then all syncs whose txid are between + // highestProcessedAppendTxid and lowestUnprocessedAppendTxid can be finished. + long lowestUnprocessedAppendTxid = toWriteAppends.peek().getTxid(); + assert lowestUnprocessedAppendTxid > highestProcessedAppendTxid; + highestSyncedTxid.set(lowestUnprocessedAppendTxid - 1); + return finishSyncLowerThanTxid(lowestUnprocessedAppendTxid - 1, addSyncTrace); + } + } else { + return finishSyncLowerThanTxid(doneTxid, addSyncTrace); + } + } + + private void appendAndSync() { final AsyncWriter writer = this.writer; // maybe a sync request is not queued when we issue a sync, so check here to see if we could // finish some. finishSync(false); long newHighestProcessedTxid = -1L; - for (Iterator iter = waitingAppendEntries.iterator(); iter.hasNext();) { + for (Iterator iter = toWriteAppends.iterator(); iter.hasNext();) { FSWALEntry entry = iter.next(); boolean appended; - try { - appended = append(writer, entry); - } catch (IOException e) { - throw new AssertionError("should not happen", e); + Span span = entry.detachSpan(); + // the span maybe null if this is a retry after rolling. 
+ if (span != null) { + TraceScope scope = Trace.continueSpan(span); + try { + appended = append(writer, entry); + } catch (IOException e) { + throw new AssertionError("should not happen", e); + } finally { + assert scope == NullScope.INSTANCE || !scope.isDetached(); + scope.close(); // append scope is complete + } + } else { + try { + appended = append(writer, entry); + } catch (IOException e) { + throw new AssertionError("should not happen", e); + } } newHighestProcessedTxid = entry.getTxid(); iter.remove(); if (appended) { - unackedEntries.addLast(entry); + unackedAppends.addLast(entry); if (writer.getLength() - fileLengthAtLastSync >= batchSize) { break; } @@ -436,110 +524,134 @@ public class AsyncFSWAL extends AbstractFSWAL { // if we have a newer transaction id, update it. // otherwise, use the previous transaction id. if (newHighestProcessedTxid > 0) { - highestProcessedTxid = newHighestProcessedTxid; + highestProcessedAppendTxid = newHighestProcessedTxid; } else { - newHighestProcessedTxid = highestProcessedTxid; + newHighestProcessedTxid = highestProcessedAppendTxid; } + if (writer.getLength() - fileLengthAtLastSync >= batchSize) { // sync because buffer size limit. sync(writer, newHighestProcessedTxid); - } else if ((!syncFutures.isEmpty() || rollPromise != null) - && writer.getLength() > fileLengthAtLastSync) { - // first we should have at least one sync request or a roll request - // second we should have some unsynced data. - sync(writer, newHighestProcessedTxid); - } else if (writer.getLength() == fileLengthAtLastSync) { + return; + } + if (writer.getLength() == fileLengthAtLastSync) { // we haven't written anything out, just advance the highestSyncedSequence since we may only // stamped some region sequence id. 
highestSyncedTxid.set(newHighestProcessedTxid); finishSync(false); - tryFinishRoll(); + trySetReadyForRolling(); + return; + } + // we have some unsynced data but haven't reached the batch size yet + if (!syncFutures.isEmpty()) { + // we have at least one sync request + sync(writer, newHighestProcessedTxid); + return; + } + // usually waitingRoll is false so we check it without lock first. + if (waitingRoll) { + consumeLock.lock(); + try { + if (waitingRoll) { + // there is a roll request + sync(writer, newHighestProcessedTxid); + } + } finally { + consumeLock.unlock(); + } } } - private static final Comparator SEQ_COMPARATOR = (o1, o2) -> { - int c = Long.compare(o1.getTxid(), o2.getTxid()); - return c != 0 ? c : Integer.compare(System.identityHashCode(o1), System.identityHashCode(o2)); - }; - - private final Runnable consumer = new Runnable() { - - @Override - public void run() { - synchronized (waitingConsumePayloads) { - assert consumerScheduled; - if (writerBroken) { - // waiting for reschedule after rollWriter. - consumerScheduled = false; - return; - } - if (waitingRoll) { - // we may have toWriteEntries if the consume method does not write all pending entries - // out, this is usually happen if we have too many toWriteEntries that exceeded the - // batchSize limit. 
- if (waitingAppendEntries.isEmpty()) { - consumerScheduled = false; - return; - } - } else { - for (Payload p; (p = waitingConsumePayloads.pollFirst()) != null;) { - if (p.entry != null) { - waitingAppendEntries.addLast(p.entry); - } else if (p.sync != null) { - syncFutures.add(p.sync); - } else { - rollPromise = p.roll; - waitingRoll = true; - break; - } - } - } + private void consume() { + consumeLock.lock(); + try { + if (writerBroken) { + return; } - consume(); - synchronized (waitingConsumePayloads) { - if (waitingRoll) { - if (waitingAppendEntries.isEmpty()) { - consumerScheduled = false; - return; - } + if (waitingRoll) { + if (writer.getLength() > fileLengthAtLastSync) { + // issue a sync + sync(writer, highestProcessedAppendTxid); } else { - if (waitingConsumePayloads.isEmpty() && waitingAppendEntries.isEmpty()) { - consumerScheduled = false; - return; + if (unackedAppends.isEmpty()) { + readyForRolling = true; + readyForRollingCond.signalAll(); } } + return; } - // reschedule if we still have something to write. 
- eventLoop.execute(this); + } finally { + consumeLock.unlock(); } - }; + long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1; + for (long cursorBound = + waitingConsumePayloads.getCursor(); nextCursor <= cursorBound; nextCursor++) { + if (!waitingConsumePayloads.isPublished(nextCursor)) { + break; + } + RingBufferTruck truck = waitingConsumePayloads.get(nextCursor); + switch (truck.type()) { + case APPEND: + toWriteAppends.addLast(truck.unloadAppend()); + break; + case SYNC: + syncFutures.add(truck.unloadSync()); + break; + default: + LOG.warn("RingBufferTruck with unexpected type: " + truck.type()); + break; + } + waitingConsumePayloadsGatingSequence.set(nextCursor); + } + appendAndSync(); + if (hasConsumerTask.get()) { + return; + } + if (toWriteAppends.isEmpty()) { + if (waitingConsumePayloadsGatingSequence.get() == waitingConsumePayloads.getCursor()) { + consumerScheduled.set(false); + // recheck here since in append and sync we do not hold the consumeLock. Things may + // happen like this: + // 1. we check the cursor and find no new entry + // 2. someone publishes a new entry to the ring buffer, sees that consumerScheduled is + // true, and gives up scheduling the consumer task. + // 3. we set consumerScheduled to false and also give up scheduling the consumer task. + if (waitingConsumePayloadsGatingSequence.get() == waitingConsumePayloads.getCursor()) { + return; + } else { + // maybe someone has grabbed this before us + if (!consumerScheduled.compareAndSet(false, true)) { + return; + } + } + } + } + // reschedule if we still have something to write. 
+ eventLoop.execute(consumer); + } private boolean shouldScheduleConsumer() { if (writerBroken || waitingRoll) { return false; } - if (consumerScheduled) { - return false; - } - consumerScheduled = true; - return true; + return consumerScheduled.compareAndSet(false, true); } @Override public long append(HRegionInfo hri, WALKey key, WALEdit edits, boolean inMemstore) throws IOException { - boolean scheduleTask; - long txid; - synchronized (waitingConsumePayloads) { - if (this.closed) { - throw new IOException("Cannot append; log is closed"); - } - txid = nextTxid++; - FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore); - scheduleTask = shouldScheduleConsumer(); - waitingConsumePayloads.add(new Payload(entry)); + if (closed) { + throw new IOException("Cannot append; log is closed"); } - if (scheduleTask) { + TraceScope scope = Trace.startSpan("AsyncFSWAL.append"); + long txid = waitingConsumePayloads.next(); + try { + RingBufferTruck truck = waitingConsumePayloads.get(txid); + truck.load(new FSWALEntry(txid, key, edits, hri, inMemstore), scope.detach()); + } finally { + waitingConsumePayloads.publish(txid); + } + if (shouldScheduleConsumer()) { eventLoop.execute(consumer); } return txid; @@ -549,14 +661,16 @@ public class AsyncFSWAL extends AbstractFSWAL { public void sync() throws IOException { TraceScope scope = Trace.startSpan("AsyncFSWAL.sync"); try { + long txid = waitingConsumePayloads.next(); SyncFuture future; - boolean scheduleTask; - synchronized (waitingConsumePayloads) { - scheduleTask = shouldScheduleConsumer(); - future = getSyncFuture(nextTxid - 1, scope.detach()); - waitingConsumePayloads.addLast(new Payload(future)); + try { + future = getSyncFuture(txid, scope.detach()); + RingBufferTruck truck = waitingConsumePayloads.get(txid); + truck.load(future); + } finally { + waitingConsumePayloads.publish(txid); } - if (scheduleTask) { + if (shouldScheduleConsumer()) { eventLoop.execute(consumer); } scope = 
Trace.continueSpan(blockOnSync(future)); @@ -573,13 +687,17 @@ public class AsyncFSWAL extends AbstractFSWAL { } TraceScope scope = Trace.startSpan("AsyncFSWAL.sync"); try { - SyncFuture future = getSyncFuture(txid, scope.detach()); - boolean scheduleTask; - synchronized (waitingConsumePayloads) { - scheduleTask = shouldScheduleConsumer(); - waitingConsumePayloads.addLast(new Payload(future)); + // here we do not use ring buffer sequence as txid + long sequence = waitingConsumePayloads.next(); + SyncFuture future; + try { + future = getSyncFuture(txid, scope.detach()); + RingBufferTruck truck = waitingConsumePayloads.get(sequence); + truck.load(future); + } finally { + waitingConsumePayloads.publish(sequence); } - if (scheduleTask) { + if (shouldScheduleConsumer()) { eventLoop.execute(consumer); } scope = Trace.continueSpan(blockOnSync(future)); @@ -630,27 +748,21 @@ public class AsyncFSWAL extends AbstractFSWAL { } private void waitForSafePoint() { - Future roll; - boolean scheduleTask; - synchronized (waitingConsumePayloads) { - if (!writerBroken && this.writer != null) { - Promise promise = eventLoop.newPromise(); - if (consumerScheduled) { - scheduleTask = false; - } else { - scheduleTask = consumerScheduled = true; - } - waitingConsumePayloads.addLast(new Payload(promise)); - roll = promise; - } else { - roll = eventLoop.newSucceededFuture(null); - scheduleTask = false; + consumeLock.lock(); + try { + if (writerBroken || this.writer == null) { + return; } - } - if (scheduleTask) { + consumerScheduled.set(true); + waitingRoll = true; + readyForRolling = false; eventLoop.execute(consumer); + while (!readyForRolling) { + readyForRollingCond.awaitUninterruptibly(); + } + } finally { + consumeLock.unlock(); } - roll.awaitUninterruptibly(); } @Override @@ -663,25 +775,17 @@ public class AsyncFSWAL extends AbstractFSWAL { this.fsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput(); } this.fileLengthAtLastSync = 0L; - boolean scheduleTask; - synchronized 
(waitingConsumePayloads) { + consumeLock.lock(); + try { + consumerScheduled.set(true); writerBroken = waitingRoll = false; if (logRollerExitedChecker != null) { logRollerExitedChecker.cancel(); logRollerExitedChecker = null; } - if (consumerScheduled) { - scheduleTask = false; - } else { - if (waitingConsumePayloads.isEmpty() && waitingAppendEntries.isEmpty()) { - scheduleTask = false; - } else { - scheduleTask = consumerScheduled = true; - } - } - } - if (scheduleTask) { eventLoop.execute(consumer); + } finally { + consumeLock.unlock(); } long oldFileLen; if (oldWriter != null) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index 94dcd4b5052..426e3b194cc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -17,6 +17,15 @@ */ package org.apache.hadoop.hbase.regionserver.wal; +import com.google.common.annotations.VisibleForTesting; +import com.lmax.disruptor.BlockingWaitStrategy; +import com.lmax.disruptor.EventHandler; +import com.lmax.disruptor.ExceptionHandler; +import com.lmax.disruptor.LifecycleAware; +import com.lmax.disruptor.TimeoutException; +import com.lmax.disruptor.dsl.Disruptor; +import com.lmax.disruptor.dsl.ProducerType; + import java.io.FileNotFoundException; import java.io.IOException; import java.io.OutputStream; @@ -24,8 +33,6 @@ import java.util.Arrays; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -59,15 +66,6 @@ import org.apache.htrace.Span; import org.apache.htrace.Trace; import 
org.apache.htrace.TraceScope; -import com.google.common.annotations.VisibleForTesting; -import com.lmax.disruptor.BlockingWaitStrategy; -import com.lmax.disruptor.EventHandler; -import com.lmax.disruptor.ExceptionHandler; -import com.lmax.disruptor.LifecycleAware; -import com.lmax.disruptor.TimeoutException; -import com.lmax.disruptor.dsl.Disruptor; -import com.lmax.disruptor.dsl.ProducerType; - /** * The default implementation of FSWAL. */ @@ -119,11 +117,6 @@ public class FSHLog extends AbstractFSWAL { */ private final Disruptor disruptor; - /** - * An executorservice that runs the disruptor AppendEventHandler append executor. - */ - private final ExecutorService appendExecutor; - /** * This fellow is run by the above appendExecutor service but it is all about batching up appends * and syncs; it may shutdown without cleaning out the last few appends or syncs. To guard against @@ -164,9 +157,10 @@ public class FSHLog extends AbstractFSWAL { * Exception handler to pass the disruptor ringbuffer. Same as native implementation only it logs * using our logger instead of java native logger. */ - static class RingBufferExceptionHandler implements ExceptionHandler { + static class RingBufferExceptionHandler implements ExceptionHandler { + @Override - public void handleEventException(Throwable ex, long sequence, Object event) { + public void handleEventException(Throwable ex, long sequence, RingBufferTruck event) { LOG.error("Sequence=" + sequence + ", event=" + event, ex); throw new RuntimeException(ex); } @@ -230,26 +224,18 @@ public class FSHLog extends AbstractFSWAL { // This is the 'writer' -- a single threaded executor. This single thread 'consumes' what is // put on the ring buffer. String hostingThreadName = Thread.currentThread().getName(); - this.appendExecutor = Executors - .newSingleThreadExecutor(Threads.getNamedThreadFactory(hostingThreadName + ".append")); - // Preallocate objects to use on the ring buffer. 
The way that appends and syncs work, we will - // be stuck and make no progress if the buffer is filled with appends only and there is no - // sync. If no sync, then the handlers will be outstanding just waiting on sync completion - // before they return. - final int preallocatedEventCount = this.conf - .getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16); // Using BlockingWaitStrategy. Stuff that is going on here takes so long it makes no sense // spinning as other strategies do. - this.disruptor = new Disruptor(RingBufferTruck.EVENT_FACTORY, - preallocatedEventCount, this.appendExecutor, ProducerType.MULTI, - new BlockingWaitStrategy()); + this.disruptor = new Disruptor(RingBufferTruck::new, + getPreallocatedEventCount(), Threads.getNamedThreadFactory(hostingThreadName + ".append"), + ProducerType.MULTI, new BlockingWaitStrategy()); // Advance the ring buffer sequence so that it starts from 1 instead of 0, // because SyncFuture.NOT_DONE = 0. this.disruptor.getRingBuffer().next(); int maxHandlersCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, 200); this.ringBufferEventHandler = new RingBufferEventHandler( conf.getInt("hbase.regionserver.hlog.syncer.count", 5), maxHandlersCount); - this.disruptor.handleExceptionsWith(new RingBufferExceptionHandler()); + this.disruptor.setDefaultExceptionHandler(new RingBufferExceptionHandler()); this.disruptor.handleEventsWith(new RingBufferEventHandler[] { this.ringBufferEventHandler }); // Starting up threads in constructor is a no no; Interface should have an init call. this.disruptor.start(); @@ -436,10 +422,6 @@ public class FSHLog extends AbstractFSWAL { this.disruptor.shutdown(); } } - // With disruptor down, this is safe to let go. - if (this.appendExecutor != null) { - this.appendExecutor.shutdown(); - } if (LOG.isDebugEnabled()) { LOG.debug("Closing WAL writer in " + FSUtils.getPath(walDir)); @@ -473,7 +455,7 @@ public class FSHLog extends AbstractFSWAL { // edit with its edit/sequence id. 
// TODO: reuse FSWALEntry as we do SyncFuture rather create per append. entry = new FSWALEntry(sequence, key, edits, hri, inMemstore); - truck.loadPayload(entry, scope.detach()); + truck.load(entry, scope.detach()); } finally { this.disruptor.getRingBuffer().publish(sequence); } @@ -754,7 +736,7 @@ public class FSHLog extends AbstractFSWAL { SyncFuture syncFuture = getSyncFuture(sequence, span); try { RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence); - truck.loadPayload(syncFuture); + truck.load(syncFuture); } finally { this.disruptor.getRingBuffer().publish(sequence); } @@ -1021,16 +1003,17 @@ public class FSHLog extends AbstractFSWAL { // the log roller may be waiting on a signal from us here and will just hang without it. try { - if (truck.hasSyncFuturePayload()) { - this.syncFutures[this.syncFuturesCount++] = truck.unloadSyncFuturePayload(); + if (truck.type() == RingBufferTruck.Type.SYNC) { + this.syncFutures[this.syncFuturesCount++] = truck.unloadSync(); // Force flush of syncs if we are carrying a full complement of syncFutures. if (this.syncFuturesCount == this.syncFutures.length) { endOfBatch = true; } - } else if (truck.hasFSWALEntryPayload()) { - TraceScope scope = Trace.continueSpan(truck.unloadSpanPayload()); + } else if (truck.type() == RingBufferTruck.Type.APPEND) { + FSWALEntry entry = truck.unloadAppend(); + TraceScope scope = Trace.continueSpan(entry.detachSpan()); try { - FSWALEntry entry = truck.unloadFSWALEntryPayload(); + if (this.exception != null) { // We got an exception on an earlier attempt at append. Do not let this append // go through. Fail it but stamp the sequenceid into this append though failed. 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java index c4546f5bdc9..f599b46ff15 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CollectionUtils; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.htrace.Span; /** * A WAL Entry for {@link AbstractFSWAL} implementation. Immutable. @@ -55,6 +56,9 @@ class FSWALEntry extends Entry { // calling stampRegionSequenceId again. private transient boolean stamped = false; + // The tracing span for this entry when writing WAL. + private transient Span span; + FSWALEntry(final long txid, final WALKey key, final WALEdit edit, final HRegionInfo hri, final boolean inMemstore) { super(key, edit); @@ -142,4 +146,14 @@ class FSWALEntry extends Entry { Set getFamilyNames() { return familyNames; } + + void attachSpan(Span span) { + this.span = span; + } + + Span detachSpan() { + Span span = this.span; + this.span = null; + return span; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/RingBufferTruck.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/RingBufferTruck.java index 25c2111e2e4..1cff70d175f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/RingBufferTruck.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/RingBufferTruck.java @@ -21,96 +21,68 @@ package org.apache.hadoop.hbase.regionserver.wal; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.htrace.Span; -import com.lmax.disruptor.EventFactory; - /** - * A 'truck' to carry a payload across the {@link 
FSHLog} ring buffer from Handler to WAL. - * Has EITHER a {@link FSWALEntry} for making an append OR it has a {@link SyncFuture} to - * represent a 'sync' invocation. Truck instances are reused by the disruptor when it gets - * around to it so their payload references must be discarded on consumption to release them - * to GC. + * A 'truck' to carry a payload across the ring buffer from Handler to WAL. Has EITHER a + * {@link FSWALEntry} for making an append OR it has a {@link SyncFuture} to represent a 'sync' + * invocation. Truck instances are reused by the disruptor when it gets around to it so their + * payload references must be discarded on consumption to release them to GC. */ @InterfaceAudience.Private -class RingBufferTruck { +final class RingBufferTruck { + + public enum Type { + APPEND, SYNC, EMPTY + } + + private Type type = Type.EMPTY; + /** * Either this syncFuture is set or entry is set, but not both. */ - private SyncFuture syncFuture; + private SyncFuture sync; private FSWALEntry entry; - /** - * The tracing span for this entry. Can be null. - * TODO: Fix up tracing. - */ - private Span span; - /** * Load the truck with a {@link FSWALEntry} and associated {@link Span}. */ - void loadPayload(final FSWALEntry entry, final Span span) { + void load(FSWALEntry entry, Span span) { + entry.attachSpan(span); this.entry = entry; - this.span = span; - this.syncFuture = null; + this.type = Type.APPEND; } /** * Load the truck with a {@link SyncFuture}. */ - void loadPayload(final SyncFuture syncFuture) { - this.syncFuture = syncFuture; + void load(final SyncFuture syncFuture) { + this.sync = syncFuture; + this.type = Type.SYNC; + } + + /** + * @return the type of this truck's payload. + */ + Type type() { + return type; + } + + /** + * Unload the truck of its {@link FSWALEntry} payload. The internal reference is released. 
+ */ + FSWALEntry unloadAppend() { + FSWALEntry entry = this.entry; this.entry = null; - this.span = null; + this.type = Type.EMPTY; + return entry; } /** - * return {@code true} when this truck is carrying a {@link FSWALEntry}, - * {@code false} otherwise. + * Unload the truck of its {@link SyncFuture} payload. The internal reference is released. */ - boolean hasFSWALEntryPayload() { - return this.entry != null; + SyncFuture unloadSync() { + SyncFuture sync = this.sync; + this.sync = null; + this.type = Type.EMPTY; + return sync; } - - /** - * return {@code true} when this truck is carrying a {@link SyncFuture}, - * {@code false} otherwise. - */ - boolean hasSyncFuturePayload() { - return this.syncFuture != null; - } - - /** - * Unload the truck of its {@link FSWALEntry} payload. The internal refernce is released. - */ - FSWALEntry unloadFSWALEntryPayload() { - FSWALEntry ret = this.entry; - this.entry = null; - return ret; - } - - /** - * Unload the truck of its {@link SyncFuture} payload. The internal refernce is released. - */ - SyncFuture unloadSyncFuturePayload() { - SyncFuture ret = this.syncFuture; - this.syncFuture = null; - return ret; - } - - /** - * Unload the truck of its {@link Span} payload. The internal reference is released. - */ - Span unloadSpanPayload() { - Span ret = this.span; - this.span = null; - return ret; - } - - /** - * Factory for making a bunch of these. Needed by the ringbuffer/disruptor. 
- */ - final static EventFactory EVENT_FACTORY = new EventFactory() { - public RingBufferTruck newInstance() { - return new RingBufferTruck(); - } - }; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java index c38c26282bc..13ab85e8429 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java @@ -20,13 +20,12 @@ package org.apache.hadoop.hbase.wal; import com.google.common.annotations.VisibleForTesting; + import java.io.Closeable; import java.io.IOException; import java.util.Map; import java.util.Set; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -38,7 +37,6 @@ import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; -import org.apache.hadoop.hbase.util.Bytes; /** * A Write Ahead Log (WAL) provides service for reading, writing waledits. 
This interface provides diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java index f9d962c2de5..ca2ec8517f2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java @@ -374,12 +374,12 @@ public abstract class AbstractTestWALReplay { Path f = new Path(basedir, "hfile"); HFileTestUtil.createHFile(this.conf, fs, f, family, family, Bytes.toBytes(""), Bytes.toBytes("z"), 10); - List > hfs= new ArrayList>(1); + List> hfs = new ArrayList>(1); hfs.add(Pair.newPair(family, f.toString())); region.bulkLoadHFiles(hfs, true, null); // Add an edit so something in the WAL - byte [] row = tableName.getName(); + byte[] row = tableName.getName(); region.put((new Put(row)).addColumn(family, family, family)); wal.sync(); final int rowsInsertedCount = 11; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java index 721ee856e8b..a55df68270e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java @@ -28,6 +28,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Threads; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.experimental.categories.Category; @@ -42,7 +43,7 @@ public class TestAsyncFSWAL extends AbstractTestFSWAL { @BeforeClass public static void setUpBeforeClass() throws Exception { - GROUP = 
new NioEventLoopGroup(); + GROUP = new NioEventLoopGroup(1, Threads.newDaemonThreadFactory("TestAsyncFSWAL")); AbstractTestFSWAL.setUpBeforeClass(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplay.java index 7f0c03521a6..e008b379877 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplay.java @@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALFactory; import org.junit.AfterClass; @@ -41,7 +42,7 @@ public class TestAsyncWALReplay extends AbstractTestWALReplay { @BeforeClass public static void setUpBeforeClass() throws Exception { - GROUP = new NioEventLoopGroup(); + GROUP = new NioEventLoopGroup(1, Threads.newDaemonThreadFactory("TestAsyncWALReplay")); Configuration conf = AbstractTestWALReplay.TEST_UTIL.getConfiguration(); conf.set(WALFactory.WAL_PROVIDER, "asyncfs"); AbstractTestWALReplay.setUpBeforeClass(); diff --git a/pom.xml b/pom.xml index 89149250734..a633c56f7d0 100644 --- a/pom.xml +++ b/pom.xml @@ -1204,7 +1204,7 @@ 1.2 2.2 3.1 - 3.3.0 + 3.3.6 3.2.2 4.5.2