diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java index 23f28e024b7..cfda30f523a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java @@ -448,12 +448,20 @@ public class AsyncFSWAL extends AbstractFSWAL { } } + // confirm non-empty before calling + private static long getLastTxid(Deque queue) { + return queue.peekLast().getTxid(); + } + private void appendAndSync() { final AsyncWriter writer = this.writer; // maybe a sync request is not queued when we issue a sync, so check here to see if we could // finish some. finishSync(); long newHighestProcessedAppendTxid = -1L; + // this is used to avoid calling peedLast every time on unackedAppends, appendAndAsync is single + // threaded, this could save us some cycles + boolean addedToUnackedAppends = false; for (Iterator iter = toWriteAppends.iterator(); iter.hasNext();) { FSWALEntry entry = iter.next(); boolean appended; @@ -467,10 +475,21 @@ public class AsyncFSWAL extends AbstractFSWAL { if (appended) { // This is possible, when we fail to sync, we will add the unackedAppends back to // toWriteAppends, so here we may get an entry which is already in the unackedAppends. - if (unackedAppends.isEmpty() || unackedAppends.peekLast().getTxid() < entry.getTxid()) { + if (addedToUnackedAppends || unackedAppends.isEmpty() || + getLastTxid(unackedAppends) < entry.getTxid()) { unackedAppends.addLast(entry); + addedToUnackedAppends = true; } - if (writer.getLength() - fileLengthAtLastSync >= batchSize) { + // See HBASE-25905, here we need to make sure that, we will always write all the entries in + // unackedAppends out. As the code in the consume method will assume that, the entries in + // unackedAppends have all been sent out so if there is roll request and unackedAppends is + // not empty, we could just return as later there will be a syncCompleted call to clear the + // unackedAppends, or a syncFailed to lead us to another state. + // There could be other ways to fix, such as changing the logic in the consume method, but + // it will break the assumption and then (may) lead to a big refactoring. So here let's use + // this way to fix first, can optimize later. + if (writer.getLength() - fileLengthAtLastSync >= batchSize && + (addedToUnackedAppends || entry.getTxid() >= getLastTxid(unackedAppends))) { break; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java index 3a2ffa7600b..8d46bad44f8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java @@ -48,6 +48,8 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider { private static final Logger LOG = LoggerFactory.getLogger(AsyncFSWALProvider.class); + public static final String WRITER_IMPL = "hbase.regionserver.hlog.async.writer.impl"; + // Only public so classes back in regionserver.wal can access public interface AsyncWriter extends WALProvider.AsyncWriter { /** @@ -98,7 +100,7 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider { Class channelClass) throws IOException { // Configuration already does caching for the Class lookup. Class logWriterClass = conf.getClass( - "hbase.regionserver.hlog.async.writer.impl", AsyncProtobufLogWriter.class, AsyncWriter.class); + WRITER_IMPL, AsyncProtobufLogWriter.class, AsyncWriter.class); try { AsyncWriter writer = logWriterClass.getConstructor(EventLoopGroup.class, Class.class) .newInstance(eventLoopGroup, channelClass); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALRollStuck.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALRollStuck.java new file mode 100644 index 00000000000..858fc315664 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALRollStuck.java @@ -0,0 +1,205 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import java.io.IOException; +import java.util.Arrays; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell.Type; +import org.apache.hadoop.hbase.CellBuilderFactory; +import org.apache.hadoop.hbase.CellBuilderType; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; +import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.wal.AsyncFSWALProvider; +import org.apache.hadoop.hbase.wal.AsyncFSWALProvider.AsyncWriter; +import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALKeyImpl; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.io.Closeables; +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hbase.thirdparty.io.netty.channel.Channel; +import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; +import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup; +import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel; + +/** + * Testcase for HBASE-25905 + */ +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestAsyncFSWALRollStuck { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestAsyncFSWALRollStuck.class); + + private static final Logger LOG = LoggerFactory.getLogger(TestAsyncFSWALRollStuck.class); + + private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); + + private static EventLoopGroup EVENT_LOOP_GROUP = new NioEventLoopGroup(); + + private static Class CHANNEL_CLASS = NioSocketChannel.class; + + private static ScheduledExecutorService EXECUTOR; + + private static BlockingQueue> FUTURES = new ArrayBlockingQueue<>(3); + + private static AtomicInteger SYNC_COUNT = new AtomicInteger(0); + + private static CountDownLatch ARRIVE = new CountDownLatch(1); + + private static CountDownLatch RESUME = new CountDownLatch(1); + + public static final class TestAsyncWriter extends AsyncProtobufLogWriter { + + public TestAsyncWriter(EventLoopGroup eventLoopGroup, Class channelClass) { + super(eventLoopGroup, channelClass); + } + + @Override + public CompletableFuture sync(boolean forceSync) { + int count = SYNC_COUNT.incrementAndGet(); + if (count < 3) { + // we will mark these two futures as failure, to make sure that we have 2 edits in + // unackedAppends, and trigger a WAL roll + CompletableFuture f = new CompletableFuture<>(); + FUTURES.offer(f); + return f; + } else if (count == 3) { + // for this future, we will mark it as succeeded, but before returning from this method, we + // need to request a roll, to enter the special corner case, where we have left some edits + // in unackedAppends but never tries to write them out which leads to a hang + ARRIVE.countDown(); + try { + RESUME.await(); + } catch (InterruptedException e) { + } + return super.sync(forceSync); + } else { + return super.sync(forceSync); + } + } + } + + private static TableName TN; + + private static RegionInfo RI; + + private static MultiVersionConcurrencyControl MVCC; + + private static AsyncFSWAL WAL; + + private static ExecutorService ROLL_EXEC; + + @BeforeClass + public static void setUp() throws Exception { + Configuration conf = UTIL.getConfiguration(); + conf.setClass(AsyncFSWALProvider.WRITER_IMPL, TestAsyncWriter.class, AsyncWriter.class); + // set a very small size so we will reach the batch size when writing out a single edit + conf.setLong(AsyncFSWAL.WAL_BATCH_SIZE, 1); + + TN = TableName.valueOf("test"); + RI = RegionInfoBuilder.newBuilder(TN).build(); + MVCC = new MultiVersionConcurrencyControl(); + + EXECUTOR = + Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder().setDaemon(true).build()); + + Path rootDir = UTIL.getDataTestDir(); + ROLL_EXEC = + Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).build()); + WALActionsListener listener = new WALActionsListener() { + + @Override + public void logRollRequested(RollRequestReason reason) { + ROLL_EXEC.execute(() -> { + try { + WAL.rollWriter(); + } catch (Exception e) { + LOG.warn("failed to roll writer", e); + } + }); + } + + }; + WAL = new AsyncFSWAL(UTIL.getTestFileSystem(), rootDir, "log", "oldlog", conf, + Arrays.asList(listener), true, null, null, EVENT_LOOP_GROUP, CHANNEL_CLASS); + WAL.init(); + } + + @AfterClass + public static void tearDown() throws Exception { + EXECUTOR.shutdownNow(); + ROLL_EXEC.shutdownNow(); + Closeables.close(WAL, true); + UTIL.cleanupTestDir(); + } + + @Test + public void testRoll() throws Exception { + byte[] row = Bytes.toBytes("family"); + WALEdit edit = new WALEdit(); + edit.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setFamily(row) + .setQualifier(row).setRow(row).setValue(row) + .setTimestamp(EnvironmentEdgeManager.currentTime()).setType(Type.Put).build()); + WALKeyImpl key1 = + new WALKeyImpl(RI.getEncodedNameAsBytes(), TN, EnvironmentEdgeManager.currentTime(), MVCC); + WAL.appendData(RI, key1, edit); + + WALKeyImpl key2 = new WALKeyImpl(RI.getEncodedNameAsBytes(), TN, key1.getWriteTime() + 1, MVCC); + long txid = WAL.appendData(RI, key2, edit); + + // we need to make sure the two edits have both been added unackedAppends, so we have two syncs + UTIL.waitFor(10000, () -> FUTURES.size() == 2); + FUTURES.poll().completeExceptionally(new IOException("inject error")); + FUTURES.poll().completeExceptionally(new IOException("inject error")); + ARRIVE.await(); + // resume after 1 seconds, to give us enough time to enter the roll state + EXECUTOR.schedule(() -> RESUME.countDown(), 1, TimeUnit.SECONDS); + // let's roll the wal, before the fix in HBASE-25905, it will hang forever inside + // waitForSafePoint + WAL.rollWriter(); + // make sure we can finally succeed + WAL.sync(txid); + } +}