From 26b1695df5e5fe2a6c3bf786abe6c86423d494dd Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Fri, 3 Jan 2020 16:57:49 +0800 Subject: [PATCH] HBASE-23587 The FSYNC_WAL flag does not work on branch-2.x (#974) Signed-off-by: Guanghao Zhang Signed-off-by: stack --- .../hbase/regionserver/wal/AsyncFSWAL.java | 20 ++++- .../hadoop/hbase/regionserver/wal/FSHLog.java | 2 +- .../wal/TestAsyncFSWALDurability.java | 43 ++++++++++ .../wal/TestFSHLogDurability.java | 41 +++++++++ .../wal/WALDurabilityTestBase.java | 83 ++++++++++++------- 5 files changed, 156 insertions(+), 33 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java index b8d2a0b8c5c..61f57ae87a3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java @@ -347,13 +347,31 @@ public class AsyncFSWAL extends AbstractFSWAL { } } + // find all the sync futures between these two txids to see if we need to issue a hsync, if no + // sync futures then just use the default one. + private boolean isHsync(long beginTxid, long endTxid) { + SortedSet futures = + syncFutures.subSet(new SyncFuture().reset(beginTxid), new SyncFuture().reset(endTxid + 1)); + if (futures.isEmpty()) { + return useHsync; + } + for (SyncFuture future : futures) { + if (future.isForceSync()) { + return true; + } + } + return false; + } + private void sync(AsyncWriter writer) { fileLengthAtLastSync = writer.getLength(); long currentHighestProcessedAppendTxid = highestProcessedAppendTxid; + boolean shouldUseHsync = + isHsync(highestProcessedAppendTxidAtLastSync, currentHighestProcessedAppendTxid); highestProcessedAppendTxidAtLastSync = currentHighestProcessedAppendTxid; final long startTimeNs = System.nanoTime(); final long epoch = (long) epochAndState >>> 2L; - addListener(writer.sync(useHsync), (result, error) -> { + addListener(writer.sync(shouldUseHsync), (result, error) -> { if (error != null) { syncFailed(epoch, error); } else { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index 52de0a5783c..02be831f86c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -579,7 +579,7 @@ public class FSHLog extends AbstractFSWAL { Throwable lastException = null; try { TraceUtil.addTimelineAnnotation("syncing writer"); - writer.sync(useHsync); + writer.sync(takeSyncFuture.isForceSync()); TraceUtil.addTimelineAnnotation("writer synced"); currentSequence = updateHighestSyncedSequence(currentSequence); } catch (IOException e) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALDurability.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALDurability.java index 04996c0ff30..f9dee0729a7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALDurability.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALDurability.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.regionserver.wal; import java.io.IOException; +import java.util.concurrent.CompletableFuture; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -25,6 +26,7 @@ import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.ClassRule; @@ -72,11 +74,19 @@ public class TestAsyncFSWALDurability extends WALDurabilityTestBase channelClass) throws FailedLogCloseException, IOException { @@ -84,6 +94,34 @@ class CustomAsyncFSWAL extends AsyncFSWAL { eventLoopGroup, channelClass); } + @Override + protected AsyncWriter createWriterInstance(Path path) throws IOException { + AsyncWriter writer = super.createWriterInstance(path); + return new AsyncWriter() { + + @Override + public void close() throws IOException { + writer.close(); + } + + @Override + public long getLength() { + return writer.getLength(); + } + + @Override + public CompletableFuture sync(boolean forceSync) { + writerSyncFlag = forceSync; + return writer.sync(forceSync); + } + + @Override + public void append(Entry entry) { + writer.append(entry); + } + }; + } + @Override public void sync(boolean forceSync) throws IOException { syncFlag = forceSync; @@ -98,9 +136,14 @@ class CustomAsyncFSWAL extends AsyncFSWAL { void resetSyncFlag() { this.syncFlag = null; + this.writerSyncFlag = null; } Boolean getSyncFlag() { return syncFlag; } + + Boolean getWriterSyncFlag() { + return writerSyncFlag; + } } \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLogDurability.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLogDurability.java index e7f73d0c6d9..9c460588fdb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLogDurability.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLogDurability.java @@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.wal.WALProvider.Writer; import org.junit.ClassRule; import org.junit.experimental.categories.Category; @@ -51,16 +52,51 @@ public class TestFSHLogDurability extends WALDurabilityTestBase { protected Boolean getSyncFlag(CustomFSHLog wal) { return wal.getSyncFlag(); } + + @Override + protected Boolean getWriterSyncFlag(CustomFSHLog wal) { + return wal.getWriterSyncFlag(); + } } class CustomFSHLog extends FSHLog { private Boolean syncFlag; + private Boolean writerSyncFlag; + public CustomFSHLog(FileSystem fs, Path root, String logDir, Configuration conf) throws IOException { super(fs, root, logDir, conf); } + @Override + protected Writer createWriterInstance(Path path) throws IOException { + Writer writer = super.createWriterInstance(path); + return new Writer() { + + @Override + public void close() throws IOException { + writer.close(); + } + + @Override + public long getLength() { + return writer.getLength(); + } + + @Override + public void sync(boolean forceSync) throws IOException { + writerSyncFlag = forceSync; + writer.sync(forceSync); + } + + @Override + public void append(Entry entry) throws IOException { + writer.append(entry); + } + }; + } + @Override public void sync(boolean forceSync) throws IOException { syncFlag = forceSync; @@ -75,9 +111,14 @@ class CustomFSHLog extends FSHLog { void resetSyncFlag() { this.syncFlag = null; + this.writerSyncFlag = null; } Boolean getSyncFlag() { return syncFlag; } + + Boolean getWriterSyncFlag() { + return writerSyncFlag; + } } \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/WALDurabilityTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/WALDurabilityTestBase.java index bc0255b3ea4..f100b06904d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/WALDurabilityTestBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/WALDurabilityTestBase.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hbase.regionserver.wal; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -77,25 +76,39 @@ public abstract class WALDurabilityTestBase { protected abstract Boolean getSyncFlag(T wal); + protected abstract Boolean getWriterSyncFlag(T wal); + @Test public void testWALDurability() throws IOException { + byte[] bytes = Bytes.toBytes(getName()); + Put put = new Put(bytes); + put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes); + // global hbase.wal.hsync false, no override in put call - hflush conf.set(HRegion.WAL_HSYNC_CONF_KEY, "false"); FileSystem fs = FileSystem.get(conf); Path rootDir = new Path(dir + getName()); T wal = getWAL(fs, rootDir, getName(), conf); HRegion region = initHRegion(tableName, null, null, wal); - byte[] bytes = Bytes.toBytes(getName()); - Put put = new Put(bytes); - put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes); + try { + resetSyncFlag(wal); + assertNull(getSyncFlag(wal)); + assertNull(getWriterSyncFlag(wal)); + region.put(put); + assertFalse(getSyncFlag(wal)); + assertFalse(getWriterSyncFlag(wal)); - resetSyncFlag(wal); - assertNull(getSyncFlag(wal)); - region.put(put); - assertFalse(getSyncFlag(wal)); - - region.close(); - wal.close(); + // global hbase.wal.hsync false, durability set in put call - fsync + put.setDurability(Durability.FSYNC_WAL); + resetSyncFlag(wal); + assertNull(getSyncFlag(wal)); + assertNull(getWriterSyncFlag(wal)); + region.put(put); + assertTrue(getSyncFlag(wal)); + assertTrue(getWriterSyncFlag(wal)); + } finally { + HBaseTestingUtility.closeRegionAndWAL(region); + } // global hbase.wal.hsync true, no override in put call conf.set(HRegion.WAL_HSYNC_CONF_KEY, "true"); @@ -103,28 +116,36 @@ public abstract class WALDurabilityTestBase { wal = getWAL(fs, rootDir, getName(), conf); region = initHRegion(tableName, null, null, wal); - resetSyncFlag(wal); - assertNull(getSyncFlag(wal)); - region.put(put); - assertEquals(getSyncFlag(wal), true); + try { + resetSyncFlag(wal); + assertNull(getSyncFlag(wal)); + assertNull(getWriterSyncFlag(wal)); + region.put(put); + assertTrue(getSyncFlag(wal)); + assertTrue(getWriterSyncFlag(wal)); - // global hbase.wal.hsync true, durability set in put call - fsync - put.setDurability(Durability.FSYNC_WAL); - resetSyncFlag(wal); - assertNull(getSyncFlag(wal)); - region.put(put); - assertTrue(getSyncFlag(wal)); + // global hbase.wal.hsync true, durability set in put call - fsync + put.setDurability(Durability.FSYNC_WAL); + resetSyncFlag(wal); + assertNull(getSyncFlag(wal)); + assertNull(getWriterSyncFlag(wal)); + region.put(put); + assertTrue(getSyncFlag(wal)); + assertTrue(getWriterSyncFlag(wal)); - // global hbase.wal.hsync true, durability set in put call - sync - put = new Put(bytes); - put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes); - put.setDurability(Durability.SYNC_WAL); - resetSyncFlag(wal); - assertNull(getSyncFlag(wal)); - region.put(put); - assertFalse(getSyncFlag(wal)); - - HBaseTestingUtility.closeRegionAndWAL(region); + // global hbase.wal.hsync true, durability set in put call - sync + put = new Put(bytes); + put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes); + put.setDurability(Durability.SYNC_WAL); + resetSyncFlag(wal); + assertNull(getSyncFlag(wal)); + assertNull(getWriterSyncFlag(wal)); + region.put(put); + assertFalse(getSyncFlag(wal)); + assertFalse(getWriterSyncFlag(wal)); + } finally { + HBaseTestingUtility.closeRegionAndWAL(region); + } } private String getName() {