HBASE-20461 Implement fsync for AsyncFSWAL (#947)

Signed-off-by: stack <stack@apache.org>
This commit is contained in:
Duo Zhang 2019-12-19 13:30:15 +08:00
parent af0ce53836
commit 7a1e9ca397
10 changed files with 254 additions and 74 deletions

View File

@ -62,6 +62,7 @@ import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.ServerCall;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
@ -204,6 +205,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
*/
protected final int maxLogs;
protected final boolean useHsync;
/**
* This lock makes sure only one log roll runs at a time. Should not be taken while any other lock
* is held. We don't just use synchronized because that results in bogus and tedious findbugs
@ -454,6 +457,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
}
};
this.implClassName = getClass().getSimpleName();
this.useHsync = conf.getBoolean(HRegion.WAL_HSYNC_CONF_KEY, HRegion.DEFAULT_WAL_HSYNC);
}
/**
@ -895,8 +899,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
sequenceIdAccounting.updateStore(encodedRegionName, familyName, sequenceid, onlyIfGreater);
}
protected final SyncFuture getSyncFuture(long sequence) {
return cachedSyncFutures.get().reset(sequence);
protected final SyncFuture getSyncFuture(long sequence, boolean forceSync) {
return cachedSyncFutures.get().reset(sequence).setForceSync(forceSync);
}
protected boolean isLogRollRequested() {

View File

@ -353,7 +353,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
highestProcessedAppendTxidAtLastSync = currentHighestProcessedAppendTxid;
final long startTimeNs = System.nanoTime();
final long epoch = (long) epochAndState >>> 2L;
addListener(writer.sync(), (result, error) -> {
addListener(writer.sync(useHsync), (result, error) -> {
if (error != null) {
syncFailed(epoch, error);
} else {
@ -576,11 +576,21 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
@Override
public void sync() throws IOException {
sync(useHsync);
}
@Override
public void sync(long txid) throws IOException {
sync(txid, useHsync);
}
@Override
public void sync(boolean forceSync) throws IOException {
try (TraceScope scope = TraceUtil.createTrace("AsyncFSWAL.sync")) {
long txid = waitingConsumePayloads.next();
SyncFuture future;
try {
future = getSyncFuture(txid);
future = getSyncFuture(txid, forceSync);
RingBufferTruck truck = waitingConsumePayloads.get(txid);
truck.load(future);
} finally {
@ -594,7 +604,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
}
@Override
public void sync(long txid) throws IOException {
public void sync(long txid, boolean forceSync) throws IOException {
if (highestSyncedTxid.get() >= txid) {
return;
}
@ -603,7 +613,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
long sequence = waitingConsumePayloads.next();
SyncFuture future;
try {
future = getSyncFuture(txid);
future = getSyncFuture(txid, forceSync);
RingBufferTruck truck = waitingConsumePayloads.get(sequence);
truck.load(future);
} finally {

View File

@ -140,8 +140,8 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
}
@Override
public CompletableFuture<Long> sync() {
return output.flush(false);
public CompletableFuture<Long> sync(boolean forceSync) {
return output.flush(forceSync);
}
@Override

View File

@ -44,7 +44,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
@ -134,8 +133,6 @@ public class FSHLog extends AbstractFSWAL<Writer> {
// Minimum tolerable replicas, if the actual value is lower than it, rollWriter will be triggered
private final int minTolerableReplication;
private final boolean useHsync;
// If live datanode count is lower than the default replicas value,
// RollWriter will be triggered in each sync(So the RollWriter will be
// triggered one by one in a short time). Using it as a workaround to slow
@ -186,6 +183,7 @@ public class FSHLog extends AbstractFSWAL<Writer> {
* @param logDir dir where wals are stored
* @param conf configuration to use
*/
@VisibleForTesting
public FSHLog(final FileSystem fs, final Path root, final String logDir, final Configuration conf)
throws IOException {
this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null);
@ -219,8 +217,6 @@ public class FSHLog extends AbstractFSWAL<Writer> {
5);
this.closeErrorsTolerated = conf.getInt("hbase.regionserver.logroll.errors.tolerated", 2);
this.useHsync = conf.getBoolean(HRegion.WAL_HSYNC_CONF_KEY, HRegion.DEFAULT_WAL_HSYNC);
// This is the 'writer' -- a single threaded executor. This single thread 'consumes' what is
// put on the ring buffer.
String hostingThreadName = Thread.currentThread().getName();
@ -711,7 +707,7 @@ public class FSHLog extends AbstractFSWAL<Writer> {
@VisibleForTesting
protected SyncFuture publishSyncOnRingBuffer(long sequence, boolean forceSync) {
// here we use ring buffer sequence as transaction id
SyncFuture syncFuture = getSyncFuture(sequence).setForceSync(forceSync);
SyncFuture syncFuture = getSyncFuture(sequence, forceSync);
try {
RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence);
truck.load(syncFuture);

View File

@ -85,7 +85,7 @@ public interface WALProvider {
}
interface AsyncWriter extends WriterBase {
CompletableFuture<Long> sync();
CompletableFuture<Long> sync(boolean forceSync);
void append(WAL.Entry entry);
}

View File

@ -156,8 +156,8 @@ public class TestAsyncFSWAL extends AbstractTestFSWAL {
}
@Override
public CompletableFuture<Long> sync() {
CompletableFuture<Long> result = writer.sync();
public CompletableFuture<Long> sync(boolean forceSync) {
CompletableFuture<Long> result = writer.sync(forceSync);
if (failedCount.incrementAndGet() < 1000) {
CompletableFuture<Long> future = new CompletableFuture<>();
FutureUtils.addListener(result,

View File

@ -0,0 +1,106 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
@Category({ RegionServerServices.class, MediumTests.class })
public class TestAsyncFSWALDurability extends WALDurabilityTestBase<CustomAsyncFSWAL> {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestAsyncFSWALDurability.class);
private static NioEventLoopGroup GROUP;
@BeforeClass
public static void setUpBeforeClass() {
GROUP = new NioEventLoopGroup();
}
@AfterClass
public static void tearDownAfterClass() {
GROUP.shutdownGracefully();
}
@Override
protected CustomAsyncFSWAL getWAL(FileSystem fs, Path root, String logDir, Configuration conf)
throws IOException {
CustomAsyncFSWAL wal =
new CustomAsyncFSWAL(fs, root, logDir, conf, GROUP, NioSocketChannel.class);
wal.init();
return wal;
}
@Override
protected void resetSyncFlag(CustomAsyncFSWAL wal) {
wal.resetSyncFlag();
}
@Override
protected Boolean getSyncFlag(CustomAsyncFSWAL wal) {
return wal.getSyncFlag();
}
}
class CustomAsyncFSWAL extends AsyncFSWAL {
private Boolean syncFlag;
public CustomAsyncFSWAL(FileSystem fs, Path rootDir, String logDir, Configuration conf,
EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass)
throws FailedLogCloseException, IOException {
super(fs, rootDir, logDir, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null,
eventLoopGroup, channelClass);
}
@Override
public void sync(boolean forceSync) throws IOException {
syncFlag = forceSync;
super.sync(forceSync);
}
@Override
public void sync(long txid, boolean forceSync) throws IOException {
syncFlag = forceSync;
super.sync(txid, forceSync);
}
void resetSyncFlag() {
this.syncFlag = null;
}
Boolean getSyncFlag() {
return syncFlag;
}
}

View File

@ -77,7 +77,7 @@ public class TestAsyncProtobufLog extends AbstractTestProtobufLog<WALProvider.As
@Override
protected void sync(AsyncWriter writer) throws IOException {
try {
writer.sync().get();
writer.sync(false).get();
} catch (InterruptedException e) {
throw new InterruptedIOException();
} catch (ExecutionException e) {

View File

@ -0,0 +1,83 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
@Category({ RegionServerServices.class, MediumTests.class })
public class TestFSHLogDurability extends WALDurabilityTestBase<CustomFSHLog> {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestFSHLogDurability.class);
@Override
protected CustomFSHLog getWAL(FileSystem fs, Path root, String logDir, Configuration conf)
throws IOException {
CustomFSHLog wal = new CustomFSHLog(fs, root, logDir, conf);
wal.init();
return wal;
}
@Override
protected void resetSyncFlag(CustomFSHLog wal) {
wal.resetSyncFlag();
}
@Override
protected Boolean getSyncFlag(CustomFSHLog wal) {
return wal.getSyncFlag();
}
}
class CustomFSHLog extends FSHLog {
private Boolean syncFlag;
public CustomFSHLog(FileSystem fs, Path root, String logDir, Configuration conf)
throws IOException {
super(fs, root, logDir, conf);
}
@Override
public void sync(boolean forceSync) throws IOException {
syncFlag = forceSync;
super.sync(forceSync);
}
@Override
public void sync(long txid, boolean forceSync) throws IOException {
syncFlag = forceSync;
super.sync(txid, forceSync);
}
void resetSyncFlag() {
this.syncFlag = null;
}
Boolean getSyncFlag() {
return syncFlag;
}
}

View File

@ -18,14 +18,14 @@
package org.apache.hadoop.hbase.regionserver.wal;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Durability;
@ -33,25 +33,18 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.regionserver.ChunkCreator;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
/**
* Tests for WAL write durability - hflush vs hsync
*/
@Category({ MediumTests.class })
public class TestWALDurability {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestWALDurability.class);
public abstract class WALDurabilityTestBase<T extends WAL> {
private static final String COLUMN_FAMILY = "MyCF";
private static final byte[] COLUMN_FAMILY_BYTES = Bytes.toBytes(COLUMN_FAMILY);
@ -66,82 +59,70 @@ public class TestWALDurability {
protected TableName tableName;
@Before
public void setup() throws IOException {
public void setUp() throws IOException {
conf = TEST_UTIL.getConfiguration();
dir = TEST_UTIL.getDataTestDir("TestHRegion").toString();
tableName = TableName.valueOf(name.getMethodName());
}
@After
public void tearDown() throws IOException {
TEST_UTIL.cleanupTestDir();
}
protected abstract T getWAL(FileSystem fs, Path root, String logDir, Configuration conf)
throws IOException;
protected abstract void resetSyncFlag(T wal);
protected abstract Boolean getSyncFlag(T wal);
@Test
public void testWALDurability() throws IOException {
class CustomFSLog extends FSHLog {
private Boolean syncFlag;
public CustomFSLog(FileSystem fs, Path root, String logDir, Configuration conf)
throws IOException {
super(fs, root, logDir, conf);
}
@Override
public void sync(boolean forceSync) throws IOException {
syncFlag = forceSync;
super.sync(forceSync);
}
@Override
public void sync(long txid, boolean forceSync) throws IOException {
syncFlag = forceSync;
super.sync(txid, forceSync);
}
private void resetSyncFlag() {
this.syncFlag = null;
}
}
// global hbase.wal.hsync false, no override in put call - hflush
conf.set(HRegion.WAL_HSYNC_CONF_KEY, "false");
FileSystem fs = FileSystem.get(conf);
Path rootDir = new Path(dir + getName());
CustomFSLog customFSLog = new CustomFSLog(fs, rootDir, getName(), conf);
customFSLog.init();
HRegion region = initHRegion(tableName, null, null, customFSLog);
T wal = getWAL(fs, rootDir, getName(), conf);
HRegion region = initHRegion(tableName, null, null, wal);
byte[] bytes = Bytes.toBytes(getName());
Put put = new Put(bytes);
put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes);
customFSLog.resetSyncFlag();
assertNull(customFSLog.syncFlag);
resetSyncFlag(wal);
assertNull(getSyncFlag(wal));
region.put(put);
assertEquals(customFSLog.syncFlag, false);
assertFalse(getSyncFlag(wal));
region.close();
wal.close();
// global hbase.wal.hsync true, no override in put call
conf.set(HRegion.WAL_HSYNC_CONF_KEY, "true");
fs = FileSystem.get(conf);
customFSLog = new CustomFSLog(fs, rootDir, getName(), conf);
customFSLog.init();
region = initHRegion(tableName, null, null, customFSLog);
wal = getWAL(fs, rootDir, getName(), conf);
region = initHRegion(tableName, null, null, wal);
customFSLog.resetSyncFlag();
assertNull(customFSLog.syncFlag);
resetSyncFlag(wal);
assertNull(getSyncFlag(wal));
region.put(put);
assertEquals(customFSLog.syncFlag, true);
assertEquals(getSyncFlag(wal), true);
// global hbase.wal.hsync true, durability set in put call - fsync
put.setDurability(Durability.FSYNC_WAL);
customFSLog.resetSyncFlag();
assertNull(customFSLog.syncFlag);
resetSyncFlag(wal);
assertNull(getSyncFlag(wal));
region.put(put);
assertEquals(customFSLog.syncFlag, true);
assertTrue(getSyncFlag(wal));
// global hbase.wal.hsync true, durability set in put call - sync
put = new Put(bytes);
put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes);
put.setDurability(Durability.SYNC_WAL);
customFSLog.resetSyncFlag();
assertNull(customFSLog.syncFlag);
resetSyncFlag(wal);
assertNull(getSyncFlag(wal));
region.put(put);
assertEquals(customFSLog.syncFlag, false);
assertFalse(getSyncFlag(wal));
HBaseTestingUtility.closeRegionAndWAL(region);
}
@ -155,7 +136,7 @@ public class TestWALDurability {
* when done.
*/
public static HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey, WAL wal)
throws IOException {
throws IOException {
ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, false, Durability.USE_DEFAULT,
wal, COLUMN_FAMILY_BYTES);