diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
index b53c770cf37..9a4c11ad404 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
@@ -93,11 +93,13 @@ public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCa
   private long exceptionSize = 0;
   private final boolean retryImmediatelySupported;
 
-  // This is a dirty hack to address HBASE-22539. The lowest bit is for normal rpc cleanup, and the
-  // second bit is for WAL reference. We can only call release if both of them are zero. The reason
-  // why we can not use a general reference counting is that, we may call cleanup multiple times in
-  // the current implementation. We should fix this in the future.
-  private final AtomicInteger reference = new AtomicInteger(0b01);
+  // This is a dirty hack to address HBASE-22539. The highest bit is for the rpc ref and cleanup,
+  // and the remaining bits are the WAL reference count. We can only release the request buffer
+  // once all of them are zero. The reason we cannot use general reference counting is that, in
+  // the current implementation, cleanup may be called multiple times. We should fix this in the
+  // future. The count starts at 0x80000000, is incremented for every WAL reference, and is
+  // decremented from the WAL side on release.
+  private final AtomicInteger reference = new AtomicInteger(0x80000000);
 
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH",
       justification = "Can't figure why this complaint is happening... see below")
@@ -149,13 +151,14 @@ public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCa
     cleanup();
   }
 
-  private void release(int mask) {
+  @Override
+  public void cleanup() {
     for (;;) {
       int ref = reference.get();
-      if ((ref & mask) == 0) {
+      if ((ref & 0x80000000) == 0) {
         return;
       }
-      int nextRef = ref & (~mask);
+      int nextRef = ref & 0x7fffffff;
       if (reference.compareAndSet(ref, nextRef)) {
         if (nextRef == 0) {
           if (this.reqCleanup != null) {
@@ -167,23 +170,19 @@ public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCa
     }
   }
 
-  @Override
-  public void cleanup() {
-    release(0b01);
-  }
-
   public void retainByWAL() {
-    for (;;) {
-      int ref = reference.get();
-      int nextRef = ref | 0b10;
-      if (reference.compareAndSet(ref, nextRef)) {
-        return;
-      }
-    }
+    reference.incrementAndGet();
   }
 
   public void releaseByWAL() {
-    release(0b10);
+    // decrementAndGet works here whether the highest (cleanup) bit is still set or not: we
+    // decrement a negative or a positive value respectively, and in both cases the buffer is
+    // released exactly when the counter reaches zero.
+    if (reference.decrementAndGet() == 0) {
+      if (this.reqCleanup != null) {
+        this.reqCleanup.run();
+      }
+    }
   }
 
   @Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestFSHLogCorruptionWithMultiPutDueToDanglingByteBuffer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestFSHLogCorruptionWithMultiPutDueToDanglingByteBuffer.java
new file mode 100644
index 00000000000..407d5a2fcc4
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestFSHLogCorruptionWithMultiPutDueToDanglingByteBuffer.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
+import org.apache.hadoop.hbase.ipc.RpcServerFactory;
+import org.apache.hadoop.hbase.ipc.SimpleRpcServer;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.experimental.categories.Category;
+
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestFSHLogCorruptionWithMultiPutDueToDanglingByteBuffer
+    extends WALCorruptionWithMultiPutDueToDanglingByteBufferTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule
+      .forClass(TestFSHLogCorruptionWithMultiPutDueToDanglingByteBuffer.class);
+
+  public static final class PauseWAL extends FSHLog {
+
+    private int testTableWalAppendsCount = 0;
+
+    public PauseWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir,
+        Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
+        String prefix, String suffix) throws IOException {
+      super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
+    }
+
+    @Override
+    protected void atHeadOfRingBufferEventHandlerAppend() {
+      // Let the 1st append go through. The write thread will wait for it to complete before
+      // issuing further put() calls.
+      if (ARRIVE != null) { // Means appends issued as part of puts in the test case
+        // Sleep for a second so that the RS handler thread can put all the minibatch WAL
+        // appends into the ring buffer.
+        if (testTableWalAppendsCount == 0) {
+          try {
+            Thread.sleep(1000);
+          } catch (InterruptedException e) {
+          }
+        }
+        // Let the first minibatch write go through. When the 2nd one comes, notify the waiting
+        // test case to do further batch puts and make this WAL append thread pause.
+        if (testTableWalAppendsCount == 1) {
+          ARRIVE.countDown();
+          try {
+            RESUME.await();
+          } catch (InterruptedException e) {
+          }
+        }
+        testTableWalAppendsCount++;
+      }
+    }
+  }
+
+  public static final class PauseWALProvider extends AbstractFSWALProvider<PauseWAL> {
+
+    @Override
+    protected PauseWAL createWAL() throws IOException {
+      return new PauseWAL(CommonFSUtils.getWALFileSystem(conf), CommonFSUtils.getWALRootDir(conf),
+          getWALDirectoryName(factory.factoryId),
+          getWALArchiveDirectoryName(conf, factory.factoryId), conf, listeners, true, logPrefix,
+          META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null);
+    }
+
+    @Override
+    protected void doInit(Configuration conf) throws IOException {
+    }
+  }
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    UTIL.getConfiguration().setClass(WALFactory.WAL_PROVIDER, PauseWALProvider.class,
+        WALProvider.class);
+    UTIL.getConfiguration().setInt(HRegion.HBASE_REGIONSERVER_MINIBATCH_SIZE, 1);
+    UTIL.getConfiguration().set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY,
+        SimpleRpcServer.class.getName());
+    UTIL.getConfiguration().setInt(ByteBuffAllocator.MAX_BUFFER_COUNT_KEY, 1);
+    UTIL.getConfiguration().setInt(ByteBuffAllocator.BUFFER_SIZE_KEY, 1024);
+    UTIL.getConfiguration().setInt(ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY, 500);
+    UTIL.startMiniCluster(1);
+    UTIL.createTable(TABLE_NAME, CF);
+    UTIL.waitTableAvailable(TABLE_NAME);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALCorruptionWithMultiPutDueToDanglingByteBufferTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALCorruptionWithMultiPutDueToDanglingByteBufferTestBase.java
new file mode 100644
index 00000000000..50da7a5e37d
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALCorruptionWithMultiPutDueToDanglingByteBufferTestBase.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class WALCorruptionWithMultiPutDueToDanglingByteBufferTestBase {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(WALCorruptionWithMultiPutDueToDanglingByteBufferTestBase.class);
+
+  protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  protected static CountDownLatch ARRIVE;
+
+  protected static CountDownLatch RESUME;
+
+  protected static TableName TABLE_NAME = TableName
+      .valueOf("WALCorruptionWithMultiPutDueToDanglingByteBufferTestBase");
+
+  protected static byte[] CF = Bytes.toBytes("cf");
+
+  protected static byte[] CQ = Bytes.toBytes("cq");
+
+  private byte[] getBytes(String prefix, int index) {
+    return Bytes.toBytes(String.format("%s-%08d", prefix, index));
+  }
+
+  @Test
+  public void test() throws Exception {
+    LOG.info("Stop WAL appending...");
+    ARRIVE = new CountDownLatch(1);
+    RESUME = new CountDownLatch(1);
+    try (Table table = UTIL.getConnection().getTable(TABLE_NAME)) {
+      LOG.info("Put totally 100 rows in batches of 5 with " + Durability.ASYNC_WAL + "...");
+      int batchSize = 5;
+      List<Put> puts = new ArrayList<>(batchSize);
+      for (int i = 1; i <= 100; i++) {
+        Put p = new Put(getBytes("row", i)).addColumn(CF, CQ, getBytes("value", i))
+            .setDurability(Durability.ASYNC_WAL);
+        puts.add(p);
+        if (i % batchSize == 0) {
+          table.put(puts);
+          LOG.info("Wrote batch of {} rows from row {}", batchSize,
+            Bytes.toString(puts.get(0).getRow()));
+          puts.clear();
+          // Wait for a few of the minibatches in the 1st batch of puts to go through the WAL
+          // write. The WAL write will then pause.
+          if (ARRIVE != null) {
+            ARRIVE.await();
+            ARRIVE = null;
+          }
+        }
+      }
+      LOG.info("Resume WAL appending...");
+      RESUME.countDown();
+      LOG.info("Put a single row to force a WAL sync...");
+      table.put(new Put(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("value")));
+      LOG.info("Abort the only region server");
+      UTIL.getMiniHBaseCluster().abortRegionServer(0);
+      LOG.info("Start a new region server");
+      UTIL.getMiniHBaseCluster().startRegionServerAndWait(30000);
+      UTIL.waitTableAvailable(TABLE_NAME);
+      LOG.info("Check if all rows are still valid");
+      for (int i = 1; i <= 100; i++) {
+        Result result = table.get(new Get(getBytes("row", i)));
+        assertEquals(Bytes.toString(getBytes("value", i)),
+          Bytes.toString(result.getValue(CF, CQ)));
+      }
+      Result result = table.get(new Get(Bytes.toBytes("row")));
+      assertEquals("value", Bytes.toString(result.getValue(CF, CQ)));
+    }
+  }
+}
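Reviewer note on the ServerCall change above: the old two-bit scheme could track at most one outstanding WAL reference, because retainByWAL() was an idempotent OR (ref | 0b10). With multi-puts split into minibatches, a single RPC can hold several WAL references at once; the old scheme collapsed them into one bit, so the first releaseByWAL() could free the request buffer while later appends still pointed into it. The sketch below is a minimal, self-contained model of the new sign-bit-plus-counter layout; it is illustrative only, and the class name and the reqCleanup runnable are stand-ins, not HBase API.

// Illustrative sketch only (not part of the patch): models the combined
// counter ServerCall uses after this change. The sign bit means "RPC still
// alive", the low 31 bits count outstanding WAL references, and the request
// buffer is freed exactly once, when the whole int reaches zero.
import java.util.concurrent.atomic.AtomicInteger;

public class CombinedRefCountSketch {

  // Starts with only the sign bit set: RPC alive, zero WAL references.
  private final AtomicInteger reference = new AtomicInteger(0x80000000);

  private final Runnable reqCleanup; // stand-in for freeing the request ByteBuff

  public CombinedRefCountSketch(Runnable reqCleanup) {
    this.reqCleanup = reqCleanup;
  }

  // WAL side: take one reference per queued append. A plain increment (unlike
  // the old idempotent OR) keeps counting when one RPC has several appends.
  public void retainByWAL() {
    reference.incrementAndGet();
  }

  // WAL side: drop a reference once the append is persisted. Works on a
  // negative (cleanup pending) or positive (cleanup done) value alike.
  public void releaseByWAL() {
    if (reference.decrementAndGet() == 0) {
      reqCleanup.run();
    }
  }

  // RPC side: may be called multiple times; the CAS loop clears the sign bit
  // at most once, so a double cleanup() cannot double-decrement the count.
  public void cleanup() {
    for (;;) {
      int ref = reference.get();
      if ((ref & 0x80000000) == 0) {
        return; // already cleaned up
      }
      int nextRef = ref & 0x7fffffff;
      if (reference.compareAndSet(ref, nextRef)) {
        if (nextRef == 0) {
          reqCleanup.run();
        }
        return;
      }
    }
  }
}

Walking through the sketch: two minibatch appends call retainByWAL() (counter becomes 0x80000002), the RPC finishes and cleanup() clears the sign bit (leaving 2), and each sync completion calls releaseByWAL() (1, then 0), so only the last release frees the buffer. The same invariant holds if the WAL side releases before cleanup() runs.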