HBASE-24984 WAL corruption due to early DBBs re-use when Durability.ASYNC_WAL is used with multi operation (#3495)
Signed-off-by: zhangduo <zhangduo@apache.org> Signed-off-by: Anoop <anoopsamjohn@apache.org> Signed-off-by: Andrew Purtell <apurtell@apache.org> Signed-off-by: Huaxiang Sun <huaxiangsun@apache.org>
This commit is contained in:
parent
16721239e7
commit
21c4578f22
|
@ -97,11 +97,13 @@ public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCa
|
|||
private long exceptionSize = 0;
|
||||
private final boolean retryImmediatelySupported;
|
||||
|
||||
// This is a dirty hack to address HBASE-22539. The lowest bit is for normal rpc cleanup, and the
|
||||
// second bit is for WAL reference. We can only call release if both of them are zero. The reason
|
||||
// why we can not use a general reference counting is that, we may call cleanup multiple times in
|
||||
// the current implementation. We should fix this in the future.
|
||||
private final AtomicInteger reference = new AtomicInteger(0b01);
|
||||
// This is a dirty hack to address HBASE-22539. The highest bit is for rpc ref and cleanup, and
|
||||
// the rest of the bits are for WAL reference count. We can only call release if all of them are
|
||||
// zero. The reason why we can not use a general reference counting is that, we may call cleanup
|
||||
// multiple times in the current implementation. We should fix this in the future.
|
||||
// The refCount here will start as 0x80000000 and increment with every WAL reference and decrement
|
||||
// from WAL side on release
|
||||
private final AtomicInteger reference = new AtomicInteger(0x80000000);
|
||||
|
||||
private final Span span;
|
||||
|
||||
|
@ -157,13 +159,14 @@ public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCa
|
|||
span.end();
|
||||
}
|
||||
|
||||
private void release(int mask) {
|
||||
@Override
|
||||
public void cleanup() {
|
||||
for (;;) {
|
||||
int ref = reference.get();
|
||||
if ((ref & mask) == 0) {
|
||||
if ((ref & 0x80000000) == 0) {
|
||||
return;
|
||||
}
|
||||
int nextRef = ref & (~mask);
|
||||
int nextRef = ref & 0x7fffffff;
|
||||
if (reference.compareAndSet(ref, nextRef)) {
|
||||
if (nextRef == 0) {
|
||||
if (this.reqCleanup != null) {
|
||||
|
@ -175,23 +178,20 @@ public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCa
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cleanup() {
|
||||
release(0b01);
|
||||
}
|
||||
|
||||
public void retainByWAL() {
|
||||
for (;;) {
|
||||
int ref = reference.get();
|
||||
int nextRef = ref | 0b10;
|
||||
if (reference.compareAndSet(ref, nextRef)) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
reference.incrementAndGet();
|
||||
}
|
||||
|
||||
public void releaseByWAL() {
|
||||
release(0b10);
|
||||
// Here this method of decrementAndGet for releasing WAL reference count will work in both
|
||||
// cases - i.e. highest bit (cleanup) 1 or 0. We will be decrementing a negative or positive
|
||||
// value respectively in these 2 cases, but the logic will work the same way
|
||||
if (reference.decrementAndGet() == 0) {
|
||||
if (this.reqCleanup != null) {
|
||||
this.reqCleanup.run();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -0,0 +1,121 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.wal;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServerFactory;
|
||||
import org.apache.hadoop.hbase.ipc.SimpleRpcServer;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
@Category({ RegionServerTests.class, MediumTests.class })
|
||||
public class TestFSHLogCorruptionWithMultiPutDueToDanglingByteBuffer
|
||||
extends WALCorruptionWithMultiPutDueToDanglingByteBufferTestBase {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule
|
||||
.forClass(TestFSHLogCorruptionWithMultiPutDueToDanglingByteBuffer.class);
|
||||
|
||||
public static final class PauseWAL extends FSHLog {
|
||||
|
||||
private int testTableWalAppendsCount = 0;
|
||||
|
||||
public PauseWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir,
|
||||
Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
|
||||
String prefix, String suffix) throws IOException {
|
||||
super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void atHeadOfRingBufferEventHandlerAppend() {
|
||||
// Let the 1st Append go through. The write thread will wait for this to go through before
|
||||
// calling further put()
|
||||
if (ARRIVE != null) { // Means appends as part of puts in testcase
|
||||
// Sleep for a second so that RS handler thread put all the mini batch WAL appends to ring
|
||||
// buffer.
|
||||
if (testTableWalAppendsCount == 0) {
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
}
|
||||
// Let the first minibatch write go through. When 2nd one comes, notify the waiting test
|
||||
// case for doing further batch puts and make this WAL append thread to pause
|
||||
if (testTableWalAppendsCount == 1) {
|
||||
ARRIVE.countDown();
|
||||
try {
|
||||
RESUME.await();
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
}
|
||||
testTableWalAppendsCount++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static final class PauseWALProvider extends AbstractFSWALProvider<PauseWAL> {
|
||||
|
||||
@Override
|
||||
protected PauseWAL createWAL() throws IOException {
|
||||
return new PauseWAL(CommonFSUtils.getWALFileSystem(conf), CommonFSUtils.getWALRootDir(conf),
|
||||
getWALDirectoryName(factory.factoryId), getWALArchiveDirectoryName(conf, factory.factoryId),
|
||||
conf, listeners, true, logPrefix,
|
||||
META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doInit(Configuration conf) throws IOException {
|
||||
}
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public static void setUp() throws Exception {
|
||||
UTIL.getConfiguration().setClass(WALFactory.WAL_PROVIDER, PauseWALProvider.class,
|
||||
WALProvider.class);
|
||||
UTIL.getConfiguration().setInt(HRegion.HBASE_REGIONSERVER_MINIBATCH_SIZE, 1);
|
||||
UTIL.getConfiguration().set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY,
|
||||
SimpleRpcServer.class.getName());
|
||||
UTIL.getConfiguration().setInt(ByteBuffAllocator.MAX_BUFFER_COUNT_KEY, 1);
|
||||
UTIL.getConfiguration().setInt(ByteBuffAllocator.BUFFER_SIZE_KEY, 1024);
|
||||
UTIL.getConfiguration().setInt(ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY, 500);
|
||||
UTIL.startMiniCluster(1);
|
||||
UTIL.createTable(TABLE_NAME, CF);
|
||||
UTIL.waitTableAvailable(TABLE_NAME);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() throws Exception {
|
||||
UTIL.shutdownMiniCluster();
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,104 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.wal;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtil;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Durability;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public abstract class WALCorruptionWithMultiPutDueToDanglingByteBufferTestBase {
|
||||
|
||||
private static final Logger LOG = LoggerFactory
|
||||
.getLogger(WALCorruptionWithMultiPutDueToDanglingByteBufferTestBase.class);
|
||||
|
||||
protected static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
|
||||
|
||||
protected static CountDownLatch ARRIVE;
|
||||
|
||||
protected static CountDownLatch RESUME;
|
||||
|
||||
protected static TableName TABLE_NAME = TableName
|
||||
.valueOf("WALCorruptionWithMultiPutDueToDanglingByteBufferTestBase");
|
||||
|
||||
protected static byte[] CF = Bytes.toBytes("cf");
|
||||
|
||||
protected static byte[] CQ = Bytes.toBytes("cq");
|
||||
|
||||
private byte[] getBytes(String prefix, int index) {
|
||||
return Bytes.toBytes(String.format("%s-%08d", prefix, index));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test() throws Exception {
|
||||
LOG.info("Stop WAL appending...");
|
||||
ARRIVE = new CountDownLatch(1);
|
||||
RESUME = new CountDownLatch(1);
|
||||
try (Table table = UTIL.getConnection().getTable(TABLE_NAME)) {
|
||||
LOG.info("Put totally 100 rows in batches of 5 with " + Durability.ASYNC_WAL + "...");
|
||||
int batchSize = 5;
|
||||
List<Put> puts = new ArrayList<>(batchSize);
|
||||
for (int i = 1; i <= 100; i++) {
|
||||
Put p = new Put(getBytes("row", i)).addColumn(CF, CQ, getBytes("value", i))
|
||||
.setDurability(Durability.ASYNC_WAL);
|
||||
puts.add(p);
|
||||
if (i % batchSize == 0) {
|
||||
table.put(puts);
|
||||
LOG.info("Wrote batch of {} rows from row {}", batchSize,
|
||||
Bytes.toString(puts.get(0).getRow()));
|
||||
puts.clear();
|
||||
// Wait for few of the minibatches in 1st batch of puts to go through the WAL write.
|
||||
// The WAL write will pause then
|
||||
if (ARRIVE != null) {
|
||||
ARRIVE.await();
|
||||
ARRIVE = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
LOG.info("Resume WAL appending...");
|
||||
RESUME.countDown();
|
||||
LOG.info("Put a single row to force a WAL sync...");
|
||||
table.put(new Put(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("value")));
|
||||
LOG.info("Abort the only region server");
|
||||
UTIL.getMiniHBaseCluster().abortRegionServer(0);
|
||||
LOG.info("Start a new region server");
|
||||
UTIL.getMiniHBaseCluster().startRegionServerAndWait(30000);
|
||||
UTIL.waitTableAvailable(TABLE_NAME);
|
||||
LOG.info("Check if all rows are still valid");
|
||||
for (int i = 1; i <= 100; i++) {
|
||||
Result result = table.get(new Get(getBytes("row", i)));
|
||||
assertEquals(Bytes.toString(getBytes("value", i)), Bytes.toString(result.getValue(CF, CQ)));
|
||||
}
|
||||
Result result = table.get(new Get(Bytes.toBytes("row")));
|
||||
assertEquals("value", Bytes.toString(result.getValue(CF, CQ)));
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue