HBASE-20461 Implement fsync for AsyncFSWAL (#947)
Signed-off-by: stack <stack@apache.org>
This commit is contained in:
parent
17e180e4ee
commit
80ba354e2e
@ -62,6 +62,7 @@ import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
|
|||||||
import org.apache.hadoop.hbase.ipc.RpcServer;
|
import org.apache.hadoop.hbase.ipc.RpcServer;
|
||||||
import org.apache.hadoop.hbase.ipc.ServerCall;
|
import org.apache.hadoop.hbase.ipc.ServerCall;
|
||||||
import org.apache.hadoop.hbase.log.HBaseMarkers;
|
import org.apache.hadoop.hbase.log.HBaseMarkers;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||||
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
|
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
|
||||||
import org.apache.hadoop.hbase.trace.TraceUtil;
|
import org.apache.hadoop.hbase.trace.TraceUtil;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
@ -211,6 +212,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||||||
*/
|
*/
|
||||||
protected final int maxLogs;
|
protected final int maxLogs;
|
||||||
|
|
||||||
|
protected final boolean useHsync;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This lock makes sure only one log roll runs at a time. Should not be taken while any other lock
|
* This lock makes sure only one log roll runs at a time. Should not be taken while any other lock
|
||||||
* is held. We don't just use synchronized because that results in bogus and tedious findbugs
|
* is held. We don't just use synchronized because that results in bogus and tedious findbugs
|
||||||
@ -472,6 +475,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||||||
this.implClassName = getClass().getSimpleName();
|
this.implClassName = getClass().getSimpleName();
|
||||||
this.walTooOldNs = TimeUnit.SECONDS.toNanos(conf.getInt(
|
this.walTooOldNs = TimeUnit.SECONDS.toNanos(conf.getInt(
|
||||||
SURVIVED_TOO_LONG_SEC_KEY, SURVIVED_TOO_LONG_SEC_DEFAULT));
|
SURVIVED_TOO_LONG_SEC_KEY, SURVIVED_TOO_LONG_SEC_DEFAULT));
|
||||||
|
this.useHsync = conf.getBoolean(HRegion.WAL_HSYNC_CONF_KEY, HRegion.DEFAULT_WAL_HSYNC);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -937,8 +941,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||||||
sequenceIdAccounting.updateStore(encodedRegionName, familyName, sequenceid, onlyIfGreater);
|
sequenceIdAccounting.updateStore(encodedRegionName, familyName, sequenceid, onlyIfGreater);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected final SyncFuture getSyncFuture(long sequence) {
|
protected final SyncFuture getSyncFuture(long sequence, boolean forceSync) {
|
||||||
return cachedSyncFutures.get().reset(sequence);
|
return cachedSyncFutures.get().reset(sequence).setForceSync(forceSync);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected boolean isLogRollRequested() {
|
protected boolean isLogRollRequested() {
|
||||||
|
@ -353,7 +353,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
|||||||
highestProcessedAppendTxidAtLastSync = currentHighestProcessedAppendTxid;
|
highestProcessedAppendTxidAtLastSync = currentHighestProcessedAppendTxid;
|
||||||
final long startTimeNs = System.nanoTime();
|
final long startTimeNs = System.nanoTime();
|
||||||
final long epoch = (long) epochAndState >>> 2L;
|
final long epoch = (long) epochAndState >>> 2L;
|
||||||
addListener(writer.sync(), (result, error) -> {
|
addListener(writer.sync(useHsync), (result, error) -> {
|
||||||
if (error != null) {
|
if (error != null) {
|
||||||
syncFailed(epoch, error);
|
syncFailed(epoch, error);
|
||||||
} else {
|
} else {
|
||||||
@ -630,11 +630,21 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void sync() throws IOException {
|
public void sync() throws IOException {
|
||||||
|
sync(useHsync);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void sync(long txid) throws IOException {
|
||||||
|
sync(txid, useHsync);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void sync(boolean forceSync) throws IOException {
|
||||||
try (TraceScope scope = TraceUtil.createTrace("AsyncFSWAL.sync")) {
|
try (TraceScope scope = TraceUtil.createTrace("AsyncFSWAL.sync")) {
|
||||||
long txid = waitingConsumePayloads.next();
|
long txid = waitingConsumePayloads.next();
|
||||||
SyncFuture future;
|
SyncFuture future;
|
||||||
try {
|
try {
|
||||||
future = getSyncFuture(txid);
|
future = getSyncFuture(txid, forceSync);
|
||||||
RingBufferTruck truck = waitingConsumePayloads.get(txid);
|
RingBufferTruck truck = waitingConsumePayloads.get(txid);
|
||||||
truck.load(future);
|
truck.load(future);
|
||||||
} finally {
|
} finally {
|
||||||
@ -648,7 +658,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void sync(long txid) throws IOException {
|
public void sync(long txid, boolean forceSync) throws IOException {
|
||||||
if (highestSyncedTxid.get() >= txid) {
|
if (highestSyncedTxid.get() >= txid) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -657,7 +667,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
|||||||
long sequence = waitingConsumePayloads.next();
|
long sequence = waitingConsumePayloads.next();
|
||||||
SyncFuture future;
|
SyncFuture future;
|
||||||
try {
|
try {
|
||||||
future = getSyncFuture(txid);
|
future = getSyncFuture(txid, forceSync);
|
||||||
RingBufferTruck truck = waitingConsumePayloads.get(sequence);
|
RingBufferTruck truck = waitingConsumePayloads.get(sequence);
|
||||||
truck.load(future);
|
truck.load(future);
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -140,8 +140,8 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public CompletableFuture<Long> sync() {
|
public CompletableFuture<Long> sync(boolean forceSync) {
|
||||||
return output.flush(false);
|
return output.flush(forceSync);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -74,10 +74,10 @@ public final class CombinedAsyncWriter implements AsyncWriter {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public CompletableFuture<Long> sync() {
|
public CompletableFuture<Long> sync(boolean forceSync) {
|
||||||
CompletableFuture<Long> future = new CompletableFuture<>();
|
CompletableFuture<Long> future = new CompletableFuture<>();
|
||||||
AtomicInteger remaining = new AtomicInteger(writers.size());
|
AtomicInteger remaining = new AtomicInteger(writers.size());
|
||||||
writers.forEach(w -> addListener(w.sync(), (length, error) -> {
|
writers.forEach(w -> addListener(w.sync(forceSync), (length, error) -> {
|
||||||
if (error != null) {
|
if (error != null) {
|
||||||
future.completeExceptionally(error);
|
future.completeExceptionally(error);
|
||||||
return;
|
return;
|
||||||
|
@ -44,7 +44,6 @@ import org.apache.hadoop.fs.FileSystem;
|
|||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
|
||||||
import org.apache.hadoop.hbase.trace.TraceUtil;
|
import org.apache.hadoop.hbase.trace.TraceUtil;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.ClassSize;
|
import org.apache.hadoop.hbase.util.ClassSize;
|
||||||
@ -134,8 +133,6 @@ public class FSHLog extends AbstractFSWAL<Writer> {
|
|||||||
// Minimum tolerable replicas, if the actual value is lower than it, rollWriter will be triggered
|
// Minimum tolerable replicas, if the actual value is lower than it, rollWriter will be triggered
|
||||||
private final int minTolerableReplication;
|
private final int minTolerableReplication;
|
||||||
|
|
||||||
private final boolean useHsync;
|
|
||||||
|
|
||||||
// If live datanode count is lower than the default replicas value,
|
// If live datanode count is lower than the default replicas value,
|
||||||
// RollWriter will be triggered in each sync(So the RollWriter will be
|
// RollWriter will be triggered in each sync(So the RollWriter will be
|
||||||
// triggered one by one in a short time). Using it as a workaround to slow
|
// triggered one by one in a short time). Using it as a workaround to slow
|
||||||
@ -186,6 +183,7 @@ public class FSHLog extends AbstractFSWAL<Writer> {
|
|||||||
* @param logDir dir where wals are stored
|
* @param logDir dir where wals are stored
|
||||||
* @param conf configuration to use
|
* @param conf configuration to use
|
||||||
*/
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
public FSHLog(final FileSystem fs, final Path root, final String logDir, final Configuration conf)
|
public FSHLog(final FileSystem fs, final Path root, final String logDir, final Configuration conf)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null);
|
this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null);
|
||||||
@ -218,7 +216,6 @@ public class FSHLog extends AbstractFSWAL<Writer> {
|
|||||||
this.lowReplicationRollLimit = conf.getInt("hbase.regionserver.hlog.lowreplication.rolllimit",
|
this.lowReplicationRollLimit = conf.getInt("hbase.regionserver.hlog.lowreplication.rolllimit",
|
||||||
5);
|
5);
|
||||||
this.closeErrorsTolerated = conf.getInt("hbase.regionserver.logroll.errors.tolerated", 2);
|
this.closeErrorsTolerated = conf.getInt("hbase.regionserver.logroll.errors.tolerated", 2);
|
||||||
this.useHsync = conf.getBoolean(HRegion.WAL_HSYNC_CONF_KEY, HRegion.DEFAULT_WAL_HSYNC);
|
|
||||||
|
|
||||||
// This is the 'writer' -- a single threaded executor. This single thread 'consumes' what is
|
// This is the 'writer' -- a single threaded executor. This single thread 'consumes' what is
|
||||||
// put on the ring buffer.
|
// put on the ring buffer.
|
||||||
@ -715,7 +712,7 @@ public class FSHLog extends AbstractFSWAL<Writer> {
|
|||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
protected SyncFuture publishSyncOnRingBuffer(long sequence, boolean forceSync) {
|
protected SyncFuture publishSyncOnRingBuffer(long sequence, boolean forceSync) {
|
||||||
// here we use ring buffer sequence as transaction id
|
// here we use ring buffer sequence as transaction id
|
||||||
SyncFuture syncFuture = getSyncFuture(sequence).setForceSync(forceSync);
|
SyncFuture syncFuture = getSyncFuture(sequence, forceSync);
|
||||||
try {
|
try {
|
||||||
RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence);
|
RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence);
|
||||||
truck.load(syncFuture);
|
truck.load(syncFuture);
|
||||||
|
@ -85,7 +85,7 @@ public interface WALProvider {
|
|||||||
}
|
}
|
||||||
|
|
||||||
interface AsyncWriter extends WriterBase {
|
interface AsyncWriter extends WriterBase {
|
||||||
CompletableFuture<Long> sync();
|
CompletableFuture<Long> sync(boolean forceSync);
|
||||||
|
|
||||||
void append(WAL.Entry entry);
|
void append(WAL.Entry entry);
|
||||||
}
|
}
|
||||||
|
@ -156,8 +156,8 @@ public class TestAsyncFSWAL extends AbstractTestFSWAL {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public CompletableFuture<Long> sync() {
|
public CompletableFuture<Long> sync(boolean forceSync) {
|
||||||
CompletableFuture<Long> result = writer.sync();
|
CompletableFuture<Long> result = writer.sync(forceSync);
|
||||||
if (failedCount.incrementAndGet() < 1000) {
|
if (failedCount.incrementAndGet() < 1000) {
|
||||||
CompletableFuture<Long> future = new CompletableFuture<>();
|
CompletableFuture<Long> future = new CompletableFuture<>();
|
||||||
FutureUtils.addListener(result,
|
FutureUtils.addListener(result,
|
||||||
|
@ -0,0 +1,106 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.regionserver.wal;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
|
||||||
|
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
|
||||||
|
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
|
||||||
|
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
|
||||||
|
|
||||||
|
@Category({ RegionServerServices.class, MediumTests.class })
|
||||||
|
public class TestAsyncFSWALDurability extends WALDurabilityTestBase<CustomAsyncFSWAL> {
|
||||||
|
|
||||||
|
@ClassRule
|
||||||
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
|
HBaseClassTestRule.forClass(TestAsyncFSWALDurability.class);
|
||||||
|
|
||||||
|
private static NioEventLoopGroup GROUP;
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUpBeforeClass() {
|
||||||
|
GROUP = new NioEventLoopGroup();
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDownAfterClass() {
|
||||||
|
GROUP.shutdownGracefully();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected CustomAsyncFSWAL getWAL(FileSystem fs, Path root, String logDir, Configuration conf)
|
||||||
|
throws IOException {
|
||||||
|
CustomAsyncFSWAL wal =
|
||||||
|
new CustomAsyncFSWAL(fs, root, logDir, conf, GROUP, NioSocketChannel.class);
|
||||||
|
wal.init();
|
||||||
|
return wal;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void resetSyncFlag(CustomAsyncFSWAL wal) {
|
||||||
|
wal.resetSyncFlag();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Boolean getSyncFlag(CustomAsyncFSWAL wal) {
|
||||||
|
return wal.getSyncFlag();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class CustomAsyncFSWAL extends AsyncFSWAL {
|
||||||
|
private Boolean syncFlag;
|
||||||
|
|
||||||
|
public CustomAsyncFSWAL(FileSystem fs, Path rootDir, String logDir, Configuration conf,
|
||||||
|
EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass)
|
||||||
|
throws FailedLogCloseException, IOException {
|
||||||
|
super(fs, rootDir, logDir, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null,
|
||||||
|
eventLoopGroup, channelClass);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void sync(boolean forceSync) throws IOException {
|
||||||
|
syncFlag = forceSync;
|
||||||
|
super.sync(forceSync);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void sync(long txid, boolean forceSync) throws IOException {
|
||||||
|
syncFlag = forceSync;
|
||||||
|
super.sync(txid, forceSync);
|
||||||
|
}
|
||||||
|
|
||||||
|
void resetSyncFlag() {
|
||||||
|
this.syncFlag = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
Boolean getSyncFlag() {
|
||||||
|
return syncFlag;
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,83 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.regionserver.wal;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
@Category({ RegionServerServices.class, MediumTests.class })
|
||||||
|
public class TestFSHLogDurability extends WALDurabilityTestBase<CustomFSHLog> {
|
||||||
|
|
||||||
|
@ClassRule
|
||||||
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
|
HBaseClassTestRule.forClass(TestFSHLogDurability.class);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected CustomFSHLog getWAL(FileSystem fs, Path root, String logDir, Configuration conf)
|
||||||
|
throws IOException {
|
||||||
|
CustomFSHLog wal = new CustomFSHLog(fs, root, logDir, conf);
|
||||||
|
wal.init();
|
||||||
|
return wal;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void resetSyncFlag(CustomFSHLog wal) {
|
||||||
|
wal.resetSyncFlag();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Boolean getSyncFlag(CustomFSHLog wal) {
|
||||||
|
return wal.getSyncFlag();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class CustomFSHLog extends FSHLog {
|
||||||
|
private Boolean syncFlag;
|
||||||
|
|
||||||
|
public CustomFSHLog(FileSystem fs, Path root, String logDir, Configuration conf)
|
||||||
|
throws IOException {
|
||||||
|
super(fs, root, logDir, conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void sync(boolean forceSync) throws IOException {
|
||||||
|
syncFlag = forceSync;
|
||||||
|
super.sync(forceSync);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void sync(long txid, boolean forceSync) throws IOException {
|
||||||
|
syncFlag = forceSync;
|
||||||
|
super.sync(txid, forceSync);
|
||||||
|
}
|
||||||
|
|
||||||
|
void resetSyncFlag() {
|
||||||
|
this.syncFlag = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
Boolean getSyncFlag() {
|
||||||
|
return syncFlag;
|
||||||
|
}
|
||||||
|
}
|
@ -18,14 +18,14 @@
|
|||||||
package org.apache.hadoop.hbase.regionserver.wal;
|
package org.apache.hadoop.hbase.regionserver.wal;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.client.Durability;
|
import org.apache.hadoop.hbase.client.Durability;
|
||||||
@ -33,25 +33,18 @@ import org.apache.hadoop.hbase.client.Put;
|
|||||||
import org.apache.hadoop.hbase.regionserver.ChunkCreator;
|
import org.apache.hadoop.hbase.regionserver.ChunkCreator;
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||||
import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
|
import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.wal.WAL;
|
import org.apache.hadoop.hbase.wal.WAL;
|
||||||
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.ClassRule;
|
|
||||||
import org.junit.Rule;
|
import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
|
||||||
import org.junit.rules.TestName;
|
import org.junit.rules.TestName;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests for WAL write durability - hflush vs hsync
|
* Tests for WAL write durability - hflush vs hsync
|
||||||
*/
|
*/
|
||||||
@Category({ MediumTests.class })
|
public abstract class WALDurabilityTestBase<T extends WAL> {
|
||||||
public class TestWALDurability {
|
|
||||||
|
|
||||||
@ClassRule
|
|
||||||
public static final HBaseClassTestRule CLASS_RULE =
|
|
||||||
HBaseClassTestRule.forClass(TestWALDurability.class);
|
|
||||||
|
|
||||||
private static final String COLUMN_FAMILY = "MyCF";
|
private static final String COLUMN_FAMILY = "MyCF";
|
||||||
private static final byte[] COLUMN_FAMILY_BYTES = Bytes.toBytes(COLUMN_FAMILY);
|
private static final byte[] COLUMN_FAMILY_BYTES = Bytes.toBytes(COLUMN_FAMILY);
|
||||||
@ -66,82 +59,70 @@ public class TestWALDurability {
|
|||||||
protected TableName tableName;
|
protected TableName tableName;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setup() throws IOException {
|
public void setUp() throws IOException {
|
||||||
conf = TEST_UTIL.getConfiguration();
|
conf = TEST_UTIL.getConfiguration();
|
||||||
dir = TEST_UTIL.getDataTestDir("TestHRegion").toString();
|
dir = TEST_UTIL.getDataTestDir("TestHRegion").toString();
|
||||||
tableName = TableName.valueOf(name.getMethodName());
|
tableName = TableName.valueOf(name.getMethodName());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws IOException {
|
||||||
|
TEST_UTIL.cleanupTestDir();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected abstract T getWAL(FileSystem fs, Path root, String logDir, Configuration conf)
|
||||||
|
throws IOException;
|
||||||
|
|
||||||
|
protected abstract void resetSyncFlag(T wal);
|
||||||
|
|
||||||
|
protected abstract Boolean getSyncFlag(T wal);
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testWALDurability() throws IOException {
|
public void testWALDurability() throws IOException {
|
||||||
class CustomFSLog extends FSHLog {
|
|
||||||
private Boolean syncFlag;
|
|
||||||
|
|
||||||
public CustomFSLog(FileSystem fs, Path root, String logDir, Configuration conf)
|
|
||||||
throws IOException {
|
|
||||||
super(fs, root, logDir, conf);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void sync(boolean forceSync) throws IOException {
|
|
||||||
syncFlag = forceSync;
|
|
||||||
super.sync(forceSync);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void sync(long txid, boolean forceSync) throws IOException {
|
|
||||||
syncFlag = forceSync;
|
|
||||||
super.sync(txid, forceSync);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void resetSyncFlag() {
|
|
||||||
this.syncFlag = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
// global hbase.wal.hsync false, no override in put call - hflush
|
// global hbase.wal.hsync false, no override in put call - hflush
|
||||||
conf.set(HRegion.WAL_HSYNC_CONF_KEY, "false");
|
conf.set(HRegion.WAL_HSYNC_CONF_KEY, "false");
|
||||||
FileSystem fs = FileSystem.get(conf);
|
FileSystem fs = FileSystem.get(conf);
|
||||||
Path rootDir = new Path(dir + getName());
|
Path rootDir = new Path(dir + getName());
|
||||||
CustomFSLog customFSLog = new CustomFSLog(fs, rootDir, getName(), conf);
|
T wal = getWAL(fs, rootDir, getName(), conf);
|
||||||
customFSLog.init();
|
HRegion region = initHRegion(tableName, null, null, wal);
|
||||||
HRegion region = initHRegion(tableName, null, null, customFSLog);
|
|
||||||
byte[] bytes = Bytes.toBytes(getName());
|
byte[] bytes = Bytes.toBytes(getName());
|
||||||
Put put = new Put(bytes);
|
Put put = new Put(bytes);
|
||||||
put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes);
|
put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes);
|
||||||
|
|
||||||
customFSLog.resetSyncFlag();
|
resetSyncFlag(wal);
|
||||||
assertNull(customFSLog.syncFlag);
|
assertNull(getSyncFlag(wal));
|
||||||
region.put(put);
|
region.put(put);
|
||||||
assertEquals(customFSLog.syncFlag, false);
|
assertFalse(getSyncFlag(wal));
|
||||||
|
|
||||||
|
region.close();
|
||||||
|
wal.close();
|
||||||
|
|
||||||
// global hbase.wal.hsync true, no override in put call
|
// global hbase.wal.hsync true, no override in put call
|
||||||
conf.set(HRegion.WAL_HSYNC_CONF_KEY, "true");
|
conf.set(HRegion.WAL_HSYNC_CONF_KEY, "true");
|
||||||
fs = FileSystem.get(conf);
|
fs = FileSystem.get(conf);
|
||||||
customFSLog = new CustomFSLog(fs, rootDir, getName(), conf);
|
wal = getWAL(fs, rootDir, getName(), conf);
|
||||||
customFSLog.init();
|
region = initHRegion(tableName, null, null, wal);
|
||||||
region = initHRegion(tableName, null, null, customFSLog);
|
|
||||||
|
|
||||||
customFSLog.resetSyncFlag();
|
resetSyncFlag(wal);
|
||||||
assertNull(customFSLog.syncFlag);
|
assertNull(getSyncFlag(wal));
|
||||||
region.put(put);
|
region.put(put);
|
||||||
assertEquals(customFSLog.syncFlag, true);
|
assertEquals(getSyncFlag(wal), true);
|
||||||
|
|
||||||
// global hbase.wal.hsync true, durability set in put call - fsync
|
// global hbase.wal.hsync true, durability set in put call - fsync
|
||||||
put.setDurability(Durability.FSYNC_WAL);
|
put.setDurability(Durability.FSYNC_WAL);
|
||||||
customFSLog.resetSyncFlag();
|
resetSyncFlag(wal);
|
||||||
assertNull(customFSLog.syncFlag);
|
assertNull(getSyncFlag(wal));
|
||||||
region.put(put);
|
region.put(put);
|
||||||
assertEquals(customFSLog.syncFlag, true);
|
assertTrue(getSyncFlag(wal));
|
||||||
|
|
||||||
// global hbase.wal.hsync true, durability set in put call - sync
|
// global hbase.wal.hsync true, durability set in put call - sync
|
||||||
put = new Put(bytes);
|
put = new Put(bytes);
|
||||||
put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes);
|
put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes);
|
||||||
put.setDurability(Durability.SYNC_WAL);
|
put.setDurability(Durability.SYNC_WAL);
|
||||||
customFSLog.resetSyncFlag();
|
resetSyncFlag(wal);
|
||||||
assertNull(customFSLog.syncFlag);
|
assertNull(getSyncFlag(wal));
|
||||||
region.put(put);
|
region.put(put);
|
||||||
assertEquals(customFSLog.syncFlag, false);
|
assertFalse(getSyncFlag(wal));
|
||||||
|
|
||||||
HBaseTestingUtility.closeRegionAndWAL(region);
|
HBaseTestingUtility.closeRegionAndWAL(region);
|
||||||
}
|
}
|
||||||
@ -155,7 +136,7 @@ public class TestWALDurability {
|
|||||||
* when done.
|
* when done.
|
||||||
*/
|
*/
|
||||||
public static HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey, WAL wal)
|
public static HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey, WAL wal)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
|
ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
|
||||||
return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, false, Durability.USE_DEFAULT,
|
return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, false, Durability.USE_DEFAULT,
|
||||||
wal, COLUMN_FAMILY_BYTES);
|
wal, COLUMN_FAMILY_BYTES);
|
@ -52,7 +52,7 @@ class WriterOverAsyncWriter implements WALProvider.Writer {
|
|||||||
@Override
|
@Override
|
||||||
public void sync(boolean forceSync) throws IOException {
|
public void sync(boolean forceSync) throws IOException {
|
||||||
try {
|
try {
|
||||||
asyncWriter.sync().get();
|
asyncWriter.sync(forceSync).get();
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
throw new InterruptedIOException();
|
throw new InterruptedIOException();
|
||||||
} catch (ExecutionException e) {
|
} catch (ExecutionException e) {
|
||||||
|
@ -66,17 +66,17 @@ class DualAsyncFSWALForTest extends DualAsyncFSWAL {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public CompletableFuture<Long> sync() {
|
public CompletableFuture<Long> sync(boolean forceSync) {
|
||||||
CompletableFuture<Long> localFuture;
|
CompletableFuture<Long> localFuture;
|
||||||
CompletableFuture<Long> remoteFuture;
|
CompletableFuture<Long> remoteFuture;
|
||||||
if (!localBroken) {
|
if (!localBroken) {
|
||||||
localFuture = localWriter.sync();
|
localFuture = localWriter.sync(forceSync);
|
||||||
} else {
|
} else {
|
||||||
localFuture = new CompletableFuture<>();
|
localFuture = new CompletableFuture<>();
|
||||||
localFuture.completeExceptionally(new IOException("Inject error"));
|
localFuture.completeExceptionally(new IOException("Inject error"));
|
||||||
}
|
}
|
||||||
if (!remoteBroken) {
|
if (!remoteBroken) {
|
||||||
remoteFuture = remoteWriter.sync();
|
remoteFuture = remoteWriter.sync(forceSync);
|
||||||
} else {
|
} else {
|
||||||
remoteFuture = new CompletableFuture<>();
|
remoteFuture = new CompletableFuture<>();
|
||||||
remoteFuture.completeExceptionally(new IOException("Inject error"));
|
remoteFuture.completeExceptionally(new IOException("Inject error"));
|
||||||
|
Loading…
x
Reference in New Issue
Block a user