HBASE-17021 Use RingBuffer to reduce the contention in AsyncFSWAL
This commit is contained in:
parent
0998af01df
commit
3b629d632a
|
@ -17,6 +17,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.regionserver.wal;
|
package org.apache.hadoop.hbase.regionserver.wal;
|
||||||
|
|
||||||
|
import static com.google.common.base.Preconditions.*;
|
||||||
import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER;
|
import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -150,8 +151,7 @@ public abstract class AbstractFSWAL<W> implements WAL {
|
||||||
protected final Configuration conf;
|
protected final Configuration conf;
|
||||||
|
|
||||||
/** Listeners that are called on WAL events. */
|
/** Listeners that are called on WAL events. */
|
||||||
protected final List<WALActionsListener> listeners =
|
protected final List<WALActionsListener> listeners = new CopyOnWriteArrayList<WALActionsListener>();
|
||||||
new CopyOnWriteArrayList<WALActionsListener>();
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Class that does accounting of sequenceids in WAL subsystem. Holds oldest outstanding sequence
|
* Class that does accounting of sequenceids in WAL subsystem. Holds oldest outstanding sequence
|
||||||
|
@ -231,7 +231,8 @@ public abstract class AbstractFSWAL<W> implements WAL {
|
||||||
private static final class WalProps {
|
private static final class WalProps {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Map the encoded region name to the highest sequence id. Contain all the regions it has entries of
|
* Map the encoded region name to the highest sequence id. Contain all the regions it has
|
||||||
|
* entries of
|
||||||
*/
|
*/
|
||||||
public final Map<byte[], Long> encodedName2HighestSequenceId;
|
public final Map<byte[], Long> encodedName2HighestSequenceId;
|
||||||
|
|
||||||
|
@ -275,16 +276,13 @@ public abstract class AbstractFSWAL<W> implements WAL {
|
||||||
* @return timestamp, as in the log file name.
|
* @return timestamp, as in the log file name.
|
||||||
*/
|
*/
|
||||||
protected long getFileNumFromFileName(Path fileName) {
|
protected long getFileNumFromFileName(Path fileName) {
|
||||||
if (fileName == null) {
|
checkNotNull(fileName, "file name can't be null");
|
||||||
throw new IllegalArgumentException("file name can't be null");
|
|
||||||
}
|
|
||||||
if (!ourFiles.accept(fileName)) {
|
if (!ourFiles.accept(fileName)) {
|
||||||
throw new IllegalArgumentException("The log file " + fileName
|
throw new IllegalArgumentException(
|
||||||
+ " doesn't belong to this WAL. (" + toString() + ")");
|
"The log file " + fileName + " doesn't belong to this WAL. (" + toString() + ")");
|
||||||
}
|
}
|
||||||
final String fileNameString = fileName.toString();
|
final String fileNameString = fileName.toString();
|
||||||
String chompedPath =
|
String chompedPath = fileNameString.substring(prefixPathStr.length(),
|
||||||
fileNameString.substring(prefixPathStr.length(),
|
|
||||||
(fileNameString.length() - walFileSuffix.length()));
|
(fileNameString.length() - walFileSuffix.length()));
|
||||||
return Long.parseLong(chompedPath);
|
return Long.parseLong(chompedPath);
|
||||||
}
|
}
|
||||||
|
@ -294,6 +292,27 @@ public abstract class AbstractFSWAL<W> implements WAL {
|
||||||
return Math.round(mu.getMax() * memstoreSizeRatio * 2 / logRollSize);
|
return Math.round(mu.getMax() * memstoreSizeRatio * 2 / logRollSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// must be power of 2
|
||||||
|
protected final int getPreallocatedEventCount() {
|
||||||
|
// Preallocate objects to use on the ring buffer. The way that appends and syncs work, we will
|
||||||
|
// be stuck and make no progress if the buffer is filled with appends only and there is no
|
||||||
|
// sync. If no sync, then the handlers will be outstanding just waiting on sync completion
|
||||||
|
// before they return.
|
||||||
|
int preallocatedEventCount = this.conf.getInt("hbase.regionserver.wal.disruptor.event.count",
|
||||||
|
1024 * 16);
|
||||||
|
checkArgument(preallocatedEventCount >= 0,
|
||||||
|
"hbase.regionserver.wal.disruptor.event.count must > 0");
|
||||||
|
int floor = Integer.highestOneBit(preallocatedEventCount);
|
||||||
|
if (floor == preallocatedEventCount) {
|
||||||
|
return floor;
|
||||||
|
}
|
||||||
|
// max capacity is 1 << 30
|
||||||
|
if (floor >= 1 << 29) {
|
||||||
|
return 1 << 30;
|
||||||
|
}
|
||||||
|
return floor << 1;
|
||||||
|
}
|
||||||
|
|
||||||
protected AbstractFSWAL(final FileSystem fs, final Path rootDir, final String logDir,
|
protected AbstractFSWAL(final FileSystem fs, final Path rootDir, final String logDir,
|
||||||
final String archiveDir, final Configuration conf, final List<WALActionsListener> listeners,
|
final String archiveDir, final Configuration conf, final List<WALActionsListener> listeners,
|
||||||
final boolean failIfWALExists, final String prefix, final String suffix)
|
final boolean failIfWALExists, final String prefix, final String suffix)
|
||||||
|
@ -314,8 +333,8 @@ public abstract class AbstractFSWAL<W> implements WAL {
|
||||||
}
|
}
|
||||||
|
|
||||||
// If prefix is null||empty then just name it wal
|
// If prefix is null||empty then just name it wal
|
||||||
this.walFilePrefix =
|
this.walFilePrefix = prefix == null || prefix.isEmpty() ? "wal"
|
||||||
prefix == null || prefix.isEmpty() ? "wal" : URLEncoder.encode(prefix, "UTF8");
|
: URLEncoder.encode(prefix, "UTF8");
|
||||||
// we only correctly differentiate suffices when numeric ones start with '.'
|
// we only correctly differentiate suffices when numeric ones start with '.'
|
||||||
if (suffix != null && !(suffix.isEmpty()) && !(suffix.startsWith(WAL_FILE_NAME_DELIMITER))) {
|
if (suffix != null && !(suffix.isEmpty()) && !(suffix.startsWith(WAL_FILE_NAME_DELIMITER))) {
|
||||||
throw new IllegalArgumentException("WAL suffix must start with '" + WAL_FILE_NAME_DELIMITER
|
throw new IllegalArgumentException("WAL suffix must start with '" + WAL_FILE_NAME_DELIMITER
|
||||||
|
@ -338,8 +357,8 @@ public abstract class AbstractFSWAL<W> implements WAL {
|
||||||
}
|
}
|
||||||
if (walFileSuffix.isEmpty()) {
|
if (walFileSuffix.isEmpty()) {
|
||||||
// in the case of the null suffix, we need to ensure the filename ends with a timestamp.
|
// in the case of the null suffix, we need to ensure the filename ends with a timestamp.
|
||||||
return org.apache.commons.lang.StringUtils.isNumeric(fileNameString
|
return org.apache.commons.lang.StringUtils
|
||||||
.substring(prefixPathStr.length()));
|
.isNumeric(fileNameString.substring(prefixPathStr.length()));
|
||||||
} else if (!fileNameString.endsWith(walFileSuffix)) {
|
} else if (!fileNameString.endsWith(walFileSuffix)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -364,21 +383,18 @@ public abstract class AbstractFSWAL<W> implements WAL {
|
||||||
|
|
||||||
// Get size to roll log at. Roll at 95% of HDFS block size so we avoid crossing HDFS blocks
|
// Get size to roll log at. Roll at 95% of HDFS block size so we avoid crossing HDFS blocks
|
||||||
// (it costs a little x'ing bocks)
|
// (it costs a little x'ing bocks)
|
||||||
final long blocksize =
|
final long blocksize = this.conf.getLong("hbase.regionserver.hlog.blocksize",
|
||||||
this.conf.getLong("hbase.regionserver.hlog.blocksize",
|
|
||||||
FSUtils.getDefaultBlockSize(this.fs, this.walDir));
|
FSUtils.getDefaultBlockSize(this.fs, this.walDir));
|
||||||
this.logrollsize =
|
this.logrollsize = (long) (blocksize
|
||||||
(long) (blocksize * conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f));
|
* conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f));
|
||||||
|
|
||||||
float memstoreRatio =
|
float memstoreRatio = conf.getFloat(HeapMemorySizeUtil.MEMSTORE_SIZE_KEY, conf.getFloat(
|
||||||
conf.getFloat(HeapMemorySizeUtil.MEMSTORE_SIZE_KEY, conf.getFloat(
|
|
||||||
HeapMemorySizeUtil.MEMSTORE_SIZE_OLD_KEY, HeapMemorySizeUtil.DEFAULT_MEMSTORE_SIZE));
|
HeapMemorySizeUtil.MEMSTORE_SIZE_OLD_KEY, HeapMemorySizeUtil.DEFAULT_MEMSTORE_SIZE));
|
||||||
boolean maxLogsDefined = conf.get("hbase.regionserver.maxlogs") != null;
|
boolean maxLogsDefined = conf.get("hbase.regionserver.maxlogs") != null;
|
||||||
if (maxLogsDefined) {
|
if (maxLogsDefined) {
|
||||||
LOG.warn("'hbase.regionserver.maxlogs' was deprecated.");
|
LOG.warn("'hbase.regionserver.maxlogs' was deprecated.");
|
||||||
}
|
}
|
||||||
this.maxLogs =
|
this.maxLogs = conf.getInt("hbase.regionserver.maxlogs",
|
||||||
conf.getInt("hbase.regionserver.maxlogs",
|
|
||||||
Math.max(32, calculateMaxLogFiles(memstoreRatio, logrollsize)));
|
Math.max(32, calculateMaxLogFiles(memstoreRatio, logrollsize)));
|
||||||
|
|
||||||
LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) + ", rollsize="
|
LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) + ", rollsize="
|
||||||
|
@ -646,7 +662,7 @@ public abstract class AbstractFSWAL<W> implements WAL {
|
||||||
TraceScope scope = Trace.startSpan("FSHFile.replaceWriter");
|
TraceScope scope = Trace.startSpan("FSHFile.replaceWriter");
|
||||||
try {
|
try {
|
||||||
long oldFileLen = doReplaceWriter(oldPath, newPath, nextWriter);
|
long oldFileLen = doReplaceWriter(oldPath, newPath, nextWriter);
|
||||||
int oldNumEntries = this.numEntries.get();
|
int oldNumEntries = this.numEntries.getAndSet(0);
|
||||||
final String newPathString = (null == newPath ? null : FSUtils.getPath(newPath));
|
final String newPathString = (null == newPath ? null : FSUtils.getPath(newPath));
|
||||||
if (oldPath != null) {
|
if (oldPath != null) {
|
||||||
this.walFile2Props.put(oldPath,
|
this.walFile2Props.put(oldPath,
|
||||||
|
@ -811,20 +827,19 @@ public abstract class AbstractFSWAL<W> implements WAL {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* updates the sequence number of a specific store.
|
* updates the sequence number of a specific store. depending on the flag: replaces current seq
|
||||||
* depending on the flag: replaces current seq number if the given seq id is bigger,
|
* number if the given seq id is bigger, or even if it is lower than existing one
|
||||||
* or even if it is lower than existing one
|
|
||||||
* @param encodedRegionName
|
* @param encodedRegionName
|
||||||
* @param familyName
|
* @param familyName
|
||||||
* @param sequenceid
|
* @param sequenceid
|
||||||
* @param onlyIfGreater
|
* @param onlyIfGreater
|
||||||
*/
|
*/
|
||||||
@Override public void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequenceid,
|
@Override
|
||||||
|
public void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequenceid,
|
||||||
boolean onlyIfGreater) {
|
boolean onlyIfGreater) {
|
||||||
sequenceIdAccounting.updateStore(encodedRegionName, familyName, sequenceid, onlyIfGreater);
|
sequenceIdAccounting.updateStore(encodedRegionName, familyName, sequenceid, onlyIfGreater);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
protected SyncFuture getSyncFuture(final long sequence, Span span) {
|
protected SyncFuture getSyncFuture(final long sequence, Span span) {
|
||||||
SyncFuture syncFuture = this.syncFuturesByHandler.get(Thread.currentThread());
|
SyncFuture syncFuture = this.syncFuturesByHandler.get(Thread.currentThread());
|
||||||
if (syncFuture == null) {
|
if (syncFuture == null) {
|
||||||
|
@ -918,8 +933,7 @@ public abstract class AbstractFSWAL<W> implements WAL {
|
||||||
|
|
||||||
protected void postSync(final long timeInNanos, final int handlerSyncs) {
|
protected void postSync(final long timeInNanos, final int handlerSyncs) {
|
||||||
if (timeInNanos > this.slowSyncNs) {
|
if (timeInNanos > this.slowSyncNs) {
|
||||||
String msg =
|
String msg = new StringBuilder().append("Slow sync cost: ").append(timeInNanos / 1000000)
|
||||||
new StringBuilder().append("Slow sync cost: ").append(timeInNanos / 1000000)
|
|
||||||
.append(" ms, current pipeline: ").append(Arrays.toString(getPipeline())).toString();
|
.append(" ms, current pipeline: ").append(Arrays.toString(getPipeline())).toString();
|
||||||
Trace.addTimelineAnnotation(msg);
|
Trace.addTimelineAnnotation(msg);
|
||||||
LOG.info(msg);
|
LOG.info(msg);
|
||||||
|
|
|
@ -17,27 +17,35 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.regionserver.wal;
|
package org.apache.hadoop.hbase.regionserver.wal;
|
||||||
|
|
||||||
import static org.apache.hadoop.hbase.HConstants.REGION_SERVER_HANDLER_COUNT;
|
|
||||||
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.shouldRetryCreate;
|
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.shouldRetryCreate;
|
||||||
|
|
||||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
|
import com.lmax.disruptor.RingBuffer;
|
||||||
|
import com.lmax.disruptor.Sequence;
|
||||||
|
import com.lmax.disruptor.Sequencer;
|
||||||
|
|
||||||
import io.netty.channel.EventLoop;
|
import io.netty.channel.EventLoop;
|
||||||
import io.netty.util.concurrent.Future;
|
|
||||||
import io.netty.util.concurrent.Promise;
|
|
||||||
import io.netty.util.concurrent.ScheduledFuture;
|
import io.netty.util.concurrent.ScheduledFuture;
|
||||||
|
import io.netty.util.concurrent.SingleThreadEventExecutor;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InterruptedIOException;
|
import java.io.InterruptedIOException;
|
||||||
|
import java.lang.reflect.Field;
|
||||||
import java.util.ArrayDeque;
|
import java.util.ArrayDeque;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
import java.util.Deque;
|
import java.util.Deque;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.PriorityQueue;
|
import java.util.PriorityQueue;
|
||||||
|
import java.util.Queue;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.locks.Condition;
|
||||||
|
import java.util.concurrent.locks.Lock;
|
||||||
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -56,82 +64,79 @@ import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
import org.apache.htrace.NullScope;
|
import org.apache.htrace.NullScope;
|
||||||
|
import org.apache.htrace.Span;
|
||||||
import org.apache.htrace.Trace;
|
import org.apache.htrace.Trace;
|
||||||
import org.apache.htrace.TraceScope;
|
import org.apache.htrace.TraceScope;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An asynchronous implementation of FSWAL.
|
* An asynchronous implementation of FSWAL.
|
||||||
* <p>
|
* <p>
|
||||||
* Here 'waitingConsumePayloads' acts as the RingBuffer in FSHLog. We do not use RingBuffer here
|
* Here 'waitingConsumePayloads' acts as the RingBuffer in FSHLog.
|
||||||
* because RingBuffer need an exclusive thread to consume the entries in it, and here we want to run
|
|
||||||
* the append and sync operation inside EventLoop. We can not use EventLoop as the RingBuffer's
|
|
||||||
* executor otherwise the EventLoop can not process any other events such as socket read and write.
|
|
||||||
* <p>
|
* <p>
|
||||||
* For append, we process it as follow:
|
* For append, we process it as follow:
|
||||||
* <ol>
|
* <ol>
|
||||||
* <li>In the caller thread(typically, in the rpc handler thread):
|
* <li>In the caller thread(typically, in the rpc handler thread):
|
||||||
* <ol>
|
* <ol>
|
||||||
* <li>Lock 'waitingConsumePayloads', bump nextTxid, and insert the entry to
|
* <li>Insert the entry into 'waitingConsumePayloads'. Use ringbuffer sequence as txid.</li>
|
||||||
* 'waitingConsumePayloads'.</li>
|
|
||||||
* <li>Schedule the consumer task if needed. See {@link #shouldScheduleConsumer()} for more details.
|
* <li>Schedule the consumer task if needed. See {@link #shouldScheduleConsumer()} for more details.
|
||||||
* </li>
|
* </li>
|
||||||
* </ol>
|
* </ol>
|
||||||
* </li>
|
* </li>
|
||||||
* <li>In the consumer task(in the EventLoop thread)
|
* <li>In the consumer task(in the EventLoop thread)
|
||||||
* <ol>
|
* <ol>
|
||||||
* <li>Poll the entry from 'waitingConsumePayloads' and insert it into 'waitingAppendEntries'</li>
|
* <li>Poll the entry from {@link #waitingConsumePayloads} and insert it into
|
||||||
* <li>Poll the entry from 'waitingAppendEntries', append it to the AsyncWriter, and insert it into
|
* {@link #toWriteAppends}</li>
|
||||||
* 'unackedEntries'</li>
|
* <li>Poll the entry from {@link #toWriteAppends}, append it to the AsyncWriter, and insert it into
|
||||||
|
* {@link #unackedAppends}</li>
|
||||||
* <li>If the buffered size reaches {@link #batchSize}, or there is a sync request, then we call
|
* <li>If the buffered size reaches {@link #batchSize}, or there is a sync request, then we call
|
||||||
* sync on the AsyncWriter.</li>
|
* sync on the AsyncWriter.</li>
|
||||||
* <li>In the callback methods(CompletionHandler):
|
* <li>In the callback methods:
|
||||||
* <ul>
|
* <ul>
|
||||||
* <li>If succeeded, poll the entry from 'unackedEntries' and drop it.</li>
|
* <li>If succeeded, poll the entry from {@link #unackedAppends} and drop it.</li>
|
||||||
* <li>If failed, add all the entries in 'unackedEntries' back to 'waitingAppendEntries' and wait
|
* <li>If failed, add all the entries in {@link #unackedAppends} back to {@link #toWriteAppends} and
|
||||||
* for writing them again.</li>
|
* wait for writing them again.</li>
|
||||||
* </ul>
|
* </ul>
|
||||||
* </li>
|
* </li>
|
||||||
* </ol>
|
* </ol>
|
||||||
* </li>
|
* </li>
|
||||||
* </ol>
|
* </ol>
|
||||||
* For sync, the processing stages are almost same except that if it is not assigned with a new
|
* For sync, the processing stages are almost same. And different from FSHLog, we will open a new
|
||||||
* 'txid', we just assign the previous 'txid' to it without bumping the 'nextTxid'. And different
|
* writer and rewrite unacked entries to the new writer and sync again if we hit a sync error.
|
||||||
* from FSHLog, we will open a new writer and rewrite unacked entries to the new writer and sync
|
|
||||||
* again if we hit a sync error.
|
|
||||||
* <p>
|
* <p>
|
||||||
* Here we only describe the logic of doReplaceWriter. The main logic of rollWriter is same with
|
* Here we only describe the logic of doReplaceWriter. The main logic of rollWriter is same with
|
||||||
* FSHLog.<br>
|
* FSHLog.<br>
|
||||||
* For a normal roll request(for example, we have reached the log roll size):
|
* For a normal roll request(for example, we have reached the log roll size):
|
||||||
* <ol>
|
* <ol>
|
||||||
* <li>In the log roller thread, we add a roll payload to 'waitingConsumePayloads', and then wait on
|
* <li>In the log roller thread, we will set {@link #waitingRoll} to true and
|
||||||
* the rollPromise(see {@link #waitForSafePoint()}).</li>
|
* {@link #readyForRolling} to false, and then wait on {@link #readyForRolling}(see
|
||||||
* <li>In the consumer thread, we will stop polling entries from 'waitingConsumePayloads' if we hit
|
* {@link #waitForSafePoint()}).</li>
|
||||||
* a Payload which contains a roll request.</li>
|
* <li>In the consumer thread, we will stop polling entries from {@link #waitingConsumePayloads} if
|
||||||
* <li>Append all entries to current writer, issue a sync request if possible.</li>
|
* {@link #waitingRoll} is true, and also stop writing the entries in {@link #toWriteAppends}
|
||||||
* <li>If sync succeeded, check if we could finish a roll request. There 3 conditions:
|
* out.</li>
|
||||||
* <ul>
|
* <li>If there are unflush data in the writer, sync them.</li>
|
||||||
* <li>'rollPromise' is not null which means we have a pending roll request.</li>
|
* <li>When all out-going sync request is finished, i.e, the {@link #unackedAppends} is empty,
|
||||||
* <li>'waitingAppendEntries' is empty.</li>
|
* signal the {@link #readyForRollingCond}.</li>
|
||||||
* <li>'unackedEntries' is empty.</li>
|
|
||||||
* </ul>
|
|
||||||
* </li>
|
|
||||||
* <li>Back to the log roller thread, now we can confirm that there are no out-going entries, i.e.,
|
* <li>Back to the log roller thread, now we can confirm that there are no out-going entries, i.e.,
|
||||||
* we reach a safe point. So it is safe to replace old writer with new writer now.</li>
|
* we reach a safe point. So it is safe to replace old writer with new writer now.</li>
|
||||||
* <li>Acquire 'waitingConsumePayloads' lock, set 'writerBroken' and 'waitingRoll' to false, cancel
|
* <li>Set {@link #writerBroken} and {@link #waitingRoll} to false, cancel log roller exit checker
|
||||||
* log roller exit checker if any(see the comments in the 'failed' method of the sync
|
* if any(see the comments in the {@link #syncFailed(Throwable)} method to see why we need a checker
|
||||||
* CompletionHandler to see why we need a checker here).</li>
|
* here).</li>
|
||||||
* <li>Schedule the consumer task if needed.</li>
|
* <li>Schedule the consumer task.</li>
|
||||||
* <li>Schedule a background task to close the old writer.</li>
|
* <li>Schedule a background task to close the old writer.</li>
|
||||||
* </ol>
|
* </ol>
|
||||||
* For a broken writer roll request, the only difference is that we can bypass the wait for safe
|
* For a broken writer roll request, the only difference is that we can bypass the wait for safe
|
||||||
* point stage. See the comments in the 'failed' method of the sync CompletionHandler for more
|
* point stage. See the comments in the {@link #syncFailed(Throwable)} method for more details.
|
||||||
* details.
|
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
|
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
|
||||||
public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(AsyncFSWAL.class);
|
private static final Log LOG = LogFactory.getLog(AsyncFSWAL.class);
|
||||||
|
|
||||||
|
private static final Comparator<SyncFuture> SEQ_COMPARATOR = (o1, o2) -> {
|
||||||
|
int c = Long.compare(o1.getTxid(), o2.getTxid());
|
||||||
|
return c != 0 ? c : Integer.compare(System.identityHashCode(o1), System.identityHashCode(o2));
|
||||||
|
};
|
||||||
|
|
||||||
public static final String WAL_BATCH_SIZE = "hbase.wal.batch.size";
|
public static final String WAL_BATCH_SIZE = "hbase.wal.batch.size";
|
||||||
public static final long DEFAULT_WAL_BATCH_SIZE = 64L * 1024;
|
public static final long DEFAULT_WAL_BATCH_SIZE = 64L * 1024;
|
||||||
|
|
||||||
|
@ -142,62 +147,30 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
||||||
"hbase.wal.async.logroller.exited.check.interval.ms";
|
"hbase.wal.async.logroller.exited.check.interval.ms";
|
||||||
public static final long DEFAULT_ASYNC_WAL_LOG_ROLLER_EXITED_CHECK_INTERVAL_MS = 1000;
|
public static final long DEFAULT_ASYNC_WAL_LOG_ROLLER_EXITED_CHECK_INTERVAL_MS = 1000;
|
||||||
|
|
||||||
/**
|
|
||||||
* Carry things that we want to pass to the consume task in event loop. Only one field can be
|
|
||||||
* non-null.
|
|
||||||
* <p>
|
|
||||||
* TODO: need to unify this and {@link RingBufferTruck}. There are mostly the same thing.
|
|
||||||
*/
|
|
||||||
private static final class Payload {
|
|
||||||
|
|
||||||
// a wal entry which need to be appended
|
|
||||||
public final FSWALEntry entry;
|
|
||||||
|
|
||||||
// indicate that we need to sync our wal writer.
|
|
||||||
public final SyncFuture sync;
|
|
||||||
|
|
||||||
// incidate that we want to roll the writer.
|
|
||||||
public final Promise<Void> roll;
|
|
||||||
|
|
||||||
public Payload(FSWALEntry entry) {
|
|
||||||
this.entry = entry;
|
|
||||||
this.sync = null;
|
|
||||||
this.roll = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Payload(SyncFuture sync) {
|
|
||||||
this.entry = null;
|
|
||||||
this.sync = sync;
|
|
||||||
this.roll = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Payload(Promise<Void> roll) {
|
|
||||||
this.entry = null;
|
|
||||||
this.sync = null;
|
|
||||||
this.roll = roll;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String toString() {
|
|
||||||
return "Payload [entry=" + entry + ", sync=" + sync + ", roll=" + roll + "]";
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private final EventLoop eventLoop;
|
private final EventLoop eventLoop;
|
||||||
|
|
||||||
private final Deque<Payload> waitingConsumePayloads;
|
private final Lock consumeLock = new ReentrantLock();
|
||||||
|
|
||||||
// like the ringbuffer sequence. Every FSWALEntry and SyncFuture will be assigned a txid and
|
private final Runnable consumer = this::consume;
|
||||||
// then added to waitingConsumePayloads.
|
|
||||||
private long nextTxid = 1L;
|
|
||||||
|
|
||||||
private boolean consumerScheduled;
|
// check if there is already a consumer task in the event loop's task queue
|
||||||
|
private final Supplier<Boolean> hasConsumerTask;
|
||||||
|
|
||||||
// new writer is created and we are waiting for old writer to be closed.
|
// new writer is created and we are waiting for old writer to be closed.
|
||||||
private boolean waitingRoll;
|
private volatile boolean waitingRoll;
|
||||||
|
|
||||||
|
private boolean readyForRolling;
|
||||||
|
|
||||||
|
private final Condition readyForRollingCond = consumeLock.newCondition();
|
||||||
|
|
||||||
|
private final RingBuffer<RingBufferTruck> waitingConsumePayloads;
|
||||||
|
|
||||||
|
private final Sequence waitingConsumePayloadsGatingSequence;
|
||||||
|
|
||||||
|
private final AtomicBoolean consumerScheduled = new AtomicBoolean(false);
|
||||||
|
|
||||||
// writer is broken and rollWriter is needed.
|
// writer is broken and rollWriter is needed.
|
||||||
private boolean writerBroken;
|
private volatile boolean writerBroken;
|
||||||
|
|
||||||
private final long batchSize;
|
private final long batchSize;
|
||||||
|
|
||||||
|
@ -210,17 +183,15 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
||||||
|
|
||||||
private volatile AsyncFSOutput fsOut;
|
private volatile AsyncFSOutput fsOut;
|
||||||
|
|
||||||
private final Deque<FSWALEntry> waitingAppendEntries = new ArrayDeque<FSWALEntry>();
|
private final Deque<FSWALEntry> toWriteAppends = new ArrayDeque<>();
|
||||||
|
|
||||||
private final Deque<FSWALEntry> unackedEntries = new ArrayDeque<FSWALEntry>();
|
private final Deque<FSWALEntry> unackedAppends = new ArrayDeque<>();
|
||||||
|
|
||||||
private final PriorityQueue<SyncFuture> syncFutures =
|
private final PriorityQueue<SyncFuture> syncFutures =
|
||||||
new PriorityQueue<SyncFuture>(11, SEQ_COMPARATOR);
|
new PriorityQueue<SyncFuture>(11, SEQ_COMPARATOR);
|
||||||
|
|
||||||
private Promise<Void> rollPromise;
|
|
||||||
|
|
||||||
// the highest txid of WAL entries being processed
|
// the highest txid of WAL entries being processed
|
||||||
private long highestProcessedTxid;
|
private long highestProcessedAppendTxid;
|
||||||
|
|
||||||
// file length when we issue last sync request on the writer
|
// file length when we issue last sync request on the writer
|
||||||
private long fileLengthAtLastSync;
|
private long fileLengthAtLastSync;
|
||||||
|
@ -237,6 +208,49 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
||||||
this.future = future;
|
this.future = future;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// See the comments in syncFailed why we need to do this.
|
||||||
|
private void cleanup() {
|
||||||
|
unackedAppends.clear();
|
||||||
|
toWriteAppends.forEach(entry -> {
|
||||||
|
try {
|
||||||
|
entry.stampRegionSequenceId();
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new AssertionError("should not happen", e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
toWriteAppends.clear();
|
||||||
|
IOException error = new IOException("sync failed but log roller exited");
|
||||||
|
for (SyncFuture sync; (sync = syncFutures.peek()) != null;) {
|
||||||
|
sync.done(sync.getTxid(), error);
|
||||||
|
syncFutures.remove();
|
||||||
|
}
|
||||||
|
long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1;
|
||||||
|
for (long cursorBound =
|
||||||
|
waitingConsumePayloads.getCursor(); nextCursor <= cursorBound; nextCursor++) {
|
||||||
|
if (!waitingConsumePayloads.isPublished(nextCursor)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
RingBufferTruck truck = waitingConsumePayloads.get(nextCursor);
|
||||||
|
switch (truck.type()) {
|
||||||
|
case APPEND:
|
||||||
|
try {
|
||||||
|
truck.unloadAppend().stampRegionSequenceId();
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new AssertionError("should not happen", e);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case SYNC:
|
||||||
|
SyncFuture sync = truck.unloadSync();
|
||||||
|
sync.done(sync.getTxid(), error);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
LOG.warn("RingBufferTruck with unexpected type: " + truck.type());
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
waitingConsumePayloadsGatingSequence.set(nextCursor);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
if (!logRollerExited) {
|
if (!logRollerExited) {
|
||||||
|
@ -250,27 +264,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
unackedEntries.clear();
|
cleanup();
|
||||||
waitingAppendEntries.clear();
|
|
||||||
IOException error = new IOException("sync failed but log roller exited");
|
|
||||||
for (SyncFuture future; (future = syncFutures.peek()) != null;) {
|
|
||||||
future.done(highestProcessedTxid, error);
|
|
||||||
syncFutures.remove();
|
|
||||||
}
|
|
||||||
synchronized (waitingConsumePayloads) {
|
|
||||||
for (Payload p : waitingConsumePayloads) {
|
|
||||||
if (p.entry != null) {
|
|
||||||
try {
|
|
||||||
p.entry.stampRegionSequenceId();
|
|
||||||
} catch (IOException e) {
|
|
||||||
throw new AssertionError("should not happen", e);
|
|
||||||
}
|
|
||||||
} else if (p.sync != null) {
|
|
||||||
p.sync.done(nextTxid, error);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
waitingConsumePayloads.clear();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized void cancel() {
|
public synchronized void cancel() {
|
||||||
|
@ -287,8 +281,34 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
||||||
throws FailedLogCloseException, IOException {
|
throws FailedLogCloseException, IOException {
|
||||||
super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
|
super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
|
||||||
this.eventLoop = eventLoop;
|
this.eventLoop = eventLoop;
|
||||||
int maxHandlersCount = conf.getInt(REGION_SERVER_HANDLER_COUNT, 200);
|
Supplier<Boolean> hasConsumerTask;
|
||||||
waitingConsumePayloads = new ArrayDeque<Payload>(maxHandlersCount * 3);
|
if (eventLoop instanceof SingleThreadEventExecutor) {
|
||||||
|
|
||||||
|
try {
|
||||||
|
Field field = SingleThreadEventExecutor.class.getDeclaredField("taskQueue");
|
||||||
|
field.setAccessible(true);
|
||||||
|
Queue<?> queue = (Queue<?>) field.get(eventLoop);
|
||||||
|
hasConsumerTask = () -> queue.peek() == consumer;
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.warn("Can not get task queue of " + eventLoop + ", this is not necessary, just give up",
|
||||||
|
e);
|
||||||
|
hasConsumerTask = () -> false;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
hasConsumerTask = () -> false;
|
||||||
|
}
|
||||||
|
this.hasConsumerTask = hasConsumerTask;
|
||||||
|
int preallocatedEventCount =
|
||||||
|
this.conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16);
|
||||||
|
waitingConsumePayloads =
|
||||||
|
RingBuffer.createMultiProducer(RingBufferTruck::new, preallocatedEventCount);
|
||||||
|
waitingConsumePayloadsGatingSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
|
||||||
|
waitingConsumePayloads.addGatingSequences(waitingConsumePayloadsGatingSequence);
|
||||||
|
|
||||||
|
// inrease the ringbuffer sequence so our txid is start from 1
|
||||||
|
waitingConsumePayloads.publish(waitingConsumePayloads.next());
|
||||||
|
waitingConsumePayloadsGatingSequence.set(waitingConsumePayloads.getCursor());
|
||||||
|
|
||||||
batchSize = conf.getLong(WAL_BATCH_SIZE, DEFAULT_WAL_BATCH_SIZE);
|
batchSize = conf.getLong(WAL_BATCH_SIZE, DEFAULT_WAL_BATCH_SIZE);
|
||||||
createMaxRetries =
|
createMaxRetries =
|
||||||
conf.getInt(ASYNC_WAL_CREATE_MAX_RETRIES, DEFAULT_ASYNC_WAL_CREATE_MAX_RETRIES);
|
conf.getInt(ASYNC_WAL_CREATE_MAX_RETRIES, DEFAULT_ASYNC_WAL_CREATE_MAX_RETRIES);
|
||||||
|
@ -297,13 +317,27 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
||||||
rollWriter();
|
rollWriter();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void tryFinishRoll() {
|
// return whether we have successfully set readyForRolling to true.
|
||||||
|
private boolean trySetReadyForRolling() {
|
||||||
|
// Check without holding lock first. Usually we will just return here.
|
||||||
|
// waitingRoll is volatile and unacedEntries is only accessed inside event loop so it is safe to
|
||||||
|
// check them outside the consumeLock.
|
||||||
|
if (!waitingRoll || !unackedAppends.isEmpty()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
consumeLock.lock();
|
||||||
|
try {
|
||||||
// 1. a roll is requested
|
// 1. a roll is requested
|
||||||
// 2. we have written out all entries before the roll point.
|
// 2. all out-going entries have been acked(we have confirmed above).
|
||||||
// 3. all entries have been acked.
|
if (waitingRoll) {
|
||||||
if (rollPromise != null && waitingAppendEntries.isEmpty() && unackedEntries.isEmpty()) {
|
readyForRolling = true;
|
||||||
rollPromise.trySuccess(null);
|
readyForRollingCond.signalAll();
|
||||||
rollPromise = null;
|
return true;
|
||||||
|
} else {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
consumeLock.unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -318,7 +352,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
||||||
// safe to use writerBroken as a guard.
|
// safe to use writerBroken as a guard.
|
||||||
// Do not forget to revisit this if we change the implementation of
|
// Do not forget to revisit this if we change the implementation of
|
||||||
// FanOutOneBlockAsyncDFSOutput!
|
// FanOutOneBlockAsyncDFSOutput!
|
||||||
synchronized (waitingConsumePayloads) {
|
consumeLock.lock();
|
||||||
|
try {
|
||||||
if (writerBroken) {
|
if (writerBroken) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -331,31 +366,25 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
||||||
logRollerExitedChecker.setFuture(eventLoop.scheduleAtFixedRate(logRollerExitedChecker,
|
logRollerExitedChecker.setFuture(eventLoop.scheduleAtFixedRate(logRollerExitedChecker,
|
||||||
logRollerExitedCheckIntervalMs, logRollerExitedCheckIntervalMs, TimeUnit.MILLISECONDS));
|
logRollerExitedCheckIntervalMs, logRollerExitedCheckIntervalMs, TimeUnit.MILLISECONDS));
|
||||||
writerBroken = true;
|
writerBroken = true;
|
||||||
|
if (waitingRoll) {
|
||||||
|
readyForRolling = true;
|
||||||
|
readyForRollingCond.signalAll();
|
||||||
}
|
}
|
||||||
for (Iterator<FSWALEntry> iter = unackedEntries.descendingIterator(); iter.hasNext();) {
|
} finally {
|
||||||
waitingAppendEntries.addFirst(iter.next());
|
consumeLock.unlock();
|
||||||
|
}
|
||||||
|
for (Iterator<FSWALEntry> iter = unackedAppends.descendingIterator(); iter.hasNext();) {
|
||||||
|
toWriteAppends.addFirst(iter.next());
|
||||||
}
|
}
|
||||||
highestUnsyncedTxid = highestSyncedTxid.get();
|
highestUnsyncedTxid = highestSyncedTxid.get();
|
||||||
if (rollPromise != null) {
|
|
||||||
rollPromise.trySuccess(null);
|
|
||||||
rollPromise = null;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
// request a roll.
|
// request a roll.
|
||||||
if (!rollWriterLock.tryLock()) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
requestLogRoll();
|
requestLogRoll();
|
||||||
} finally {
|
|
||||||
rollWriterLock.unlock();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void syncCompleted(AsyncWriter writer, long processedTxid, long startTimeNs) {
|
private void syncCompleted(AsyncWriter writer, long processedTxid, long startTimeNs) {
|
||||||
highestSyncedTxid.set(processedTxid);
|
highestSyncedTxid.set(processedTxid);
|
||||||
int syncCount = finishSync(true);
|
int syncCount = finishSync(true);
|
||||||
for (Iterator<FSWALEntry> iter = unackedEntries.iterator(); iter.hasNext();) {
|
for (Iterator<FSWALEntry> iter = unackedAppends.iterator(); iter.hasNext();) {
|
||||||
if (iter.next().getTxid() <= processedTxid) {
|
if (iter.next().getTxid() <= processedTxid) {
|
||||||
iter.remove();
|
iter.remove();
|
||||||
} else {
|
} else {
|
||||||
|
@ -363,14 +392,27 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
postSync(System.nanoTime() - startTimeNs, syncCount);
|
postSync(System.nanoTime() - startTimeNs, syncCount);
|
||||||
tryFinishRoll();
|
// Ideally, we should set a flag to indicate that the log roll has already been requested for
|
||||||
|
// the current writer and give up here, and reset the flag when roll is finished. But we
|
||||||
|
// finish roll in the log roller thread so the flag need to be set by different thread which
|
||||||
|
// typically means we need to use a lock to protect it and do fencing. As the log roller will
|
||||||
|
// aggregate the roll requests of the same WAL, so it is safe to call requestLogRoll multiple
|
||||||
|
// times before the roll actual happens. But we need to stop if we set readyForRolling to true
|
||||||
|
// and wake up the log roller thread waiting in waitForSafePoint as the rollWriter call may
|
||||||
|
// return firstly and then we run the code below and request a roll on the new writer.
|
||||||
|
if (trySetReadyForRolling()) {
|
||||||
|
// we have just finished a roll, then do not need to check for log rolling, the writer will be
|
||||||
|
// closed soon.
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (writer.getLength() < logrollsize) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
if (!rollWriterLock.tryLock()) {
|
if (!rollWriterLock.tryLock()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
if (writer.getLength() >= logrollsize) {
|
|
||||||
requestLogRoll();
|
requestLogRoll();
|
||||||
}
|
|
||||||
} finally {
|
} finally {
|
||||||
rollWriterLock.unlock();
|
rollWriterLock.unlock();
|
||||||
}
|
}
|
||||||
|
@ -394,15 +436,16 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
||||||
future.setSpan(scope.detach());
|
future.setSpan(scope.detach());
|
||||||
}
|
}
|
||||||
|
|
||||||
private int finishSync(boolean addSyncTrace) {
|
private int finishSyncLowerThanTxid(long txid, boolean addSyncTrace) {
|
||||||
long doneTxid = highestSyncedTxid.get();
|
|
||||||
int finished = 0;
|
int finished = 0;
|
||||||
for (SyncFuture future; (future = syncFutures.peek()) != null;) {
|
for (SyncFuture sync; (sync = syncFutures.peek()) != null;) {
|
||||||
if (future.getTxid() <= doneTxid) {
|
if (sync.getTxid() <= txid) {
|
||||||
future.done(doneTxid, null);
|
sync.done(txid, null);
|
||||||
syncFutures.remove();
|
syncFutures.remove();
|
||||||
finished++;
|
finished++;
|
||||||
addTimeAnnotation(future, "writer synced");
|
if (addSyncTrace) {
|
||||||
|
addTimeAnnotation(sync, "writer synced");
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -410,24 +453,69 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
||||||
return finished;
|
return finished;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void consume() {
|
private int finishSync(boolean addSyncTrace) {
|
||||||
|
long doneTxid = highestSyncedTxid.get();
|
||||||
|
if (doneTxid >= highestProcessedAppendTxid) {
|
||||||
|
if (toWriteAppends.isEmpty()) {
|
||||||
|
// all outstanding appends have been acked, just finish all syncs.
|
||||||
|
long maxSyncTxid = doneTxid;
|
||||||
|
for (SyncFuture sync : syncFutures) {
|
||||||
|
maxSyncTxid = Math.max(maxSyncTxid, sync.getTxid());
|
||||||
|
sync.done(maxSyncTxid, null);
|
||||||
|
if (addSyncTrace) {
|
||||||
|
addTimeAnnotation(sync, "writer synced");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
highestSyncedTxid.set(maxSyncTxid);
|
||||||
|
int finished = syncFutures.size();
|
||||||
|
syncFutures.clear();
|
||||||
|
return finished;
|
||||||
|
} else {
|
||||||
|
// There is no append between highestProcessedAppendTxid and lowestUnprocessedAppendTxid, so
|
||||||
|
// if highestSyncedTxid >= highestProcessedAppendTxid, then all syncs whose txid are between
|
||||||
|
// highestProcessedAppendTxid and lowestUnprocessedAppendTxid can be finished.
|
||||||
|
long lowestUnprocessedAppendTxid = toWriteAppends.peek().getTxid();
|
||||||
|
assert lowestUnprocessedAppendTxid > highestProcessedAppendTxid;
|
||||||
|
highestSyncedTxid.set(lowestUnprocessedAppendTxid - 1);
|
||||||
|
return finishSyncLowerThanTxid(lowestUnprocessedAppendTxid - 1, addSyncTrace);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return finishSyncLowerThanTxid(doneTxid, addSyncTrace);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void appendAndSync() {
|
||||||
final AsyncWriter writer = this.writer;
|
final AsyncWriter writer = this.writer;
|
||||||
// maybe a sync request is not queued when we issue a sync, so check here to see if we could
|
// maybe a sync request is not queued when we issue a sync, so check here to see if we could
|
||||||
// finish some.
|
// finish some.
|
||||||
finishSync(false);
|
finishSync(false);
|
||||||
long newHighestProcessedTxid = -1L;
|
long newHighestProcessedTxid = -1L;
|
||||||
for (Iterator<FSWALEntry> iter = waitingAppendEntries.iterator(); iter.hasNext();) {
|
for (Iterator<FSWALEntry> iter = toWriteAppends.iterator(); iter.hasNext();) {
|
||||||
FSWALEntry entry = iter.next();
|
FSWALEntry entry = iter.next();
|
||||||
boolean appended;
|
boolean appended;
|
||||||
|
Span span = entry.detachSpan();
|
||||||
|
// the span maybe null if this is a retry after rolling.
|
||||||
|
if (span != null) {
|
||||||
|
TraceScope scope = Trace.continueSpan(span);
|
||||||
|
try {
|
||||||
|
appended = append(writer, entry);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new AssertionError("should not happen", e);
|
||||||
|
} finally {
|
||||||
|
assert scope == NullScope.INSTANCE || !scope.isDetached();
|
||||||
|
scope.close(); // append scope is complete
|
||||||
|
}
|
||||||
|
} else {
|
||||||
try {
|
try {
|
||||||
appended = append(writer, entry);
|
appended = append(writer, entry);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new AssertionError("should not happen", e);
|
throw new AssertionError("should not happen", e);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
newHighestProcessedTxid = entry.getTxid();
|
newHighestProcessedTxid = entry.getTxid();
|
||||||
iter.remove();
|
iter.remove();
|
||||||
if (appended) {
|
if (appended) {
|
||||||
unackedEntries.addLast(entry);
|
unackedAppends.addLast(entry);
|
||||||
if (writer.getLength() - fileLengthAtLastSync >= batchSize) {
|
if (writer.getLength() - fileLengthAtLastSync >= batchSize) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -436,110 +524,134 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
||||||
// if we have a newer transaction id, update it.
|
// if we have a newer transaction id, update it.
|
||||||
// otherwise, use the previous transaction id.
|
// otherwise, use the previous transaction id.
|
||||||
if (newHighestProcessedTxid > 0) {
|
if (newHighestProcessedTxid > 0) {
|
||||||
highestProcessedTxid = newHighestProcessedTxid;
|
highestProcessedAppendTxid = newHighestProcessedTxid;
|
||||||
} else {
|
} else {
|
||||||
newHighestProcessedTxid = highestProcessedTxid;
|
newHighestProcessedTxid = highestProcessedAppendTxid;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (writer.getLength() - fileLengthAtLastSync >= batchSize) {
|
if (writer.getLength() - fileLengthAtLastSync >= batchSize) {
|
||||||
// sync because buffer size limit.
|
// sync because buffer size limit.
|
||||||
sync(writer, newHighestProcessedTxid);
|
sync(writer, newHighestProcessedTxid);
|
||||||
} else if ((!syncFutures.isEmpty() || rollPromise != null)
|
return;
|
||||||
&& writer.getLength() > fileLengthAtLastSync) {
|
}
|
||||||
// first we should have at least one sync request or a roll request
|
if (writer.getLength() == fileLengthAtLastSync) {
|
||||||
// second we should have some unsynced data.
|
|
||||||
sync(writer, newHighestProcessedTxid);
|
|
||||||
} else if (writer.getLength() == fileLengthAtLastSync) {
|
|
||||||
// we haven't written anything out, just advance the highestSyncedSequence since we may only
|
// we haven't written anything out, just advance the highestSyncedSequence since we may only
|
||||||
// stamped some region sequence id.
|
// stamped some region sequence id.
|
||||||
highestSyncedTxid.set(newHighestProcessedTxid);
|
highestSyncedTxid.set(newHighestProcessedTxid);
|
||||||
finishSync(false);
|
finishSync(false);
|
||||||
tryFinishRoll();
|
trySetReadyForRolling();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// we have some unsynced data but haven't reached the batch size yet
|
||||||
|
if (!syncFutures.isEmpty()) {
|
||||||
|
// we have at least one sync request
|
||||||
|
sync(writer, newHighestProcessedTxid);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// usually waitingRoll is false so we check it without lock first.
|
||||||
|
if (waitingRoll) {
|
||||||
|
consumeLock.lock();
|
||||||
|
try {
|
||||||
|
if (waitingRoll) {
|
||||||
|
// there is a roll request
|
||||||
|
sync(writer, newHighestProcessedTxid);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
consumeLock.unlock();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final Comparator<SyncFuture> SEQ_COMPARATOR = (o1, o2) -> {
|
private void consume() {
|
||||||
int c = Long.compare(o1.getTxid(), o2.getTxid());
|
consumeLock.lock();
|
||||||
return c != 0 ? c : Integer.compare(System.identityHashCode(o1), System.identityHashCode(o2));
|
try {
|
||||||
};
|
|
||||||
|
|
||||||
private final Runnable consumer = new Runnable() {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
synchronized (waitingConsumePayloads) {
|
|
||||||
assert consumerScheduled;
|
|
||||||
if (writerBroken) {
|
if (writerBroken) {
|
||||||
// waiting for reschedule after rollWriter.
|
|
||||||
consumerScheduled = false;
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (waitingRoll) {
|
if (waitingRoll) {
|
||||||
// we may have toWriteEntries if the consume method does not write all pending entries
|
if (writer.getLength() > fileLengthAtLastSync) {
|
||||||
// out, this is usually happen if we have too many toWriteEntries that exceeded the
|
// issue a sync
|
||||||
// batchSize limit.
|
sync(writer, highestProcessedAppendTxid);
|
||||||
if (waitingAppendEntries.isEmpty()) {
|
} else {
|
||||||
consumerScheduled = false;
|
if (unackedAppends.isEmpty()) {
|
||||||
|
readyForRolling = true;
|
||||||
|
readyForRollingCond.signalAll();
|
||||||
|
}
|
||||||
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
} else {
|
} finally {
|
||||||
for (Payload p; (p = waitingConsumePayloads.pollFirst()) != null;) {
|
consumeLock.unlock();
|
||||||
if (p.entry != null) {
|
}
|
||||||
waitingAppendEntries.addLast(p.entry);
|
long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1;
|
||||||
} else if (p.sync != null) {
|
for (long cursorBound =
|
||||||
syncFutures.add(p.sync);
|
waitingConsumePayloads.getCursor(); nextCursor <= cursorBound; nextCursor++) {
|
||||||
} else {
|
if (!waitingConsumePayloads.isPublished(nextCursor)) {
|
||||||
rollPromise = p.roll;
|
|
||||||
waitingRoll = true;
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
RingBufferTruck truck = waitingConsumePayloads.get(nextCursor);
|
||||||
|
switch (truck.type()) {
|
||||||
|
case APPEND:
|
||||||
|
toWriteAppends.addLast(truck.unloadAppend());
|
||||||
|
break;
|
||||||
|
case SYNC:
|
||||||
|
syncFutures.add(truck.unloadSync());
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
LOG.warn("RingBufferTruck with unexpected type: " + truck.type());
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
waitingConsumePayloadsGatingSequence.set(nextCursor);
|
||||||
}
|
}
|
||||||
}
|
appendAndSync();
|
||||||
consume();
|
if (hasConsumerTask.get()) {
|
||||||
synchronized (waitingConsumePayloads) {
|
|
||||||
if (waitingRoll) {
|
|
||||||
if (waitingAppendEntries.isEmpty()) {
|
|
||||||
consumerScheduled = false;
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
if (toWriteAppends.isEmpty()) {
|
||||||
|
if (waitingConsumePayloadsGatingSequence.get() == waitingConsumePayloads.getCursor()) {
|
||||||
|
consumerScheduled.set(false);
|
||||||
|
// recheck here since in append and sync we do not hold the consumeLock. Thing may
|
||||||
|
// happen like
|
||||||
|
// 1. we check cursor, no new entry
|
||||||
|
// 2. someone publishes a new entry to ringbuffer and the consumerScheduled is true and
|
||||||
|
// give up scheduling the consumer task.
|
||||||
|
// 3. we set consumerScheduled to false and also give up scheduling consumer task.
|
||||||
|
if (waitingConsumePayloadsGatingSequence.get() == waitingConsumePayloads.getCursor()) {
|
||||||
|
return;
|
||||||
} else {
|
} else {
|
||||||
if (waitingConsumePayloads.isEmpty() && waitingAppendEntries.isEmpty()) {
|
// maybe someone has grabbed this before us
|
||||||
consumerScheduled = false;
|
if (!consumerScheduled.compareAndSet(false, true)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
// reschedule if we still have something to write.
|
// reschedule if we still have something to write.
|
||||||
eventLoop.execute(this);
|
eventLoop.execute(consumer);
|
||||||
}
|
}
|
||||||
};
|
|
||||||
|
|
||||||
private boolean shouldScheduleConsumer() {
|
private boolean shouldScheduleConsumer() {
|
||||||
if (writerBroken || waitingRoll) {
|
if (writerBroken || waitingRoll) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
if (consumerScheduled) {
|
return consumerScheduled.compareAndSet(false, true);
|
||||||
return false;
|
|
||||||
}
|
|
||||||
consumerScheduled = true;
|
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long append(HRegionInfo hri, WALKey key, WALEdit edits, boolean inMemstore)
|
public long append(HRegionInfo hri, WALKey key, WALEdit edits, boolean inMemstore)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
boolean scheduleTask;
|
if (closed) {
|
||||||
long txid;
|
|
||||||
synchronized (waitingConsumePayloads) {
|
|
||||||
if (this.closed) {
|
|
||||||
throw new IOException("Cannot append; log is closed");
|
throw new IOException("Cannot append; log is closed");
|
||||||
}
|
}
|
||||||
txid = nextTxid++;
|
TraceScope scope = Trace.startSpan("AsyncFSWAL.append");
|
||||||
FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore);
|
long txid = waitingConsumePayloads.next();
|
||||||
scheduleTask = shouldScheduleConsumer();
|
try {
|
||||||
waitingConsumePayloads.add(new Payload(entry));
|
RingBufferTruck truck = waitingConsumePayloads.get(txid);
|
||||||
|
truck.load(new FSWALEntry(txid, key, edits, hri, inMemstore), scope.detach());
|
||||||
|
} finally {
|
||||||
|
waitingConsumePayloads.publish(txid);
|
||||||
}
|
}
|
||||||
if (scheduleTask) {
|
if (shouldScheduleConsumer()) {
|
||||||
eventLoop.execute(consumer);
|
eventLoop.execute(consumer);
|
||||||
}
|
}
|
||||||
return txid;
|
return txid;
|
||||||
|
@ -549,14 +661,16 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
||||||
public void sync() throws IOException {
|
public void sync() throws IOException {
|
||||||
TraceScope scope = Trace.startSpan("AsyncFSWAL.sync");
|
TraceScope scope = Trace.startSpan("AsyncFSWAL.sync");
|
||||||
try {
|
try {
|
||||||
|
long txid = waitingConsumePayloads.next();
|
||||||
SyncFuture future;
|
SyncFuture future;
|
||||||
boolean scheduleTask;
|
try {
|
||||||
synchronized (waitingConsumePayloads) {
|
future = getSyncFuture(txid, scope.detach());
|
||||||
scheduleTask = shouldScheduleConsumer();
|
RingBufferTruck truck = waitingConsumePayloads.get(txid);
|
||||||
future = getSyncFuture(nextTxid - 1, scope.detach());
|
truck.load(future);
|
||||||
waitingConsumePayloads.addLast(new Payload(future));
|
} finally {
|
||||||
|
waitingConsumePayloads.publish(txid);
|
||||||
}
|
}
|
||||||
if (scheduleTask) {
|
if (shouldScheduleConsumer()) {
|
||||||
eventLoop.execute(consumer);
|
eventLoop.execute(consumer);
|
||||||
}
|
}
|
||||||
scope = Trace.continueSpan(blockOnSync(future));
|
scope = Trace.continueSpan(blockOnSync(future));
|
||||||
|
@ -573,13 +687,17 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
||||||
}
|
}
|
||||||
TraceScope scope = Trace.startSpan("AsyncFSWAL.sync");
|
TraceScope scope = Trace.startSpan("AsyncFSWAL.sync");
|
||||||
try {
|
try {
|
||||||
SyncFuture future = getSyncFuture(txid, scope.detach());
|
// here we do not use ring buffer sequence as txid
|
||||||
boolean scheduleTask;
|
long sequence = waitingConsumePayloads.next();
|
||||||
synchronized (waitingConsumePayloads) {
|
SyncFuture future;
|
||||||
scheduleTask = shouldScheduleConsumer();
|
try {
|
||||||
waitingConsumePayloads.addLast(new Payload(future));
|
future = getSyncFuture(txid, scope.detach());
|
||||||
|
RingBufferTruck truck = waitingConsumePayloads.get(sequence);
|
||||||
|
truck.load(future);
|
||||||
|
} finally {
|
||||||
|
waitingConsumePayloads.publish(sequence);
|
||||||
}
|
}
|
||||||
if (scheduleTask) {
|
if (shouldScheduleConsumer()) {
|
||||||
eventLoop.execute(consumer);
|
eventLoop.execute(consumer);
|
||||||
}
|
}
|
||||||
scope = Trace.continueSpan(blockOnSync(future));
|
scope = Trace.continueSpan(blockOnSync(future));
|
||||||
|
@ -630,27 +748,21 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void waitForSafePoint() {
|
private void waitForSafePoint() {
|
||||||
Future<Void> roll;
|
consumeLock.lock();
|
||||||
boolean scheduleTask;
|
try {
|
||||||
synchronized (waitingConsumePayloads) {
|
if (writerBroken || this.writer == null) {
|
||||||
if (!writerBroken && this.writer != null) {
|
return;
|
||||||
Promise<Void> promise = eventLoop.newPromise();
|
|
||||||
if (consumerScheduled) {
|
|
||||||
scheduleTask = false;
|
|
||||||
} else {
|
|
||||||
scheduleTask = consumerScheduled = true;
|
|
||||||
}
|
}
|
||||||
waitingConsumePayloads.addLast(new Payload(promise));
|
consumerScheduled.set(true);
|
||||||
roll = promise;
|
waitingRoll = true;
|
||||||
} else {
|
readyForRolling = false;
|
||||||
roll = eventLoop.newSucceededFuture(null);
|
|
||||||
scheduleTask = false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (scheduleTask) {
|
|
||||||
eventLoop.execute(consumer);
|
eventLoop.execute(consumer);
|
||||||
|
while (!readyForRolling) {
|
||||||
|
readyForRollingCond.awaitUninterruptibly();
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
consumeLock.unlock();
|
||||||
}
|
}
|
||||||
roll.awaitUninterruptibly();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -663,25 +775,17 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
||||||
this.fsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput();
|
this.fsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput();
|
||||||
}
|
}
|
||||||
this.fileLengthAtLastSync = 0L;
|
this.fileLengthAtLastSync = 0L;
|
||||||
boolean scheduleTask;
|
consumeLock.lock();
|
||||||
synchronized (waitingConsumePayloads) {
|
try {
|
||||||
|
consumerScheduled.set(true);
|
||||||
writerBroken = waitingRoll = false;
|
writerBroken = waitingRoll = false;
|
||||||
if (logRollerExitedChecker != null) {
|
if (logRollerExitedChecker != null) {
|
||||||
logRollerExitedChecker.cancel();
|
logRollerExitedChecker.cancel();
|
||||||
logRollerExitedChecker = null;
|
logRollerExitedChecker = null;
|
||||||
}
|
}
|
||||||
if (consumerScheduled) {
|
|
||||||
scheduleTask = false;
|
|
||||||
} else {
|
|
||||||
if (waitingConsumePayloads.isEmpty() && waitingAppendEntries.isEmpty()) {
|
|
||||||
scheduleTask = false;
|
|
||||||
} else {
|
|
||||||
scheduleTask = consumerScheduled = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (scheduleTask) {
|
|
||||||
eventLoop.execute(consumer);
|
eventLoop.execute(consumer);
|
||||||
|
} finally {
|
||||||
|
consumeLock.unlock();
|
||||||
}
|
}
|
||||||
long oldFileLen;
|
long oldFileLen;
|
||||||
if (oldWriter != null) {
|
if (oldWriter != null) {
|
||||||
|
|
|
@ -17,6 +17,15 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.regionserver.wal;
|
package org.apache.hadoop.hbase.regionserver.wal;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import com.lmax.disruptor.BlockingWaitStrategy;
|
||||||
|
import com.lmax.disruptor.EventHandler;
|
||||||
|
import com.lmax.disruptor.ExceptionHandler;
|
||||||
|
import com.lmax.disruptor.LifecycleAware;
|
||||||
|
import com.lmax.disruptor.TimeoutException;
|
||||||
|
import com.lmax.disruptor.dsl.Disruptor;
|
||||||
|
import com.lmax.disruptor.dsl.ProducerType;
|
||||||
|
|
||||||
import java.io.FileNotFoundException;
|
import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
|
@ -24,8 +33,6 @@ import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.ExecutorService;
|
|
||||||
import java.util.concurrent.Executors;
|
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
@ -59,15 +66,6 @@ import org.apache.htrace.Span;
|
||||||
import org.apache.htrace.Trace;
|
import org.apache.htrace.Trace;
|
||||||
import org.apache.htrace.TraceScope;
|
import org.apache.htrace.TraceScope;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
|
||||||
import com.lmax.disruptor.BlockingWaitStrategy;
|
|
||||||
import com.lmax.disruptor.EventHandler;
|
|
||||||
import com.lmax.disruptor.ExceptionHandler;
|
|
||||||
import com.lmax.disruptor.LifecycleAware;
|
|
||||||
import com.lmax.disruptor.TimeoutException;
|
|
||||||
import com.lmax.disruptor.dsl.Disruptor;
|
|
||||||
import com.lmax.disruptor.dsl.ProducerType;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The default implementation of FSWAL.
|
* The default implementation of FSWAL.
|
||||||
*/
|
*/
|
||||||
|
@ -119,11 +117,6 @@ public class FSHLog extends AbstractFSWAL<Writer> {
|
||||||
*/
|
*/
|
||||||
private final Disruptor<RingBufferTruck> disruptor;
|
private final Disruptor<RingBufferTruck> disruptor;
|
||||||
|
|
||||||
/**
|
|
||||||
* An executorservice that runs the disruptor AppendEventHandler append executor.
|
|
||||||
*/
|
|
||||||
private final ExecutorService appendExecutor;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This fellow is run by the above appendExecutor service but it is all about batching up appends
|
* This fellow is run by the above appendExecutor service but it is all about batching up appends
|
||||||
* and syncs; it may shutdown without cleaning out the last few appends or syncs. To guard against
|
* and syncs; it may shutdown without cleaning out the last few appends or syncs. To guard against
|
||||||
|
@ -164,9 +157,10 @@ public class FSHLog extends AbstractFSWAL<Writer> {
|
||||||
* Exception handler to pass the disruptor ringbuffer. Same as native implementation only it logs
|
* Exception handler to pass the disruptor ringbuffer. Same as native implementation only it logs
|
||||||
* using our logger instead of java native logger.
|
* using our logger instead of java native logger.
|
||||||
*/
|
*/
|
||||||
static class RingBufferExceptionHandler implements ExceptionHandler {
|
static class RingBufferExceptionHandler implements ExceptionHandler<RingBufferTruck> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void handleEventException(Throwable ex, long sequence, Object event) {
|
public void handleEventException(Throwable ex, long sequence, RingBufferTruck event) {
|
||||||
LOG.error("Sequence=" + sequence + ", event=" + event, ex);
|
LOG.error("Sequence=" + sequence + ", event=" + event, ex);
|
||||||
throw new RuntimeException(ex);
|
throw new RuntimeException(ex);
|
||||||
}
|
}
|
||||||
|
@ -230,26 +224,18 @@ public class FSHLog extends AbstractFSWAL<Writer> {
|
||||||
// This is the 'writer' -- a single threaded executor. This single thread 'consumes' what is
|
// This is the 'writer' -- a single threaded executor. This single thread 'consumes' what is
|
||||||
// put on the ring buffer.
|
// put on the ring buffer.
|
||||||
String hostingThreadName = Thread.currentThread().getName();
|
String hostingThreadName = Thread.currentThread().getName();
|
||||||
this.appendExecutor = Executors
|
|
||||||
.newSingleThreadExecutor(Threads.getNamedThreadFactory(hostingThreadName + ".append"));
|
|
||||||
// Preallocate objects to use on the ring buffer. The way that appends and syncs work, we will
|
|
||||||
// be stuck and make no progress if the buffer is filled with appends only and there is no
|
|
||||||
// sync. If no sync, then the handlers will be outstanding just waiting on sync completion
|
|
||||||
// before they return.
|
|
||||||
final int preallocatedEventCount = this.conf
|
|
||||||
.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16);
|
|
||||||
// Using BlockingWaitStrategy. Stuff that is going on here takes so long it makes no sense
|
// Using BlockingWaitStrategy. Stuff that is going on here takes so long it makes no sense
|
||||||
// spinning as other strategies do.
|
// spinning as other strategies do.
|
||||||
this.disruptor = new Disruptor<RingBufferTruck>(RingBufferTruck.EVENT_FACTORY,
|
this.disruptor = new Disruptor<RingBufferTruck>(RingBufferTruck::new,
|
||||||
preallocatedEventCount, this.appendExecutor, ProducerType.MULTI,
|
getPreallocatedEventCount(), Threads.getNamedThreadFactory(hostingThreadName + ".append"),
|
||||||
new BlockingWaitStrategy());
|
ProducerType.MULTI, new BlockingWaitStrategy());
|
||||||
// Advance the ring buffer sequence so that it starts from 1 instead of 0,
|
// Advance the ring buffer sequence so that it starts from 1 instead of 0,
|
||||||
// because SyncFuture.NOT_DONE = 0.
|
// because SyncFuture.NOT_DONE = 0.
|
||||||
this.disruptor.getRingBuffer().next();
|
this.disruptor.getRingBuffer().next();
|
||||||
int maxHandlersCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, 200);
|
int maxHandlersCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, 200);
|
||||||
this.ringBufferEventHandler = new RingBufferEventHandler(
|
this.ringBufferEventHandler = new RingBufferEventHandler(
|
||||||
conf.getInt("hbase.regionserver.hlog.syncer.count", 5), maxHandlersCount);
|
conf.getInt("hbase.regionserver.hlog.syncer.count", 5), maxHandlersCount);
|
||||||
this.disruptor.handleExceptionsWith(new RingBufferExceptionHandler());
|
this.disruptor.setDefaultExceptionHandler(new RingBufferExceptionHandler());
|
||||||
this.disruptor.handleEventsWith(new RingBufferEventHandler[] { this.ringBufferEventHandler });
|
this.disruptor.handleEventsWith(new RingBufferEventHandler[] { this.ringBufferEventHandler });
|
||||||
// Starting up threads in constructor is a no no; Interface should have an init call.
|
// Starting up threads in constructor is a no no; Interface should have an init call.
|
||||||
this.disruptor.start();
|
this.disruptor.start();
|
||||||
|
@ -436,10 +422,6 @@ public class FSHLog extends AbstractFSWAL<Writer> {
|
||||||
this.disruptor.shutdown();
|
this.disruptor.shutdown();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// With disruptor down, this is safe to let go.
|
|
||||||
if (this.appendExecutor != null) {
|
|
||||||
this.appendExecutor.shutdown();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Closing WAL writer in " + FSUtils.getPath(walDir));
|
LOG.debug("Closing WAL writer in " + FSUtils.getPath(walDir));
|
||||||
|
@ -473,7 +455,7 @@ public class FSHLog extends AbstractFSWAL<Writer> {
|
||||||
// edit with its edit/sequence id.
|
// edit with its edit/sequence id.
|
||||||
// TODO: reuse FSWALEntry as we do SyncFuture rather create per append.
|
// TODO: reuse FSWALEntry as we do SyncFuture rather create per append.
|
||||||
entry = new FSWALEntry(sequence, key, edits, hri, inMemstore);
|
entry = new FSWALEntry(sequence, key, edits, hri, inMemstore);
|
||||||
truck.loadPayload(entry, scope.detach());
|
truck.load(entry, scope.detach());
|
||||||
} finally {
|
} finally {
|
||||||
this.disruptor.getRingBuffer().publish(sequence);
|
this.disruptor.getRingBuffer().publish(sequence);
|
||||||
}
|
}
|
||||||
|
@ -754,7 +736,7 @@ public class FSHLog extends AbstractFSWAL<Writer> {
|
||||||
SyncFuture syncFuture = getSyncFuture(sequence, span);
|
SyncFuture syncFuture = getSyncFuture(sequence, span);
|
||||||
try {
|
try {
|
||||||
RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence);
|
RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence);
|
||||||
truck.loadPayload(syncFuture);
|
truck.load(syncFuture);
|
||||||
} finally {
|
} finally {
|
||||||
this.disruptor.getRingBuffer().publish(sequence);
|
this.disruptor.getRingBuffer().publish(sequence);
|
||||||
}
|
}
|
||||||
|
@ -1021,16 +1003,17 @@ public class FSHLog extends AbstractFSWAL<Writer> {
|
||||||
// the log roller may be waiting on a signal from us here and will just hang without it.
|
// the log roller may be waiting on a signal from us here and will just hang without it.
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if (truck.hasSyncFuturePayload()) {
|
if (truck.type() == RingBufferTruck.Type.SYNC) {
|
||||||
this.syncFutures[this.syncFuturesCount++] = truck.unloadSyncFuturePayload();
|
this.syncFutures[this.syncFuturesCount++] = truck.unloadSync();
|
||||||
// Force flush of syncs if we are carrying a full complement of syncFutures.
|
// Force flush of syncs if we are carrying a full complement of syncFutures.
|
||||||
if (this.syncFuturesCount == this.syncFutures.length) {
|
if (this.syncFuturesCount == this.syncFutures.length) {
|
||||||
endOfBatch = true;
|
endOfBatch = true;
|
||||||
}
|
}
|
||||||
} else if (truck.hasFSWALEntryPayload()) {
|
} else if (truck.type() == RingBufferTruck.Type.APPEND) {
|
||||||
TraceScope scope = Trace.continueSpan(truck.unloadSpanPayload());
|
FSWALEntry entry = truck.unloadAppend();
|
||||||
|
TraceScope scope = Trace.continueSpan(entry.detachSpan());
|
||||||
try {
|
try {
|
||||||
FSWALEntry entry = truck.unloadFSWALEntryPayload();
|
|
||||||
if (this.exception != null) {
|
if (this.exception != null) {
|
||||||
// We got an exception on an earlier attempt at append. Do not let this append
|
// We got an exception on an earlier attempt at append. Do not let this append
|
||||||
// go through. Fail it but stamp the sequenceid into this append though failed.
|
// go through. Fail it but stamp the sequenceid into this append though failed.
|
||||||
|
|
|
@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.CollectionUtils;
|
import org.apache.hadoop.hbase.util.CollectionUtils;
|
||||||
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
||||||
import org.apache.hadoop.hbase.wal.WALKey;
|
import org.apache.hadoop.hbase.wal.WALKey;
|
||||||
|
import org.apache.htrace.Span;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A WAL Entry for {@link AbstractFSWAL} implementation. Immutable.
|
* A WAL Entry for {@link AbstractFSWAL} implementation. Immutable.
|
||||||
|
@ -55,6 +56,9 @@ class FSWALEntry extends Entry {
|
||||||
// calling stampRegionSequenceId again.
|
// calling stampRegionSequenceId again.
|
||||||
private transient boolean stamped = false;
|
private transient boolean stamped = false;
|
||||||
|
|
||||||
|
// The tracing span for this entry when writing WAL.
|
||||||
|
private transient Span span;
|
||||||
|
|
||||||
FSWALEntry(final long txid, final WALKey key, final WALEdit edit,
|
FSWALEntry(final long txid, final WALKey key, final WALEdit edit,
|
||||||
final HRegionInfo hri, final boolean inMemstore) {
|
final HRegionInfo hri, final boolean inMemstore) {
|
||||||
super(key, edit);
|
super(key, edit);
|
||||||
|
@ -142,4 +146,14 @@ class FSWALEntry extends Entry {
|
||||||
Set<byte[]> getFamilyNames() {
|
Set<byte[]> getFamilyNames() {
|
||||||
return familyNames;
|
return familyNames;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void attachSpan(Span span) {
|
||||||
|
this.span = span;
|
||||||
|
}
|
||||||
|
|
||||||
|
Span detachSpan() {
|
||||||
|
Span span = this.span;
|
||||||
|
this.span = null;
|
||||||
|
return span;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,96 +21,68 @@ package org.apache.hadoop.hbase.regionserver.wal;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
import org.apache.htrace.Span;
|
import org.apache.htrace.Span;
|
||||||
|
|
||||||
import com.lmax.disruptor.EventFactory;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A 'truck' to carry a payload across the {@link FSHLog} ring buffer from Handler to WAL.
|
* A 'truck' to carry a payload across the ring buffer from Handler to WAL. Has EITHER a
|
||||||
* Has EITHER a {@link FSWALEntry} for making an append OR it has a {@link SyncFuture} to
|
* {@link FSWALEntry} for making an append OR it has a {@link SyncFuture} to represent a 'sync'
|
||||||
* represent a 'sync' invocation. Truck instances are reused by the disruptor when it gets
|
* invocation. Truck instances are reused by the disruptor when it gets around to it so their
|
||||||
* around to it so their payload references must be discarded on consumption to release them
|
* payload references must be discarded on consumption to release them to GC.
|
||||||
* to GC.
|
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
class RingBufferTruck {
|
final class RingBufferTruck {
|
||||||
|
|
||||||
|
public enum Type {
|
||||||
|
APPEND, SYNC, EMPTY
|
||||||
|
}
|
||||||
|
|
||||||
|
private Type type = Type.EMPTY;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Either this syncFuture is set or entry is set, but not both.
|
* Either this syncFuture is set or entry is set, but not both.
|
||||||
*/
|
*/
|
||||||
private SyncFuture syncFuture;
|
private SyncFuture sync;
|
||||||
private FSWALEntry entry;
|
private FSWALEntry entry;
|
||||||
|
|
||||||
/**
|
|
||||||
* The tracing span for this entry. Can be null.
|
|
||||||
* TODO: Fix up tracing.
|
|
||||||
*/
|
|
||||||
private Span span;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Load the truck with a {@link FSWALEntry} and associated {@link Span}.
|
* Load the truck with a {@link FSWALEntry} and associated {@link Span}.
|
||||||
*/
|
*/
|
||||||
void loadPayload(final FSWALEntry entry, final Span span) {
|
void load(FSWALEntry entry, Span span) {
|
||||||
|
entry.attachSpan(span);
|
||||||
this.entry = entry;
|
this.entry = entry;
|
||||||
this.span = span;
|
this.type = Type.APPEND;
|
||||||
this.syncFuture = null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Load the truck with a {@link SyncFuture}.
|
* Load the truck with a {@link SyncFuture}.
|
||||||
*/
|
*/
|
||||||
void loadPayload(final SyncFuture syncFuture) {
|
void load(final SyncFuture syncFuture) {
|
||||||
this.syncFuture = syncFuture;
|
this.sync = syncFuture;
|
||||||
|
this.type = Type.SYNC;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the type of this truck's payload.
|
||||||
|
*/
|
||||||
|
Type type() {
|
||||||
|
return type;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Unload the truck of its {@link FSWALEntry} payload. The internal reference is released.
|
||||||
|
*/
|
||||||
|
FSWALEntry unloadAppend() {
|
||||||
|
FSWALEntry entry = this.entry;
|
||||||
this.entry = null;
|
this.entry = null;
|
||||||
this.span = null;
|
this.type = Type.EMPTY;
|
||||||
|
return entry;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* return {@code true} when this truck is carrying a {@link FSWALEntry},
|
* Unload the truck of its {@link SyncFuture} payload. The internal reference is released.
|
||||||
* {@code false} otherwise.
|
|
||||||
*/
|
*/
|
||||||
boolean hasFSWALEntryPayload() {
|
SyncFuture unloadSync() {
|
||||||
return this.entry != null;
|
SyncFuture sync = this.sync;
|
||||||
|
this.sync = null;
|
||||||
|
this.type = Type.EMPTY;
|
||||||
|
return sync;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* return {@code true} when this truck is carrying a {@link SyncFuture},
|
|
||||||
* {@code false} otherwise.
|
|
||||||
*/
|
|
||||||
boolean hasSyncFuturePayload() {
|
|
||||||
return this.syncFuture != null;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Unload the truck of its {@link FSWALEntry} payload. The internal refernce is released.
|
|
||||||
*/
|
|
||||||
FSWALEntry unloadFSWALEntryPayload() {
|
|
||||||
FSWALEntry ret = this.entry;
|
|
||||||
this.entry = null;
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Unload the truck of its {@link SyncFuture} payload. The internal refernce is released.
|
|
||||||
*/
|
|
||||||
SyncFuture unloadSyncFuturePayload() {
|
|
||||||
SyncFuture ret = this.syncFuture;
|
|
||||||
this.syncFuture = null;
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Unload the truck of its {@link Span} payload. The internal reference is released.
|
|
||||||
*/
|
|
||||||
Span unloadSpanPayload() {
|
|
||||||
Span ret = this.span;
|
|
||||||
this.span = null;
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Factory for making a bunch of these. Needed by the ringbuffer/disruptor.
|
|
||||||
*/
|
|
||||||
final static EventFactory<RingBufferTruck> EVENT_FACTORY = new EventFactory<RingBufferTruck>() {
|
|
||||||
public RingBufferTruck newInstance() {
|
|
||||||
return new RingBufferTruck();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,13 +20,12 @@
|
||||||
package org.apache.hadoop.hbase.wal;
|
package org.apache.hadoop.hbase.wal;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.Cell;
|
|
||||||
import org.apache.hadoop.hbase.CellUtil;
|
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.HRegionInfo;
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
@ -38,7 +37,6 @@ import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
|
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
|
import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A Write Ahead Log (WAL) provides service for reading, writing waledits. This interface provides
|
* A Write Ahead Log (WAL) provides service for reading, writing waledits. This interface provides
|
||||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||||
|
import org.apache.hadoop.hbase.util.Threads;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
@ -42,7 +43,7 @@ public class TestAsyncFSWAL extends AbstractTestFSWAL {
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void setUpBeforeClass() throws Exception {
|
public static void setUpBeforeClass() throws Exception {
|
||||||
GROUP = new NioEventLoopGroup();
|
GROUP = new NioEventLoopGroup(1, Threads.newDaemonThreadFactory("TestAsyncFSWAL"));
|
||||||
AbstractTestFSWAL.setUpBeforeClass();
|
AbstractTestFSWAL.setUpBeforeClass();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||||
|
import org.apache.hadoop.hbase.util.Threads;
|
||||||
import org.apache.hadoop.hbase.wal.WAL;
|
import org.apache.hadoop.hbase.wal.WAL;
|
||||||
import org.apache.hadoop.hbase.wal.WALFactory;
|
import org.apache.hadoop.hbase.wal.WALFactory;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
|
@ -41,7 +42,7 @@ public class TestAsyncWALReplay extends AbstractTestWALReplay {
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void setUpBeforeClass() throws Exception {
|
public static void setUpBeforeClass() throws Exception {
|
||||||
GROUP = new NioEventLoopGroup();
|
GROUP = new NioEventLoopGroup(1, Threads.newDaemonThreadFactory("TestAsyncWALReplay"));
|
||||||
Configuration conf = AbstractTestWALReplay.TEST_UTIL.getConfiguration();
|
Configuration conf = AbstractTestWALReplay.TEST_UTIL.getConfiguration();
|
||||||
conf.set(WALFactory.WAL_PROVIDER, "asyncfs");
|
conf.set(WALFactory.WAL_PROVIDER, "asyncfs");
|
||||||
AbstractTestWALReplay.setUpBeforeClass();
|
AbstractTestWALReplay.setUpBeforeClass();
|
||||||
|
|
2
pom.xml
2
pom.xml
|
@ -1204,7 +1204,7 @@
|
||||||
<commons-logging.version>1.2</commons-logging.version>
|
<commons-logging.version>1.2</commons-logging.version>
|
||||||
<commons-math.version>2.2</commons-math.version>
|
<commons-math.version>2.2</commons-math.version>
|
||||||
<commons-net.version>3.1</commons-net.version>
|
<commons-net.version>3.1</commons-net.version>
|
||||||
<disruptor.version>3.3.0</disruptor.version>
|
<disruptor.version>3.3.6</disruptor.version>
|
||||||
<!-- Do not use versions earlier than 3.2.2 due to a security vulnerability -->
|
<!-- Do not use versions earlier than 3.2.2 due to a security vulnerability -->
|
||||||
<collections.version>3.2.2</collections.version>
|
<collections.version>3.2.2</collections.version>
|
||||||
<httpclient.version>4.5.2</httpclient.version>
|
<httpclient.version>4.5.2</httpclient.version>
|
||||||
|
|
Loading…
Reference in New Issue