diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Durability.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Durability.java index 6f510dfa75e..aaf0b5cc732 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Durability.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Durability.java @@ -50,7 +50,6 @@ public enum Durability { SYNC_WAL, /** * Write the Mutation to the WAL synchronously and force the entries to disk. - * (Note: this is currently not supported and will behave identical to {@link #SYNC_WAL}) * See HADOOP-6313 */ FSYNC_WAL diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 9e9a72107e3..9bab148e30e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -230,16 +230,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi public static final String HBASE_MAX_CELL_SIZE_KEY = "hbase.server.keyvalue.maxsize"; public static final int DEFAULT_MAX_CELL_SIZE = 10485760; - public static final String HBASE_REGIONSERVER_MINIBATCH_SIZE = - "hbase.regionserver.minibatch.size"; - public static final int DEFAULT_HBASE_REGIONSERVER_MINIBATCH_SIZE = 20000; - /** * This is the global default value for durability. All tables/mutations not * defining a durability or using USE_DEFAULT will default to this value. */ private static final Durability DEFAULT_DURABILITY = Durability.SYNC_WAL; + public static final String HBASE_REGIONSERVER_MINIBATCH_SIZE = + "hbase.regionserver.minibatch.size"; + public static final int DEFAULT_HBASE_REGIONSERVER_MINIBATCH_SIZE = 20000; + + public static final String WAL_HSYNC_CONF_KEY = "hbase.wal.hsync"; + public static final boolean DEFAULT_WAL_HSYNC = false; + final AtomicBoolean closed = new AtomicBoolean(false); /* Closing can take some time; use the closing flag if there is stuff we don't @@ -795,11 +798,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi */ this.rowProcessorTimeout = conf.getLong( "hbase.hregion.row.processor.timeout", DEFAULT_ROW_PROCESSOR_TIMEOUT); - this.regionDurability = htd.getDurability() == Durability.USE_DEFAULT ? - DEFAULT_DURABILITY : htd.getDurability(); this.storeHotnessProtector = new StoreHotnessProtector(this, conf); + boolean forceSync = conf.getBoolean(WAL_HSYNC_CONF_KEY, DEFAULT_WAL_HSYNC); + /** + * This is the global default value for durability. All tables/mutations not defining a + * durability or using USE_DEFAULT will default to this value. + */ + Durability defaultDurability = forceSync ? Durability.FSYNC_WAL : Durability.SYNC_WAL; + this.regionDurability = + this.htableDescriptor.getDurability() == Durability.USE_DEFAULT ? defaultDurability : + this.htableDescriptor.getDurability(); + if (rsServices != null) { this.rsAccounting = this.rsServices.getRegionServerAccounting(); // don't initialize coprocessors if not running within a regionserver @@ -8361,10 +8372,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // nothing do to break; case SYNC_WAL: + this.wal.sync(txid, false); + break; case FSYNC_WAL: - // sync the WAL edit (SYNC and FSYNC treated the same for now) - this.wal.sync(txid); - break; + this.wal.sync(txid, true); + break; default: throw new RuntimeException("Unknown durability " + durability); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index 6b77e809426..5b8feae327a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -41,6 +41,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; @@ -129,6 +130,8 @@ public class FSHLog extends AbstractFSWAL { // Minimum tolerable replicas, if the actual value is lower than it, rollWriter will be triggered private final int minTolerableReplication; + private final boolean useHsync; + // If live datanode count is lower than the default replicas value, // RollWriter will be triggered in each sync(So the RollWriter will be // triggered one by one in a short time). Using it as a workaround to slow @@ -212,6 +215,8 @@ public class FSHLog extends AbstractFSWAL { 5); this.closeErrorsTolerated = conf.getInt("hbase.regionserver.logroll.errors.tolerated", 2); + this.useHsync = conf.getBoolean(HRegion.WAL_HSYNC_CONF_KEY, HRegion.DEFAULT_WAL_HSYNC); + // rollWriter sets this.hdfs_out if it can. rollWriter(); @@ -255,7 +260,7 @@ public class FSHLog extends AbstractFSWAL { private void preemptiveSync(final ProtobufLogWriter nextWriter) { long startTimeNanos = System.nanoTime(); try { - nextWriter.sync(); + nextWriter.sync(useHsync); postSync(System.nanoTime() - startTimeNanos, 0); } catch (IOException e) { // optimization failed, no need to abort here. @@ -332,7 +337,7 @@ public class FSHLog extends AbstractFSWAL { // sequence and zigzagLatch will be set together assert sequence > 0L : "Failed to get sequence from ring buffer"; TraceUtil.addTimelineAnnotation("awaiting safepoint"); - syncFuture = zigzagLatch.waitSafePoint(publishSyncOnRingBuffer(sequence)); + syncFuture = zigzagLatch.waitSafePoint(publishSyncOnRingBuffer(sequence, false)); } } catch (FailedSyncBeforeLogCloseException e) { // If unflushed/unsynced entries on close, it is reason to abort. @@ -577,7 +582,7 @@ public class FSHLog extends AbstractFSWAL { Throwable lastException = null; try { TraceUtil.addTimelineAnnotation("syncing writer"); - writer.sync(); + writer.sync(useHsync); TraceUtil.addTimelineAnnotation("writer synced"); currentSequence = updateHighestSyncedSequence(currentSequence); } catch (IOException e) { @@ -684,15 +689,15 @@ public class FSHLog extends AbstractFSWAL { return this.disruptor.getRingBuffer().next(); } - private SyncFuture publishSyncOnRingBuffer() { + private SyncFuture publishSyncOnRingBuffer(boolean forceSync) { long sequence = getSequenceOnRingBuffer(); - return publishSyncOnRingBuffer(sequence); + return publishSyncOnRingBuffer(sequence, forceSync); } @VisibleForTesting - protected SyncFuture publishSyncOnRingBuffer(long sequence) { + protected SyncFuture publishSyncOnRingBuffer(long sequence, boolean forceSync) { // here we use ring buffer sequence as transaction id - SyncFuture syncFuture = getSyncFuture(sequence); + SyncFuture syncFuture = getSyncFuture(sequence).setForceSync(forceSync); try { RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence); truck.load(syncFuture); @@ -703,8 +708,8 @@ public class FSHLog extends AbstractFSWAL { } // Sync all known transactions - private void publishSyncThenBlockOnCompletion(TraceScope scope) throws IOException { - SyncFuture syncFuture = publishSyncOnRingBuffer(); + private void publishSyncThenBlockOnCompletion(TraceScope scope, boolean forceSync) throws IOException { + SyncFuture syncFuture = publishSyncOnRingBuffer(forceSync); blockOnSync(syncFuture); } @@ -731,19 +736,29 @@ public class FSHLog extends AbstractFSWAL { @Override public void sync() throws IOException { + sync(useHsync); + } + + @Override + public void sync(boolean forceSync) throws IOException { try (TraceScope scope = TraceUtil.createTrace("FSHLog.sync")) { - publishSyncThenBlockOnCompletion(scope); + publishSyncThenBlockOnCompletion(scope, forceSync); } } @Override public void sync(long txid) throws IOException { + sync(txid, useHsync); + } + + @Override + public void sync(long txid, boolean forceSync) throws IOException { if (this.highestSyncedTxid.get() >= txid) { // Already sync'd. return; } try (TraceScope scope = TraceUtil.createTrace("FSHLog.sync")) { - publishSyncThenBlockOnCompletion(scope); + publishSyncThenBlockOnCompletion(scope, forceSync); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java index 9d36429975a..2852047b5ba 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java @@ -72,11 +72,15 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter } @Override - public void sync() throws IOException { + public void sync(boolean forceSync) throws IOException { FSDataOutputStream fsdos = this.output; if (fsdos == null) return; // Presume closed fsdos.flush(); - fsdos.hflush(); + if (forceSync) { + fsdos.hsync(); + } else { + fsdos.hflush(); + } } public FSDataOutputStream getStream() { @@ -89,10 +93,13 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter short replication, long blockSize) throws IOException, StreamLacksCapabilityException { this.output = fs.createNonRecursive(path, overwritable, bufferSize, replication, blockSize, null); - // TODO Be sure to add a check for hsync if this branch includes HBASE-19024 - if (fs.getConf().getBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true) && - !(CommonFSUtils.hasCapability(output, "hflush"))) { - throw new StreamLacksCapabilityException("hflush"); + if (fs.getConf().getBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true)) { + if (!CommonFSUtils.hasCapability(output, "hflush")) { + throw new StreamLacksCapabilityException("hflush"); + } + if (!CommonFSUtils.hasCapability(output, "hsync")) { + throw new StreamLacksCapabilityException("hsync"); + } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java index bd30797f557..3e0998ef94b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java @@ -67,6 +67,8 @@ class SyncFuture { private Thread t; + private boolean forceSync; + /** * Call this method to clear old usage and get it ready for new deploy. * @param txid the new transaction id @@ -97,6 +99,15 @@ class SyncFuture { return this.txid; } + synchronized boolean isForceSync() { + return forceSync; + } + + synchronized SyncFuture setForceSync(boolean forceSync) { + this.forceSync = forceSync; + return this; + } + /** * @param txid the transaction id at which this future 'completed'. * @param t Can be null. Set if we are 'completing' on error (and this 't' is the error). diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java index db6c4117d97..3c857378a8e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java @@ -139,6 +139,23 @@ public interface WAL extends Closeable, WALFileLengthProvider { */ void sync(long txid) throws IOException; + /** + * @param forceSync Flag to force sync rather than flushing to the buffer. Example - Hadoop hflush + * vs hsync. + */ + default void sync(boolean forceSync) throws IOException { + sync(); + } + + /** + * @param txid Transaction id to sync to. + * @param forceSync Flag to force sync rather than flushing to the buffer. Example - Hadoop hflush + * vs hsync. + */ + default void sync(long txid, boolean forceSync) throws IOException { + sync(txid); + } + /** * WAL keeps track of the sequence numbers that are as yet not flushed im memstores * in order to be able to do accounting to figure which WALs can be let go. This method tells WAL diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java index 7ad815e2f61..244a636226e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java @@ -79,7 +79,7 @@ public interface WALProvider { // Writers are used internally. Users outside of the WAL should be relying on the // interface provided by WAL. interface Writer extends WriterBase { - void sync() throws IOException; + void sync(boolean forceSync) throws IOException; void append(WAL.Entry entry) throws IOException; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java index 7cb3e63e644..3cf06d46bdf 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java @@ -138,11 +138,11 @@ public class TestFailedAppendAndSync { } @Override - public void sync() throws IOException { + public void sync(boolean forceSync) throws IOException { if (throwSyncException) { throw new IOException("FAKE! Failed to replace a bad datanode..."); } - w.sync(); + w.sync(forceSync); } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 31dfa2a4af0..3272afa34a3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -1132,8 +1132,8 @@ public class TestHRegion { } @Override - public void sync() throws IOException { - w.sync(); + public void sync(boolean forceSync) throws IOException { + w.sync(forceSync); } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java index 9e051aeed1e..29a75b85468 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java @@ -172,11 +172,11 @@ public class TestWALLockup { } @Override - public void sync() throws IOException { + public void sync(boolean forceSync) throws IOException { if (throwException) { throw new IOException("FAKE! Failed to replace a bad datanode...SYNC"); } - w.sync(); + w.sync(forceSync); } @Override @@ -252,7 +252,7 @@ public class TestWALLockup { dodgyWAL.append(region.getRegionInfo(), key, edit, true); boolean exception = false; try { - dodgyWAL.sync(); + dodgyWAL.sync(false); } catch (Exception e) { exception = true; } @@ -345,7 +345,7 @@ public class TestWALLockup { protected void publishSyncOnRingBufferAndBlock(long sequence) { try { - super.blockOnSync(super.publishSyncOnRingBuffer(sequence)); + super.blockOnSync(super.publishSyncOnRingBuffer(sequence, false)); Assert.fail("Expect an IOException here."); } catch (IOException ignore) { // Here, we will get an IOException. @@ -362,7 +362,7 @@ public class TestWALLockup { } @Override - public void sync() throws IOException { + public void sync(boolean forceSync) throws IOException { throw new IOException("FAKE! Failed to replace a bad datanode...SYNC"); } @@ -425,7 +425,7 @@ public class TestWALLockup { try { LOG.info("Call sync for testing whether RingBufferEventHandler is hanging."); - dodgyWAL.sync(); // Should not get a hang here, otherwise we will see timeout in this test. + dodgyWAL.sync(false); // Should not get a hang here, otherwise we will see timeout in this test. Assert.fail("Expect an IOException here."); } catch (IOException ignore) { } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java index b1e304ec438..2d4b38542a5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java @@ -1230,7 +1230,7 @@ public abstract class AbstractTestWALReplay { for (FSWALEntry entry : entries) { writer.append(entry); } - writer.sync(); + writer.sync(false); writer.close(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java index 5ee0dfa7512..819df673c94 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java @@ -67,9 +67,9 @@ public class TestLogRollingNoCluster { /** ProtobufLogWriter that simulates higher latencies in sync() call */ public static class HighLatencySyncWriter extends ProtobufLogWriter { @Override - public void sync() throws IOException { + public void sync(boolean forceSync) throws IOException { Threads.sleep(ThreadLocalRandom.current().nextInt(10)); - super.sync(); + super.sync(forceSync); Threads.sleep(ThreadLocalRandom.current().nextInt(10)); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestProtobufLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestProtobufLog.java index 080b5be807c..2d938d47fb8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestProtobufLog.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestProtobufLog.java @@ -48,6 +48,6 @@ public class TestProtobufLog extends AbstractTestProtobufLog @Override protected void sync(Writer writer) throws IOException { - writer.sync(); + writer.sync(false); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALDurability.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALDurability.java new file mode 100644 index 00000000000..17f24e8e77f --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALDurability.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.regionserver.ChunkCreator; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WAL; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +/** + * Tests for WAL write durability - hflush vs hsync + */ +@Category({ MediumTests.class }) +public class TestWALDurability { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestWALDurability.class); + + private static final String COLUMN_FAMILY = "MyCF"; + private static final byte[] COLUMN_FAMILY_BYTES = Bytes.toBytes(COLUMN_FAMILY); + + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private Configuration conf; + private String dir; + @Rule + public TestName name = new TestName(); + + // Test names + protected TableName tableName; + + @Before + public void setup() throws IOException { + conf = TEST_UTIL.getConfiguration(); + dir = TEST_UTIL.getDataTestDir("TestHRegion").toString(); + tableName = TableName.valueOf(name.getMethodName()); + } + + @Test + public void testWALDurability() throws IOException { + class CustomFSLog extends FSHLog { + private Boolean syncFlag; + + public CustomFSLog(FileSystem fs, Path root, String logDir, Configuration conf) + throws IOException { + super(fs, root, logDir, conf); + } + + @Override + public void sync(boolean forceSync) throws IOException { + syncFlag = forceSync; + super.sync(forceSync); + } + + @Override + public void sync(long txid, boolean forceSync) throws IOException { + syncFlag = forceSync; + super.sync(txid, forceSync); + } + + private void resetSyncFlag() { + this.syncFlag = null; + } + + } + // global hbase.wal.hsync false, no override in put call - hflush + conf.set(HRegion.WAL_HSYNC_CONF_KEY, "false"); + FileSystem fs = FileSystem.get(conf); + Path rootDir = new Path(dir + getName()); + CustomFSLog customFSLog = new CustomFSLog(fs, rootDir, getName(), conf); + HRegion region = initHRegion(tableName, null, null, customFSLog); + byte[] bytes = Bytes.toBytes(getName()); + Put put = new Put(bytes); + put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes); + + customFSLog.resetSyncFlag(); + assertNull(customFSLog.syncFlag); + region.put(put); + assertEquals(customFSLog.syncFlag, false); + + // global hbase.wal.hsync true, no override in put call + conf.set(HRegion.WAL_HSYNC_CONF_KEY, "true"); + fs = FileSystem.get(conf); + customFSLog = new CustomFSLog(fs, rootDir, getName(), conf); + region = initHRegion(tableName, null, null, customFSLog); + + customFSLog.resetSyncFlag(); + assertNull(customFSLog.syncFlag); + region.put(put); + assertEquals(customFSLog.syncFlag, true); + + // global hbase.wal.hsync true, durability set in put call - fsync + put.setDurability(Durability.FSYNC_WAL); + customFSLog.resetSyncFlag(); + assertNull(customFSLog.syncFlag); + region.put(put); + assertEquals(customFSLog.syncFlag, true); + + // global hbase.wal.hsync true, durability set in put call - sync + put = new Put(bytes); + put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes); + put.setDurability(Durability.SYNC_WAL); + customFSLog.resetSyncFlag(); + assertNull(customFSLog.syncFlag); + region.put(put); + assertEquals(customFSLog.syncFlag, false); + + HBaseTestingUtility.closeRegionAndWAL(region); + } + + private String getName() { + return name.getMethodName(); + } + + /** + * @return A region on which you must call {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} + * when done. + */ + public static HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey, WAL wal) + throws IOException { + ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); + return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, false, Durability.USE_DEFAULT, + wal, COLUMN_FAMILY_BYTES); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java index b98ca7f00f7..1bb361bf582 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java @@ -122,7 +122,7 @@ public class TestReplicationSource { WALKeyImpl key = new WALKeyImpl(b, TableName.valueOf(b), 0, 0, HConstants.DEFAULT_CLUSTER_ID); writer.append(new WAL.Entry(key, edit)); - writer.sync(); + writer.sync(false); } writer.close(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java index 4e8a4cc8734..7b440cebed7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java @@ -99,7 +99,7 @@ public class TestSerialReplication { for (Entry entry : replicateContext.getEntries()) { WRITER.append(entry); } - WRITER.sync(); + WRITER.sync(false); } catch (IOException e) { throw new UncheckedIOException(e); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/FaultyFSLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/FaultyFSLog.java index ddd11137080..c7f1c411448 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/FaultyFSLog.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/FaultyFSLog.java @@ -51,10 +51,15 @@ public class FaultyFSLog extends FSHLog { @Override public void sync(long txid) throws IOException { + sync(txid, false); + } + + @Override + public void sync(long txid, boolean forceSync) throws IOException { if (this.ft == FailureType.SYNC) { throw new IOException("sync"); } - super.sync(txid); + super.sync(txid, forceSync); } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java index 3928d9c11a1..e54f1f82dda 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java @@ -266,9 +266,9 @@ public class IOTestProvider implements WALProvider { } @Override - public void sync() throws IOException { + public void sync(boolean forceSync) throws IOException { if (doSyncs) { - super.sync(); + super.sync(forceSync); } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java index f5800dff16c..030c99f41e8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java @@ -1359,7 +1359,7 @@ public class TestWALSplit { WALKeyImpl key = new WALKeyImpl(hri.getEncodedNameAsBytes(), TABLE_NAME, 1, EnvironmentEdgeManager.currentTime(), HConstants.DEFAULT_CLUSTER_ID); w.append(new Entry(key, edit)); - w.sync(); + w.sync(false); } private static void appendRegionEvent(Writer w, String region) throws IOException { @@ -1377,7 +1377,7 @@ public class TestWALSplit { HConstants.DEFAULT_CLUSTER_ID); w.append( new Entry(walKey, new WALEdit().add(kv))); - w.sync(); + w.sync(false); } public static long appendEntry(Writer writer, TableName table, byte[] region, @@ -1387,7 +1387,7 @@ public class TestWALSplit { LOG.info(Thread.currentThread().getName() + " append"); writer.append(createTestEntry(table, region, row, family, qualifier, value, seq)); LOG.info(Thread.currentThread().getName() + " sync"); - writer.sync(); + writer.sync(false); return seq; }