HBASE-25905 Shutdown of WAL stuck at waitForSafePoint (#3898)
Signed-off-by: Xiaolin Ha <haxiaolin@apache.org>
This commit is contained in:
parent
3e14c8d6e6
commit
f5af7611c0
|
@ -462,12 +462,20 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// confirm non-empty before calling
|
||||||
|
private static long getLastTxid(Deque<FSWALEntry> queue) {
|
||||||
|
return queue.peekLast().getTxid();
|
||||||
|
}
|
||||||
|
|
||||||
private void appendAndSync() {
|
private void appendAndSync() {
|
||||||
final AsyncWriter writer = this.writer;
|
final AsyncWriter writer = this.writer;
|
||||||
// maybe a sync request is not queued when we issue a sync, so check here to see if we could
|
// maybe a sync request is not queued when we issue a sync, so check here to see if we could
|
||||||
// finish some.
|
// finish some.
|
||||||
finishSync(false);
|
finishSync(false);
|
||||||
long newHighestProcessedAppendTxid = -1L;
|
long newHighestProcessedAppendTxid = -1L;
|
||||||
|
// this is used to avoid calling peedLast every time on unackedAppends, appendAndAsync is single
|
||||||
|
// threaded, this could save us some cycles
|
||||||
|
boolean addedToUnackedAppends = false;
|
||||||
for (Iterator<FSWALEntry> iter = toWriteAppends.iterator(); iter.hasNext();) {
|
for (Iterator<FSWALEntry> iter = toWriteAppends.iterator(); iter.hasNext();) {
|
||||||
FSWALEntry entry = iter.next();
|
FSWALEntry entry = iter.next();
|
||||||
boolean appended;
|
boolean appended;
|
||||||
|
@ -481,10 +489,21 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
||||||
if (appended) {
|
if (appended) {
|
||||||
// This is possible, when we fail to sync, we will add the unackedAppends back to
|
// This is possible, when we fail to sync, we will add the unackedAppends back to
|
||||||
// toWriteAppends, so here we may get an entry which is already in the unackedAppends.
|
// toWriteAppends, so here we may get an entry which is already in the unackedAppends.
|
||||||
if (unackedAppends.isEmpty() || unackedAppends.peekLast().getTxid() < entry.getTxid()) {
|
if (addedToUnackedAppends || unackedAppends.isEmpty() ||
|
||||||
|
getLastTxid(unackedAppends) < entry.getTxid()) {
|
||||||
unackedAppends.addLast(entry);
|
unackedAppends.addLast(entry);
|
||||||
|
addedToUnackedAppends = true;
|
||||||
}
|
}
|
||||||
if (writer.getLength() - fileLengthAtLastSync >= batchSize) {
|
// See HBASE-25905, here we need to make sure that, we will always write all the entries in
|
||||||
|
// unackedAppends out. As the code in the consume method will assume that, the entries in
|
||||||
|
// unackedAppends have all been sent out so if there is roll request and unackedAppends is
|
||||||
|
// not empty, we could just return as later there will be a syncCompleted call to clear the
|
||||||
|
// unackedAppends, or a syncFailed to lead us to another state.
|
||||||
|
// There could be other ways to fix, such as changing the logic in the consume method, but
|
||||||
|
// it will break the assumption and then (may) lead to a big refactoring. So here let's use
|
||||||
|
// this way to fix first, can optimize later.
|
||||||
|
if (writer.getLength() - fileLengthAtLastSync >= batchSize &&
|
||||||
|
(addedToUnackedAppends || entry.getTxid() >= getLastTxid(unackedAppends))) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -51,6 +51,8 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> {
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(AsyncFSWALProvider.class);
|
private static final Logger LOG = LoggerFactory.getLogger(AsyncFSWALProvider.class);
|
||||||
|
|
||||||
|
public static final String WRITER_IMPL = "hbase.regionserver.hlog.async.writer.impl";
|
||||||
|
|
||||||
// Only public so classes back in regionserver.wal can access
|
// Only public so classes back in regionserver.wal can access
|
||||||
public interface AsyncWriter extends WALProvider.AsyncWriter {
|
public interface AsyncWriter extends WALProvider.AsyncWriter {
|
||||||
/**
|
/**
|
||||||
|
@ -106,7 +108,7 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> {
|
||||||
Class<? extends Channel> channelClass) throws IOException {
|
Class<? extends Channel> channelClass) throws IOException {
|
||||||
// Configuration already does caching for the Class lookup.
|
// Configuration already does caching for the Class lookup.
|
||||||
Class<? extends AsyncWriter> logWriterClass = conf.getClass(
|
Class<? extends AsyncWriter> logWriterClass = conf.getClass(
|
||||||
"hbase.regionserver.hlog.async.writer.impl", AsyncProtobufLogWriter.class, AsyncWriter.class);
|
WRITER_IMPL, AsyncProtobufLogWriter.class, AsyncWriter.class);
|
||||||
try {
|
try {
|
||||||
AsyncWriter writer = logWriterClass.getConstructor(EventLoopGroup.class, Class.class)
|
AsyncWriter writer = logWriterClass.getConstructor(EventLoopGroup.class, Class.class)
|
||||||
.newInstance(eventLoopGroup, channelClass);
|
.newInstance(eventLoopGroup, channelClass);
|
||||||
|
|
|
@ -0,0 +1,205 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.regionserver.wal;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.concurrent.ArrayBlockingQueue;
|
||||||
|
import java.util.concurrent.BlockingQueue;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hbase.Cell.Type;
|
||||||
|
import org.apache.hadoop.hbase.CellBuilderFactory;
|
||||||
|
import org.apache.hadoop.hbase.CellBuilderType;
|
||||||
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||||
|
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
|
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
|
||||||
|
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider.AsyncWriter;
|
||||||
|
import org.apache.hadoop.hbase.wal.WALEdit;
|
||||||
|
import org.apache.hadoop.hbase.wal.WALKeyImpl;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
|
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
|
||||||
|
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
|
||||||
|
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
|
||||||
|
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Testcase for HBASE-25905
|
||||||
|
*/
|
||||||
|
@Category({ RegionServerTests.class, MediumTests.class })
|
||||||
|
public class TestAsyncFSWALRollStuck {
|
||||||
|
|
||||||
|
@ClassRule
|
||||||
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
|
HBaseClassTestRule.forClass(TestAsyncFSWALRollStuck.class);
|
||||||
|
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(TestAsyncFSWALRollStuck.class);
|
||||||
|
|
||||||
|
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||||
|
|
||||||
|
private static EventLoopGroup EVENT_LOOP_GROUP = new NioEventLoopGroup();
|
||||||
|
|
||||||
|
private static Class<? extends Channel> CHANNEL_CLASS = NioSocketChannel.class;
|
||||||
|
|
||||||
|
private static ScheduledExecutorService EXECUTOR;
|
||||||
|
|
||||||
|
private static BlockingQueue<CompletableFuture<Long>> FUTURES = new ArrayBlockingQueue<>(3);
|
||||||
|
|
||||||
|
private static AtomicInteger SYNC_COUNT = new AtomicInteger(0);
|
||||||
|
|
||||||
|
private static CountDownLatch ARRIVE = new CountDownLatch(1);
|
||||||
|
|
||||||
|
private static CountDownLatch RESUME = new CountDownLatch(1);
|
||||||
|
|
||||||
|
public static final class TestAsyncWriter extends AsyncProtobufLogWriter {
|
||||||
|
|
||||||
|
public TestAsyncWriter(EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) {
|
||||||
|
super(eventLoopGroup, channelClass);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CompletableFuture<Long> sync(boolean forceSync) {
|
||||||
|
int count = SYNC_COUNT.incrementAndGet();
|
||||||
|
if (count < 3) {
|
||||||
|
// we will mark these two futures as failure, to make sure that we have 2 edits in
|
||||||
|
// unackedAppends, and trigger a WAL roll
|
||||||
|
CompletableFuture<Long> f = new CompletableFuture<>();
|
||||||
|
FUTURES.offer(f);
|
||||||
|
return f;
|
||||||
|
} else if (count == 3) {
|
||||||
|
// for this future, we will mark it as succeeded, but before returning from this method, we
|
||||||
|
// need to request a roll, to enter the special corner case, where we have left some edits
|
||||||
|
// in unackedAppends but never tries to write them out which leads to a hang
|
||||||
|
ARRIVE.countDown();
|
||||||
|
try {
|
||||||
|
RESUME.await();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
}
|
||||||
|
return super.sync(forceSync);
|
||||||
|
} else {
|
||||||
|
return super.sync(forceSync);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static TableName TN;
|
||||||
|
|
||||||
|
private static RegionInfo RI;
|
||||||
|
|
||||||
|
private static MultiVersionConcurrencyControl MVCC;
|
||||||
|
|
||||||
|
private static AsyncFSWAL WAL;
|
||||||
|
|
||||||
|
private static ExecutorService ROLL_EXEC;
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUp() throws Exception {
|
||||||
|
Configuration conf = UTIL.getConfiguration();
|
||||||
|
conf.setClass(AsyncFSWALProvider.WRITER_IMPL, TestAsyncWriter.class, AsyncWriter.class);
|
||||||
|
// set a very small size so we will reach the batch size when writing out a single edit
|
||||||
|
conf.setLong(AsyncFSWAL.WAL_BATCH_SIZE, 1);
|
||||||
|
|
||||||
|
TN = TableName.valueOf("test");
|
||||||
|
RI = RegionInfoBuilder.newBuilder(TN).build();
|
||||||
|
MVCC = new MultiVersionConcurrencyControl();
|
||||||
|
|
||||||
|
EXECUTOR =
|
||||||
|
Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder().setDaemon(true).build());
|
||||||
|
|
||||||
|
Path rootDir = UTIL.getDataTestDir();
|
||||||
|
ROLL_EXEC =
|
||||||
|
Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).build());
|
||||||
|
WALActionsListener listener = new WALActionsListener() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void logRollRequested(RollRequestReason reason) {
|
||||||
|
ROLL_EXEC.execute(() -> {
|
||||||
|
try {
|
||||||
|
WAL.rollWriter();
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.warn("failed to roll writer", e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
};
|
||||||
|
WAL = new AsyncFSWAL(UTIL.getTestFileSystem(), rootDir, "log", "oldlog", conf,
|
||||||
|
Arrays.asList(listener), true, null, null, EVENT_LOOP_GROUP, CHANNEL_CLASS);
|
||||||
|
WAL.init();
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDown() throws Exception {
|
||||||
|
EXECUTOR.shutdownNow();
|
||||||
|
ROLL_EXEC.shutdownNow();
|
||||||
|
Closeables.close(WAL, true);
|
||||||
|
UTIL.cleanupTestDir();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRoll() throws Exception {
|
||||||
|
byte[] row = Bytes.toBytes("family");
|
||||||
|
WALEdit edit = new WALEdit();
|
||||||
|
edit.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setFamily(row)
|
||||||
|
.setQualifier(row).setRow(row).setValue(row)
|
||||||
|
.setTimestamp(EnvironmentEdgeManager.currentTime()).setType(Type.Put).build());
|
||||||
|
WALKeyImpl key1 =
|
||||||
|
new WALKeyImpl(RI.getEncodedNameAsBytes(), TN, EnvironmentEdgeManager.currentTime(), MVCC);
|
||||||
|
WAL.appendData(RI, key1, edit);
|
||||||
|
|
||||||
|
WALKeyImpl key2 = new WALKeyImpl(RI.getEncodedNameAsBytes(), TN, key1.getWriteTime() + 1, MVCC);
|
||||||
|
long txid = WAL.appendData(RI, key2, edit);
|
||||||
|
|
||||||
|
// we need to make sure the two edits have both been added unackedAppends, so we have two syncs
|
||||||
|
UTIL.waitFor(10000, () -> FUTURES.size() == 2);
|
||||||
|
FUTURES.poll().completeExceptionally(new IOException("inject error"));
|
||||||
|
FUTURES.poll().completeExceptionally(new IOException("inject error"));
|
||||||
|
ARRIVE.await();
|
||||||
|
// resume after 1 seconds, to give us enough time to enter the roll state
|
||||||
|
EXECUTOR.schedule(() -> RESUME.countDown(), 1, TimeUnit.SECONDS);
|
||||||
|
// let's roll the wal, before the fix in HBASE-25905, it will hang forever inside
|
||||||
|
// waitForSafePoint
|
||||||
|
WAL.rollWriter();
|
||||||
|
// make sure we can finally succeed
|
||||||
|
WAL.sync(txid);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue