From fe2409971cfd1de3b7f55fdd8447b9dddf57796a Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Mon, 5 Aug 2019 16:19:05 +0800 Subject: [PATCH] HBASE-22539 WAL corruption due to early DBBs re-use when Durability.ASYNC_WAL is used (#437) Signed-off-by: Zheng Hu --- .../apache/hadoop/hbase/ipc/ServerCall.java | 49 ++++++-- .../hbase/regionserver/wal/AbstractFSWAL.java | 8 +- .../hbase/regionserver/wal/AsyncFSWAL.java | 4 +- .../hadoop/hbase/regionserver/wal/FSHLog.java | 6 +- .../hbase/regionserver/wal/FSWALEntry.java | 38 +++--- .../wal/AbstractTestWALReplay.java | 5 +- .../regionserver/wal/TestAsyncFSWAL.java | 2 +- .../hbase/regionserver/wal/TestFSHLog.java | 2 +- ...SWALCorruptionDueToDanglingByteBuffer.java | 111 ++++++++++++++++++ ...HLogCorruptionDueToDanglingByteBuffer.java | 95 +++++++++++++++ ...uptionDueToDanglingByteBufferTestBase.java | 92 +++++++++++++++ 11 files changed, 379 insertions(+), 33 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestAsyncFSWALCorruptionDueToDanglingByteBuffer.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestFSHLogCorruptionDueToDanglingByteBuffer.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALCorruptionDueToDanglingByteBufferTestBase.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java index cf1cf9a14ae..881828bf97c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java @@ -23,7 +23,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.Optional; - +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.yetus.audience.InterfaceAudience; @@ -51,7 +51,7 @@ import org.apache.hadoop.util.StringUtils; * the result. */ @InterfaceAudience.Private -abstract class ServerCall implements RpcCall, RpcResponse { +public abstract class ServerCall implements RpcCall, RpcResponse { protected final int id; // the client's call id protected final BlockingService service; @@ -91,8 +91,14 @@ abstract class ServerCall implements RpcCall, Rpc private long exceptionSize = 0; private final boolean retryImmediatelySupported; - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH", - justification="Can't figure why this complaint is happening... see below") + // This is a dirty hack to address HBASE-22539. The lowest bit is for normal rpc cleanup, and the + // second bit is for WAL reference. We can only call release if both of them are zero. The reason + // why we can not use a general reference counting is that, we may call cleanup multiple times in + // the current implementation. We should fix this in the future. + private final AtomicInteger reference = new AtomicInteger(0b01); + + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH", + justification = "Can't figure why this complaint is happening... see below") ServerCall(int id, BlockingService service, MethodDescriptor md, RequestHeader header, Message param, CellScanner cellScanner, T connection, long size, InetAddress remoteAddress, long receiveTime, int timeout, ByteBufferPool reservoir, @@ -141,14 +147,43 @@ abstract class ServerCall implements RpcCall, Rpc cleanup(); } + private void release(int mask) { + for (;;) { + int ref = reference.get(); + if ((ref & mask) == 0) { + return; + } + int nextRef = ref & (~mask); + if (reference.compareAndSet(ref, nextRef)) { + if (nextRef == 0) { + if (this.reqCleanup != null) { + this.reqCleanup.run(); + } + } + return; + } + } + } + @Override public void cleanup() { - if (this.reqCleanup != null) { - this.reqCleanup.run(); - this.reqCleanup = null; + release(0b01); + } + + public void retainByWAL() { + for (;;) { + int ref = reference.get(); + int nextRef = ref | 0b10; + if (reference.compareAndSet(ref, nextRef)) { + return; + } } } + public void releaseByWAL() { + release(0b10); + } + @Override public String toString() { return toShortString() + " param: " + diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index 43f15120548..5a4ea3c994b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -56,6 +56,8 @@ import org.apache.hadoop.hbase.PrivateCellUtil; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.exceptions.TimeoutIOException; import org.apache.hadoop.hbase.io.util.MemorySizeUtil; +import org.apache.hadoop.hbase.ipc.RpcServer; +import org.apache.hadoop.hbase.ipc.ServerCall; import org.apache.hadoop.hbase.log.HBaseMarkers; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.trace.TraceUtil; @@ -901,7 +903,7 @@ public abstract class AbstractFSWAL implements WAL { * Exposed for testing only. Use to tricks like halt the ring buffer appending. */ @VisibleForTesting - void atHeadOfRingBufferEventHandlerAppend() { + protected void atHeadOfRingBufferEventHandlerAppend() { // Noop } @@ -977,8 +979,10 @@ public abstract class AbstractFSWAL implements WAL { txidHolder.setValue(ringBuffer.next()); }); long txid = txidHolder.longValue(); + ServerCall rpcCall = RpcServer.getCurrentCall().filter(c -> c instanceof ServerCall) + .filter(c -> c.getCellScanner() != null).map(c -> (ServerCall) c).orElse(null); try (TraceScope scope = TraceUtil.createTrace(implClassName + ".append")) { - FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore); + FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall); entry.stampRegionSequenceId(we); ringBuffer.get(txid).load(entry); } finally { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java index 79409a0245a..10c1a531a42 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java @@ -321,7 +321,9 @@ public class AsyncFSWAL extends AbstractFSWAL { private void syncCompleted(AsyncWriter writer, long processedTxid, long startTimeNs) { highestSyncedTxid.set(processedTxid); for (Iterator iter = unackedAppends.iterator(); iter.hasNext();) { - if (iter.next().getTxid() <= processedTxid) { + FSWALEntry entry = iter.next(); + if (entry.getTxid() <= processedTxid) { + entry.release(); iter.remove(); } else { break; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index b77210ed481..f10b1ce0042 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -24,7 +24,6 @@ import com.lmax.disruptor.LifecycleAware; import com.lmax.disruptor.TimeoutException; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; - import java.io.IOException; import java.io.OutputStream; import java.util.Arrays; @@ -34,7 +33,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; @@ -59,6 +57,7 @@ import org.apache.htrace.core.TraceScope; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; /** @@ -963,7 +962,6 @@ public class FSHLog extends AbstractFSWAL { //TODO handle htrace API change, see HBASE-18895 //TraceScope scope = Trace.continueSpan(entry.detachSpan()); try { - if (this.exception != null) { // Return to keep processing events coming off the ringbuffer return; @@ -980,6 +978,8 @@ public class FSHLog extends AbstractFSWAL { : new DamagedWALException("On sync", this.exception)); // Return to keep processing events coming off the ringbuffer return; + } finally { + entry.release(); } } else { // What is this if not an append or sync. Fail all up to this!!! diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java index 63132121fc3..1b44fccc38f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java @@ -17,19 +17,17 @@ */ package org.apache.hadoop.hbase.regionserver.wal; -import static java.util.stream.Collectors.toCollection; - import java.io.IOException; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.TreeSet; - import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.PrivateCellUtil; import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.ipc.ServerCall; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.wal.WAL.Entry; @@ -56,19 +54,24 @@ class FSWALEntry extends Entry { private final transient boolean inMemstore; private final transient RegionInfo regionInfo; private final transient Set familyNames; + private final transient Optional> rpcCall; - FSWALEntry(final long txid, final WALKeyImpl key, final WALEdit edit, - final RegionInfo regionInfo, final boolean inMemstore) { + FSWALEntry(final long txid, final WALKeyImpl key, final WALEdit edit, final RegionInfo regionInfo, + final boolean inMemstore, ServerCall rpcCall) { super(key, edit); this.inMemstore = inMemstore; this.regionInfo = regionInfo; this.txid = txid; if (inMemstore) { // construct familyNames here to reduce the work of log sinker. - Set families = edit.getFamilies(); - this.familyNames = families != null? families: collectFamilies(edit.getCells()); + Set families = edit.getFamilies(); + this.familyNames = families != null ? families : collectFamilies(edit.getCells()); } else { - this.familyNames = Collections.emptySet(); + this.familyNames = Collections. emptySet(); + } + this.rpcCall = Optional.ofNullable(rpcCall); + if (rpcCall != null) { + rpcCall.retainByWAL(); } } @@ -77,12 +80,13 @@ class FSWALEntry extends Entry { if (CollectionUtils.isEmpty(cells)) { return Collections.emptySet(); } else { - return cells.stream() - .filter(v -> !CellUtil.matchingFamily(v, WALEdit.METAFAMILY)) - .collect(toCollection(() -> new TreeSet<>(CellComparator.getInstance()::compareFamilies))) - .stream() - .map(CellUtil::cloneFamily) - .collect(toCollection(() -> new TreeSet<>(Bytes.BYTES_COMPARATOR))); + Set set = new TreeSet<>(Bytes.BYTES_COMPARATOR); + for (Cell cell: cells) { + if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) { + set.add(CellUtil.cloneFamily(cell)); + } + } + return set; } } @@ -129,4 +133,8 @@ class FSWALEntry extends Entry { Set getFamilyNames() { return familyNames; } + + void release() { + rpcCall.ifPresent(ServerCall::releaseByWAL); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java index 491ddfb3489..8aeff79a4fc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java @@ -1158,9 +1158,8 @@ public abstract class AbstractTestWALReplay { private FSWALEntry createFSWALEntry(HTableDescriptor htd, HRegionInfo hri, long sequence, byte[] rowName, byte[] family, EnvironmentEdge ee, MultiVersionConcurrencyControl mvcc, int index, NavigableMap scopes) throws IOException { - FSWALEntry entry = - new FSWALEntry(sequence, createWALKey(htd.getTableName(), hri, mvcc, scopes), createWALEdit( - rowName, family, ee, index), hri, true); + FSWALEntry entry = new FSWALEntry(sequence, createWALKey(htd.getTableName(), hri, mvcc, scopes), + createWALEdit(rowName, family, ee, index), hri, true, null); entry.stampRegionSequenceId(mvcc.begin()); return entry; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java index 93024288e70..effecb8c9d0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java @@ -112,7 +112,7 @@ public class TestAsyncFSWAL extends AbstractTestFSWAL { AsyncFSWAL asyncFSWAL = new AsyncFSWAL(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix, GROUP, CHANNEL_CLASS) { @Override - void atHeadOfRingBufferEventHandlerAppend() { + protected void atHeadOfRingBufferEventHandlerAppend() { action.run(); super.atHeadOfRingBufferEventHandlerAppend(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java index 614ba79be2a..8f6e51844c1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java @@ -89,7 +89,7 @@ public class TestFSHLog extends AbstractTestFSWAL { conf, listeners, failIfWALExists, prefix, suffix) { @Override - void atHeadOfRingBufferEventHandlerAppend() { + protected void atHeadOfRingBufferEventHandlerAppend() { action.run(); super.atHeadOfRingBufferEventHandlerAppend(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestAsyncFSWALCorruptionDueToDanglingByteBuffer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestAsyncFSWALCorruptionDueToDanglingByteBuffer.java new file mode 100644 index 00000000000..46aa87107b4 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestAsyncFSWALCorruptionDueToDanglingByteBuffer.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.wal; + +import java.io.IOException; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL; +import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException; +import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.apache.hadoop.hbase.util.Pair; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.experimental.categories.Category; + +import org.apache.hbase.thirdparty.io.netty.channel.Channel; +import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; + +/** + * Testcase for HBASE-22539 + */ +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestAsyncFSWALCorruptionDueToDanglingByteBuffer + extends WALCorruptionDueToDanglingByteBufferTestBase { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestAsyncFSWALCorruptionDueToDanglingByteBuffer.class); + + public static final class PauseWAL extends AsyncFSWAL { + + public PauseWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir, + Configuration conf, List listeners, boolean failIfWALExists, + String prefix, String suffix, EventLoopGroup eventLoopGroup, + Class channelClass) throws FailedLogCloseException, IOException { + super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix, + eventLoopGroup, channelClass); + } + + @Override + protected void atHeadOfRingBufferEventHandlerAppend() { + if (ARRIVE != null) { + ARRIVE.countDown(); + try { + RESUME.await(); + } catch (InterruptedException e) { + } + } + } + } + + public static final class PauseWALProvider extends AbstractFSWALProvider { + + private EventLoopGroup eventLoopGroup; + + private Class channelClass; + + @Override + protected PauseWAL createWAL() throws IOException { + return new PauseWAL(CommonFSUtils.getWALFileSystem(conf), CommonFSUtils.getWALRootDir(conf), + getWALDirectoryName(factory.factoryId), getWALArchiveDirectoryName(conf, factory.factoryId), + conf, listeners, true, logPrefix, + META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null, eventLoopGroup, + channelClass); + } + + @Override + protected void doInit(Configuration conf) throws IOException { + Pair> eventLoopGroupAndChannelClass = + NettyAsyncFSWALConfigHelper.getEventLoopConfig(conf); + eventLoopGroup = eventLoopGroupAndChannelClass.getFirst(); + channelClass = eventLoopGroupAndChannelClass.getSecond(); + } + } + + @BeforeClass + public static void setUp() throws Exception { + UTIL.getConfiguration().setClass(WALFactory.WAL_PROVIDER, PauseWALProvider.class, + WALProvider.class); + UTIL.startMiniCluster(1); + UTIL.createTable(TABLE_NAME, CF); + UTIL.waitTableAvailable(TABLE_NAME); + } + + @AfterClass + public static void tearDown() throws Exception { + UTIL.shutdownMiniCluster(); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestFSHLogCorruptionDueToDanglingByteBuffer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestFSHLogCorruptionDueToDanglingByteBuffer.java new file mode 100644 index 00000000000..73f7ad4ae13 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestFSHLogCorruptionDueToDanglingByteBuffer.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.wal; + +import java.io.IOException; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.regionserver.wal.FSHLog; +import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.experimental.categories.Category; + +/** + * Testcase for HBASE-22539 + */ +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestFSHLogCorruptionDueToDanglingByteBuffer + extends WALCorruptionDueToDanglingByteBufferTestBase { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestFSHLogCorruptionDueToDanglingByteBuffer.class); + + public static final class PauseWAL extends FSHLog { + + public PauseWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir, + Configuration conf, List listeners, boolean failIfWALExists, + String prefix, String suffix) throws IOException { + super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix); + } + + @Override + protected void atHeadOfRingBufferEventHandlerAppend() { + if (ARRIVE != null) { + ARRIVE.countDown(); + try { + RESUME.await(); + } catch (InterruptedException e) { + } + } + } + } + + public static final class PauseWALProvider extends AbstractFSWALProvider { + + @Override + protected PauseWAL createWAL() throws IOException { + return new PauseWAL(CommonFSUtils.getWALFileSystem(conf), CommonFSUtils.getWALRootDir(conf), + getWALDirectoryName(factory.factoryId), getWALArchiveDirectoryName(conf, factory.factoryId), + conf, listeners, true, logPrefix, + META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null); + } + + @Override + protected void doInit(Configuration conf) throws IOException { + } + } + + @BeforeClass + public static void setUp() throws Exception { + UTIL.getConfiguration().setClass(WALFactory.WAL_PROVIDER, PauseWALProvider.class, + WALProvider.class); + UTIL.startMiniCluster(1); + UTIL.createTable(TABLE_NAME, CF); + UTIL.waitTableAvailable(TABLE_NAME); + } + + @AfterClass + public static void tearDown() throws Exception { + UTIL.shutdownMiniCluster(); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALCorruptionDueToDanglingByteBufferTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALCorruptionDueToDanglingByteBufferTestBase.java new file mode 100644 index 00000000000..127ed86f323 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALCorruptionDueToDanglingByteBufferTestBase.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.wal; + +import static org.junit.Assert.assertEquals; + +import java.util.Arrays; +import java.util.concurrent.CountDownLatch; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Testcase for HBASE-22539 + */ +public abstract class WALCorruptionDueToDanglingByteBufferTestBase { + + private static final Logger LOG = + LoggerFactory.getLogger(TestAsyncFSWALCorruptionDueToDanglingByteBuffer.class); + + protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + protected static CountDownLatch ARRIVE; + + protected static CountDownLatch RESUME; + + protected static TableName TABLE_NAME = TableName.valueOf("Corruption"); + + protected static byte[] CF = Bytes.toBytes("cf"); + + protected static byte[] CQ = Bytes.toBytes("cq"); + + private byte[] getBytes(String prefix, int index) { + return Bytes.toBytes(String.format("%s-%08d", prefix, index)); + } + + @Test + public void test() throws Exception { + LOG.info("Stop WAL appending..."); + ARRIVE = new CountDownLatch(1); + RESUME = new CountDownLatch(1); + try (Table table = UTIL.getConnection().getTable(TABLE_NAME)) { + LOG.info("Put 100 rows with " + Durability.ASYNC_WAL + "..."); + for (int i = 0; i < 100; i++) { + table.batch(Arrays.asList(new Put(getBytes("row", i)) + .addColumn(CF, CQ, getBytes("value", i)).setDurability(Durability.ASYNC_WAL)), + new Object[1]); + } + ARRIVE.await(); + ARRIVE = null; + LOG.info("Resume WAL appending..."); + RESUME.countDown(); + LOG.info("Put a single row to force a WAL sync..."); + table.put(new Put(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("value"))); + LOG.info("Abort the only region server"); + UTIL.getMiniHBaseCluster().abortRegionServer(0); + LOG.info("Start a new region server"); + UTIL.getMiniHBaseCluster().startRegionServerAndWait(30000); + UTIL.waitTableAvailable(TABLE_NAME); + LOG.info("Check if all rows are still valid"); + for (int i = 0; i < 100; i++) { + Result result = table.get(new Get(getBytes("row", i))); + assertEquals(Bytes.toString(getBytes("value", i)), Bytes.toString(result.getValue(CF, CQ))); + } + Result result = table.get(new Get(Bytes.toBytes("row"))); + assertEquals("value", Bytes.toString(result.getValue(CF, CQ))); + } + } +}