HBASE-19024 Provide a configurable option to hsync WAL edits to the disk for better durability (Harshal Jain)
This commit is contained in:
parent
764798d996
commit
f976b3a8af
|
@ -221,6 +221,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
public static final String HBASE_MAX_CELL_SIZE_KEY = "hbase.server.keyvalue.maxsize";
|
||||
public static final int DEFAULT_MAX_CELL_SIZE = 10485760;
|
||||
|
||||
public static final String WAL_HSYNC_CONF_KEY = "hbase.wal.hsync";
|
||||
public static final boolean DEFAULT_WAL_HSYNC = false;
|
||||
|
||||
/**
|
||||
* Longest time we'll wait on a sequenceid.
|
||||
* Sequenceid comes up out of the WAL subsystem. WAL subsystem can go bad or a test might use
|
||||
|
@ -786,9 +789,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
*/
|
||||
this.rowProcessorTimeout = conf.getLong(
|
||||
"hbase.hregion.row.processor.timeout", DEFAULT_ROW_PROCESSOR_TIMEOUT);
|
||||
this.durability = htd.getDurability() == Durability.USE_DEFAULT
|
||||
? DEFAULT_DURABILITY
|
||||
: htd.getDurability();
|
||||
|
||||
boolean forceSync = conf.getBoolean(WAL_HSYNC_CONF_KEY, DEFAULT_WAL_HSYNC);
|
||||
/**
|
||||
* This is the global default value for durability. All tables/mutations not defining a
|
||||
* durability or using USE_DEFAULT will default to this value.
|
||||
*/
|
||||
Durability defaultDurability = forceSync ? Durability.FSYNC_WAL : Durability.SYNC_WAL;
|
||||
this.durability =
|
||||
htd.getDurability() == Durability.USE_DEFAULT ? defaultDurability : htd.getDurability();
|
||||
|
||||
if (rsServices != null) {
|
||||
this.rsAccounting = this.rsServices.getRegionServerAccounting();
|
||||
// don't initialize coprocessors if not running within a regionserver
|
||||
|
@ -8758,9 +8768,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
// nothing do to
|
||||
break;
|
||||
case SYNC_WAL:
|
||||
this.wal.sync(txid, false);
|
||||
break;
|
||||
case FSYNC_WAL:
|
||||
// sync the WAL edit (SYNC and FSYNC treated the same for now)
|
||||
this.wal.sync(txid);
|
||||
this.wal.sync(txid, true);
|
||||
break;
|
||||
default:
|
||||
throw new RuntimeException("Unknown durability " + durability);
|
||||
|
|
|
@ -67,6 +67,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
|
|||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
|
||||
import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.ClassSize;
|
||||
|
@ -279,6 +280,8 @@ public class FSHLog implements WAL {
|
|||
// Minimum tolerable replicas, if the actual value is lower than it, rollWriter will be triggered
|
||||
private final int minTolerableReplication;
|
||||
|
||||
private final boolean useHsync;
|
||||
|
||||
private final int slowSyncNs;
|
||||
|
||||
private final long walSyncTimeout;
|
||||
|
@ -534,6 +537,8 @@ public class FSHLog implements WAL {
|
|||
", prefix=" + this.logFilePrefix + ", suffix=" + logFileSuffix + ", logDir=" +
|
||||
this.fullPathLogDir + ", archiveDir=" + this.fullPathArchiveDir);
|
||||
|
||||
this.useHsync = conf.getBoolean(HRegion.WAL_HSYNC_CONF_KEY, HRegion.DEFAULT_WAL_HSYNC);
|
||||
|
||||
// rollWriter sets this.hdfs_out if it can.
|
||||
rollWriter();
|
||||
|
||||
|
@ -673,7 +678,7 @@ public class FSHLog implements WAL {
|
|||
private void preemptiveSync(final ProtobufLogWriter nextWriter) {
|
||||
long startTimeNanos = System.nanoTime();
|
||||
try {
|
||||
nextWriter.sync();
|
||||
nextWriter.sync(useHsync);
|
||||
postSync(System.nanoTime() - startTimeNanos, 0);
|
||||
} catch (IOException e) {
|
||||
// optimization failed, no need to abort here.
|
||||
|
@ -1280,7 +1285,7 @@ public class FSHLog implements WAL {
|
|||
Throwable lastException = null;
|
||||
try {
|
||||
Trace.addTimelineAnnotation("syncing writer");
|
||||
writer.sync();
|
||||
writer.sync(takeSyncFuture.isForceSync());
|
||||
Trace.addTimelineAnnotation("writer synced");
|
||||
currentSequence = updateHighestSyncedSequence(currentSequence);
|
||||
} catch (IOException e) {
|
||||
|
@ -1383,20 +1388,20 @@ public class FSHLog implements WAL {
|
|||
}
|
||||
|
||||
private SyncFuture publishSyncOnRingBuffer(long sequence) {
|
||||
return publishSyncOnRingBuffer(sequence, null);
|
||||
return publishSyncOnRingBuffer(sequence, null, false);
|
||||
}
|
||||
|
||||
private long getSequenceOnRingBuffer() {
|
||||
return this.disruptor.getRingBuffer().next();
|
||||
}
|
||||
|
||||
private SyncFuture publishSyncOnRingBuffer(Span span) {
|
||||
private SyncFuture publishSyncOnRingBuffer(Span span, boolean forceSync) {
|
||||
long sequence = this.disruptor.getRingBuffer().next();
|
||||
return publishSyncOnRingBuffer(sequence, span);
|
||||
return publishSyncOnRingBuffer(sequence, span, forceSync);
|
||||
}
|
||||
|
||||
private SyncFuture publishSyncOnRingBuffer(long sequence, Span span) {
|
||||
SyncFuture syncFuture = getSyncFuture(sequence, span);
|
||||
private SyncFuture publishSyncOnRingBuffer(long sequence, Span span, boolean forceSync) {
|
||||
SyncFuture syncFuture = getSyncFuture(sequence, span).setForceSync(forceSync);
|
||||
try {
|
||||
RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence);
|
||||
truck.loadPayload(syncFuture);
|
||||
|
@ -1407,8 +1412,8 @@ public class FSHLog implements WAL {
|
|||
}
|
||||
|
||||
// Sync all known transactions
|
||||
private Span publishSyncThenBlockOnCompletion(Span span) throws IOException {
|
||||
return blockOnSync(publishSyncOnRingBuffer(span));
|
||||
private Span publishSyncThenBlockOnCompletion(Span span, boolean forceSync) throws IOException {
|
||||
return blockOnSync(publishSyncOnRingBuffer(span, forceSync));
|
||||
}
|
||||
|
||||
private Span blockOnSync(final SyncFuture syncFuture) throws IOException {
|
||||
|
@ -1503,9 +1508,14 @@ public class FSHLog implements WAL {
|
|||
|
||||
@Override
|
||||
public void sync() throws IOException {
|
||||
sync(useHsync);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sync(boolean forceSync) throws IOException {
|
||||
TraceScope scope = Trace.startSpan("FSHLog.sync");
|
||||
try {
|
||||
scope = Trace.continueSpan(publishSyncThenBlockOnCompletion(scope.detach()));
|
||||
scope = Trace.continueSpan(publishSyncThenBlockOnCompletion(scope.detach(), forceSync));
|
||||
} finally {
|
||||
assert scope == NullScope.INSTANCE || !scope.isDetached();
|
||||
scope.close();
|
||||
|
@ -1514,13 +1524,18 @@ public class FSHLog implements WAL {
|
|||
|
||||
@Override
|
||||
public void sync(long txid) throws IOException {
|
||||
if (this.highestSyncedSequence.get() >= txid){
|
||||
sync(txid, useHsync);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sync(long txid, boolean forceSync) throws IOException {
|
||||
if (this.highestSyncedSequence.get() >= txid) {
|
||||
// Already sync'd.
|
||||
return;
|
||||
}
|
||||
TraceScope scope = Trace.startSpan("FSHLog.sync");
|
||||
try {
|
||||
scope = Trace.continueSpan(publishSyncThenBlockOnCompletion(scope.detach()));
|
||||
scope = Trace.continueSpan(publishSyncThenBlockOnCompletion(scope.detach(), forceSync));
|
||||
} finally {
|
||||
assert scope == NullScope.INSTANCE || !scope.isDetached();
|
||||
scope.close();
|
||||
|
|
|
@ -164,11 +164,15 @@ public class ProtobufLogWriter extends WriterBase {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void sync() throws IOException {
|
||||
public void sync(boolean forceSync) throws IOException {
|
||||
FSDataOutputStream fsdos = this.output;
|
||||
if (fsdos == null) return; // Presume closed
|
||||
fsdos.flush();
|
||||
fsdos.hflush();
|
||||
if (forceSync) {
|
||||
fsdos.hsync();
|
||||
} else {
|
||||
fsdos.hflush();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -80,6 +80,8 @@ class SyncFuture {
|
|||
*/
|
||||
private Span span;
|
||||
|
||||
private boolean forceSync;
|
||||
|
||||
/**
|
||||
* Call this method to clear old usage and get it ready for new deploy. Call
|
||||
* this method even if it is being used for the first time.
|
||||
|
@ -120,6 +122,15 @@ class SyncFuture {
|
|||
return this.ringBufferSequence;
|
||||
}
|
||||
|
||||
synchronized boolean isForceSync() {
|
||||
return forceSync;
|
||||
}
|
||||
|
||||
synchronized SyncFuture setForceSync(boolean forceSync) {
|
||||
this.forceSync = forceSync;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieve the {@code span} instance from this Future. EventHandler calls
|
||||
* this method to continue the span. Thread waiting on this Future musn't call
|
||||
|
|
|
@ -198,6 +198,16 @@ class DisabledWALProvider implements WALProvider {
|
|||
sync();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sync(boolean forceSync) throws IOException {
|
||||
sync();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sync(long txid, boolean forceSync) throws IOException {
|
||||
sync(txid);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Long startCacheFlush(final byte[] encodedRegionName, Set<byte[]> flushedFamilyNames) {
|
||||
if (closed.get()) return null;
|
||||
|
|
|
@ -136,6 +136,21 @@ public interface WAL extends Closeable {
|
|||
*/
|
||||
void sync(long txid) throws IOException;
|
||||
|
||||
/**
|
||||
* @param forceSync Flag to force sync rather than flushing to the buffer. Example - Hadoop hflush
|
||||
* vs hsync.
|
||||
* @throws IOException
|
||||
*/
|
||||
void sync(boolean forceSync) throws IOException;
|
||||
|
||||
/**
|
||||
* @param txid
|
||||
* @param forceSync Flag to force sync rather than flushing to the buffer. Example - Hadoop hflush
|
||||
* vs hsync.
|
||||
* @throws IOException
|
||||
*/
|
||||
void sync(long txid, boolean forceSync) throws IOException;
|
||||
|
||||
/**
|
||||
* WAL keeps track of the sequence numbers that are as yet not flushed im memstores
|
||||
* in order to be able to do accounting to figure which WALs can be let go. This method tells WAL
|
||||
|
|
|
@ -80,7 +80,12 @@ public interface WALProvider {
|
|||
// Writers are used internally. Users outside of the WAL should be relying on the
|
||||
// interface provided by WAL.
|
||||
interface Writer extends Closeable {
|
||||
void sync() throws IOException;
|
||||
/**
|
||||
* @param forceSync Flag to force sync rather than flushing to the buffer. Example - Hadoop
|
||||
* hflush vs hsync.
|
||||
* @throws IOException
|
||||
*/
|
||||
void sync(boolean forceSync) throws IOException;
|
||||
void append(WAL.Entry entry) throws IOException;
|
||||
long getLength() throws IOException;
|
||||
}
|
||||
|
|
|
@ -312,10 +312,20 @@ public class TestRollbackFromClient {
|
|||
|
||||
@Override
|
||||
public void sync(long txid) throws IOException {
|
||||
sync(txid, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sync(boolean forceSync) throws IOException {
|
||||
delegation.sync(forceSync);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sync(long txid, boolean forceSync) throws IOException {
|
||||
if (SHOULD_FAIL.get()) {
|
||||
throw new IOException("[TESTING] we need the failure!!!");
|
||||
}
|
||||
delegation.sync(txid);
|
||||
delegation.sync(txid, forceSync);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -347,6 +357,5 @@ public class TestRollbackFromClient {
|
|||
public long getEarliestMemstoreSeqNum(byte[] encodedRegionName, byte[] familyName) {
|
||||
return delegation.getEarliestMemstoreSeqNum(encodedRegionName, familyName);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -133,13 +133,13 @@ public class TestFailedAppendAndSync {
|
|||
w.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sync() throws IOException {
|
||||
if (throwSyncException) {
|
||||
throw new IOException("FAKE! Failed to replace a bad datanode...");
|
||||
}
|
||||
w.sync();
|
||||
@Override
|
||||
public void sync(boolean forceSync) throws IOException {
|
||||
if (throwSyncException) {
|
||||
throw new IOException("FAKE! Failed to replace a bad datanode...");
|
||||
}
|
||||
w.sync(forceSync);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void append(Entry entry) throws IOException {
|
||||
|
|
|
@ -325,9 +325,9 @@ public class TestHRegion {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void sync(long txid) throws IOException {
|
||||
public void sync(long txid, boolean forceSync) throws IOException {
|
||||
storeFlushCtx.prepare();
|
||||
super.sync(txid);
|
||||
super.sync(txid, forceSync);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1170,8 +1170,8 @@ public class TestHRegion {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void sync() throws IOException {
|
||||
w.sync();
|
||||
public void sync(boolean forceSync) throws IOException {
|
||||
w.sync(forceSync);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -172,11 +172,11 @@ public class TestWALLockup {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void sync() throws IOException {
|
||||
public void sync(boolean forceSync) throws IOException {
|
||||
if (throwException) {
|
||||
throw new IOException("FAKE! Failed to replace a bad datanode...SYNC");
|
||||
}
|
||||
w.sync();
|
||||
w.sync(forceSync);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -327,12 +327,11 @@ public class TestWALLockup {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void sync() throws IOException {
|
||||
public void sync(boolean forceSync) throws IOException {
|
||||
if (throwException) {
|
||||
throw new IOException(
|
||||
"FAKE! Failed to replace a bad datanode...SYNC");
|
||||
throw new IOException("FAKE! Failed to replace a bad datanode...SYNC");
|
||||
}
|
||||
w.sync();
|
||||
w.sync(forceSync);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -193,7 +193,7 @@ public class SequenceFileLogWriter extends WriterBase {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void sync() throws IOException {
|
||||
public void sync(boolean forceSync) throws IOException {
|
||||
try {
|
||||
this.writer.syncFs();
|
||||
} catch (NullPointerException npe) {
|
||||
|
@ -219,4 +219,5 @@ public class SequenceFileLogWriter extends WriterBase {
|
|||
public FSDataOutputStream getWriterFSDataOutputStream() {
|
||||
return this.writer_out;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -56,9 +56,9 @@ public class TestLogRollingNoCluster {
|
|||
/** ProtobufLogWriter that simulates higher latencies in sync() call */
|
||||
public static class HighLatencySyncWriter extends ProtobufLogWriter {
|
||||
@Override
|
||||
public void sync() throws IOException {
|
||||
public void sync(boolean forceSync) throws IOException {
|
||||
Threads.sleep(ThreadLocalRandom.current().nextInt(10));
|
||||
super.sync();
|
||||
super.sync(forceSync);
|
||||
Threads.sleep(ThreadLocalRandom.current().nextInt(10));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -170,7 +170,7 @@ public class TestProtobufLog {
|
|||
}
|
||||
writer.append(new WAL.Entry(key, edit));
|
||||
}
|
||||
writer.sync();
|
||||
writer.sync(false);
|
||||
if (withTrailer) writer.close();
|
||||
|
||||
// Now read the log using standard means.
|
||||
|
|
|
@ -120,7 +120,7 @@ public class TestReadOldRootAndMetaEdits {
|
|||
writer.append(oldMetaEntry);
|
||||
|
||||
// sync/close the writer
|
||||
writer.sync();
|
||||
writer.sync(false);
|
||||
writer.close();
|
||||
|
||||
// read the log and see things are okay.
|
||||
|
|
|
@ -1253,7 +1253,7 @@ public class TestWALReplay {
|
|||
for (FSWALEntry entry : entries) {
|
||||
writer.append(entry);
|
||||
}
|
||||
writer.sync();
|
||||
writer.sync(false);
|
||||
writer.close();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -120,7 +120,7 @@ public class TestReplicationSource {
|
|||
WALKey key = new WALKey(b, TableName.valueOf(b), 0, 0,
|
||||
HConstants.DEFAULT_CLUSTER_ID);
|
||||
writer.append(new WAL.Entry(key, edit));
|
||||
writer.sync();
|
||||
writer.sync(false);
|
||||
}
|
||||
writer.close();
|
||||
|
||||
|
|
|
@ -57,11 +57,11 @@ public class FaultyFSLog extends FSHLog {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void sync(long txid) throws IOException {
|
||||
public void sync(long txid, boolean forceSync) throws IOException {
|
||||
if (this.ft == FailureType.SYNC) {
|
||||
throw new IOException("sync");
|
||||
}
|
||||
super.sync(txid);
|
||||
super.sync(txid, forceSync);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -232,9 +232,9 @@ public class IOTestProvider implements WALProvider {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void sync() throws IOException {
|
||||
public void sync(boolean forceSync) throws IOException {
|
||||
if (doSyncs) {
|
||||
super.sync();
|
||||
super.sync(forceSync);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -677,7 +677,7 @@ public class TestWALFactory {
|
|||
}
|
||||
sflw.append(new WAL.Entry(key, edit));
|
||||
}
|
||||
sflw.sync();
|
||||
sflw.sync(false);
|
||||
sflw.close();
|
||||
|
||||
// Now read the log using standard means.
|
||||
|
|
|
@ -1353,7 +1353,7 @@ public class TestWALSplit {
|
|||
WALKey key = new WALKey(hri.getEncodedNameAsBytes(), TABLE_NAME, 1,
|
||||
EnvironmentEdgeManager.currentTime(), HConstants.DEFAULT_CLUSTER_ID);
|
||||
w.append(new Entry(key, edit));
|
||||
w.sync();
|
||||
w.sync(false);
|
||||
}
|
||||
|
||||
private static void appendRegionEvent(Writer w, String region) throws IOException {
|
||||
|
@ -1371,7 +1371,7 @@ public class TestWALSplit {
|
|||
HConstants.DEFAULT_CLUSTER_ID);
|
||||
w.append(
|
||||
new Entry(walKey, new WALEdit().add(kv)));
|
||||
w.sync();
|
||||
w.sync(false);
|
||||
}
|
||||
|
||||
public static long appendEntry(Writer writer, TableName table, byte[] region,
|
||||
|
@ -1381,7 +1381,7 @@ public class TestWALSplit {
|
|||
LOG.info(Thread.currentThread().getName() + " append");
|
||||
writer.append(createTestEntry(table, region, row, family, qualifier, value, seq));
|
||||
LOG.info(Thread.currentThread().getName() + " sync");
|
||||
writer.sync();
|
||||
writer.sync(false);
|
||||
return seq;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue