HBASE-20637 Polish the WAL switching when transiting from A to S

parent f67763ffa0
commit 05295abd5b

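Summary of the change (as reconstructed from the diff below): AsyncFSWAL gains a marker-edit-only mode. While the mode is on, append() rejects non-marker edits, and the consumer drains any pending non-marker edits and fails their sync futures with "WAL is closing, only marker edit is allowed". DualAsyncFSWAL now retries remote writer creation forever with backoff instead of aborting the region server on a log rolling error, and skipRemoteWal() becomes skipRemoteWAL(boolean markerEditOnly), so the transition from ACTIVE (A) to STANDBY (S) can give up pending entries (the remote cluster will replicate the correct data back later) while still allowing region close markers. recoverFileLease() additionally unwraps FilterFileSystem, and the tests cover the new behaviour.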
@@ -52,12 +52,12 @@ import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
 import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 import org.apache.htrace.core.TraceScope;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hbase.thirdparty.io.netty.channel.Channel;
 import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
@@ -470,6 +470,44 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
     // whether to issue a sync in the caller method.
   }
 
+  private void drainNonMarkerEditsAndFailSyncs() {
+    if (toWriteAppends.isEmpty()) {
+      return;
+    }
+    boolean hasNonMarkerEdits = false;
+    Iterator<FSWALEntry> iter = toWriteAppends.descendingIterator();
+    while (iter.hasNext()) {
+      FSWALEntry entry = iter.next();
+      if (!entry.getEdit().isMetaEdit()) {
+        hasNonMarkerEdits = true;
+        break;
+      }
+    }
+    if (hasNonMarkerEdits) {
+      for (;;) {
+        iter.remove();
+        if (!iter.hasNext()) {
+          break;
+        }
+        iter.next();
+      }
+      unackedAppends.clear();
+      // fail the sync futures which are under the txid of the first remaining edit, if none, fail
+      // all the sync futures.
+      long txid = toWriteAppends.isEmpty() ? Long.MAX_VALUE : toWriteAppends.peek().getTxid();
+      IOException error = new IOException("WAL is closing, only marker edit is allowed");
+      for (Iterator<SyncFuture> syncIter = syncFutures.iterator(); syncIter.hasNext();) {
+        SyncFuture future = syncIter.next();
+        if (future.getTxid() < txid) {
+          future.done(future.getTxid(), error);
+          syncIter.remove();
+        } else {
+          break;
+        }
+      }
+    }
+  }
+
   private void consume() {
     consumeLock.lock();
     try {
@@ -512,6 +550,9 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
       }
       waitingConsumePayloadsGatingSequence.set(nextCursor);
     }
+    if (markerEditOnly()) {
+      drainNonMarkerEditsAndFailSyncs();
+    }
     appendAndSync();
     if (hasConsumerTask.get()) {
       return;
@@ -553,9 +594,18 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
     return consumerScheduled.compareAndSet(false, true);
   }
 
+  // This is used by sync replication, where we are going to close the wal soon after we reopen all
+  // the regions. Will be overridden by sub classes.
+  protected boolean markerEditOnly() {
+    return false;
+  }
+
   @Override
   public long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore)
       throws IOException {
+    if (markerEditOnly() && !edits.isMetaEdit()) {
+      throw new IOException("WAL is closing, only marker edit is allowed");
+    }
     long txid =
       stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, waitingConsumePayloads);
     if (shouldScheduleConsumer()) {
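The tail-scan in drainNonMarkerEditsAndFailSyncs() is the subtle part of this hunk: it keeps only the trailing run of marker edits and drops everything up to and including the last non-marker edit. A minimal standalone sketch of the same idiom, using plain java.util types with negative integers standing in for non-marker edits (illustrative stand-ins, not HBase API):

import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.Iterator;

public class DrainSketch {
  public static void main(String[] args) {
    // negative numbers stand in for non-marker edits, positive for marker edits
    Deque<Integer> queue = new ArrayDeque<>(Arrays.asList(1, -2, 3, -4, 5, 6));
    Iterator<Integer> iter = queue.descendingIterator();
    boolean hasNonMarkerEdits = false;
    while (iter.hasNext()) {
      if (iter.next() < 0) { // stand-in for !entry.getEdit().isMetaEdit()
        hasNonMarkerEdits = true;
        break;
      }
    }
    if (hasNonMarkerEdits) {
      for (;;) {
        iter.remove(); // drop the non-marker edit and everything queued before it
        if (!iter.hasNext()) {
          break;
        }
        iter.next();
      }
    }
    System.out.println(queue); // prints [5, 6] -- only the trailing marker edits survive
  }
}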
@@ -18,14 +18,19 @@
 package org.apache.hadoop.hbase.regionserver.wal;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.ConnectionUtils;
 import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
 import org.apache.hbase.thirdparty.io.netty.channel.Channel;
 import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
@@ -35,20 +40,24 @@ import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
 @InterfaceAudience.Private
 public class DualAsyncFSWAL extends AsyncFSWAL {
 
+  private static final Logger LOG = LoggerFactory.getLogger(DualAsyncFSWAL.class);
+
   private final FileSystem remoteFs;
 
-  private final Path remoteWalDir;
+  private final Path remoteWALDir;
 
-  private volatile boolean skipRemoteWal = false;
+  private volatile boolean skipRemoteWAL = false;
 
-  public DualAsyncFSWAL(FileSystem fs, FileSystem remoteFs, Path rootDir, Path remoteWalDir,
+  private volatile boolean markerEditOnly = false;
+
+  public DualAsyncFSWAL(FileSystem fs, FileSystem remoteFs, Path rootDir, Path remoteWALDir,
     String logDir, String archiveDir, Configuration conf, List<WALActionsListener> listeners,
     boolean failIfWALExists, String prefix, String suffix, EventLoopGroup eventLoopGroup,
     Class<? extends Channel> channelClass) throws FailedLogCloseException, IOException {
     super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix,
-        eventLoopGroup, channelClass);
+      eventLoopGroup, channelClass);
     this.remoteFs = remoteFs;
-    this.remoteWalDir = remoteWalDir;
+    this.remoteWALDir = remoteWALDir;
   }
 
   // will be overridden in testcase
@@ -61,20 +70,37 @@ public class DualAsyncFSWAL extends AsyncFSWAL {
   @Override
   protected AsyncWriter createWriterInstance(Path path) throws IOException {
     AsyncWriter localWriter = super.createWriterInstance(path);
-    if (skipRemoteWal) {
-      return localWriter;
-    }
-    AsyncWriter remoteWriter;
-    boolean succ = false;
-    try {
-      remoteWriter = createAsyncWriter(remoteFs, new Path(remoteWalDir, path.getName()));
-      succ = true;
-    } finally {
-      if (!succ) {
-        closeWriter(localWriter);
-      }
-    }
-    return createCombinedAsyncWriter(localWriter, remoteWriter);
+    // retry forever if we can not create the remote writer to prevent aborting the RS due to log
+    // rolling error, unless skipRemoteWAL is set to true.
+    // TODO: since for now we only have one thread doing log rolling, this may block the rolling for
+    // other wals
+    Path remoteWAL = new Path(remoteWALDir, path.getName());
+    for (int retry = 0;; retry++) {
+      if (skipRemoteWAL) {
+        return localWriter;
+      }
+      AsyncWriter remoteWriter;
+      try {
+        remoteWriter = createAsyncWriter(remoteFs, remoteWAL);
+      } catch (IOException e) {
+        LOG.warn("create remote writer {} failed, retry = {}", remoteWAL, retry, e);
+        try {
+          Thread.sleep(ConnectionUtils.getPauseTime(100, retry));
+        } catch (InterruptedException ie) {
+          // restore the interrupt state
+          Thread.currentThread().interrupt();
+          Closeables.close(localWriter, true);
+          throw (IOException) new InterruptedIOException().initCause(ie);
+        }
+        continue;
+      }
+      return createCombinedAsyncWriter(localWriter, remoteWriter);
+    }
   }
 
+  @Override
+  protected boolean markerEditOnly() {
+    return markerEditOnly;
+  }
+
   // Allow temporarily skipping the creation of remote writer. When failing to write to the remote
@@ -82,7 +108,14 @@ public class DualAsyncFSWAL extends AsyncFSWAL {
   // need to write a close marker when closing a region, and if it fails, the whole rs will abort.
   // So here we need to skip the creation of remote writer and make it possible to write the region
   // close marker.
-  public void skipRemoteWal() {
-    this.skipRemoteWal = true;
+  // Setting markerEditOnly to true is for transiting from A to S, where we need to give up writing
+  // any pending wal entries as they will be discarded. The remote cluster will replicate the
+  // correct data back later. We still need to allow writing marker edits such as the close region
+  // event to allow closing a region.
+  public void skipRemoteWAL(boolean markerEditOnly) {
+    if (markerEditOnly) {
+      this.markerEditOnly = true;
+    }
+    this.skipRemoteWAL = true;
   }
 }
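The retry-forever loop above sleeps ConnectionUtils.getPauseTime(100, retry) between attempts. A rough, self-contained sketch of that backoff behaviour (the multiplier table follows HBase's HConstants.RETRY_BACKOFF; the jitter term here is illustrative, not the exact HBase formula):

import java.util.concurrent.ThreadLocalRandom;

public class BackoffSketch {
  private static final int[] RETRY_BACKOFF = {1, 2, 3, 5, 10, 20, 40, 100, 100, 100, 100, 200, 200};

  static long pauseTime(long pause, int tries) {
    int idx = Math.min(tries, RETRY_BACKOFF.length - 1); // cap at the last multiplier
    long normal = pause * RETRY_BACKOFF[idx];
    // add a little jitter so retrying callers do not wake up in lockstep
    return normal + ThreadLocalRandom.current().nextLong(Math.max(1, normal / 100));
  }

  public static void main(String[] args) {
    for (int retry = 0; retry < 8; retry++) {
      System.out.printf("retry %d -> ~%d ms%n", retry, pauseTime(100, retry));
    }
  }
}

With a 100 ms base pause this grows from ~100 ms to ~10 s and then plateaus, which is why a dead remote cluster stalls log rolling instead of aborting the region server.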
@@ -28,9 +28,9 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FilterFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
@@ -155,12 +155,16 @@ public class FSHDFSUtils extends FSUtils {
   * Recover the lease from HDFS, retrying multiple times.
   */
  @Override
- public void recoverFileLease(final FileSystem fs, final Path p,
-     Configuration conf, CancelableProgressable reporter)
-     throws IOException {
+ public void recoverFileLease(FileSystem fs, Path p, Configuration conf,
+     CancelableProgressable reporter) throws IOException {
+   if (fs instanceof FilterFileSystem) {
+     fs = ((FilterFileSystem) fs).getRawFileSystem();
+   }
   // lease recovery not needed for local file system case.
-   if (!(fs instanceof DistributedFileSystem)) return;
-   recoverDFSFileLease((DistributedFileSystem)fs, p, conf, reporter);
+   if (!(fs instanceof DistributedFileSystem)) {
+     return;
+   }
+   recoverDFSFileLease((DistributedFileSystem) fs, p, conf, reporter);
  }
 
  /*
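The unwrap added to recoverFileLease() matters because a FilterFileSystem that delegates to HDFS is not itself a DistributedFileSystem, so the instanceof guard would otherwise skip lease recovery. A compact sketch of just that dispatch decision (Hadoop types only; the surrounding recovery wiring is assumed):

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class LeaseRecoverySketch {
  // Returns true only when the (possibly wrapped) file system is really HDFS;
  // local file systems need no lease recovery.
  static boolean needsLeaseRecovery(FileSystem fs) {
    if (fs instanceof FilterFileSystem) {
      fs = ((FilterFileSystem) fs).getRawFileSystem(); // look through the wrapper
    }
    return fs instanceof DistributedFileSystem;
  }
}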
@@ -291,7 +291,7 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListener {
     try {
       Optional<DualAsyncFSWAL> opt = peerId2WAL.get(peerId);
       if (opt != null) {
-        opt.ifPresent(DualAsyncFSWAL::skipRemoteWal);
+        opt.ifPresent(w -> w.skipRemoteWAL(to == SyncReplicationState.STANDBY));
       } else {
         // add a place holder to tell the getWAL caller do not use DualAsyncFSWAL any more.
         peerId2WAL.put(peerId, Optional.empty());
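The else branch relies on an Optional-as-placeholder convention in peerId2WAL: an absent key means no transition has been seen yet, while Optional.empty() means the peer has transited and getWAL must stop handing out the dual WAL. A minimal sketch of that three-state map, with String standing in for DualAsyncFSWAL (illustrative only, not the provider's real code):

import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class PlaceholderSketch {
  private final ConcurrentMap<String, Optional<String>> peerId2WAL = new ConcurrentHashMap<>();

  void onTransit(String peerId, boolean toStandby) {
    Optional<String> opt = peerId2WAL.get(peerId);
    if (opt != null) {
      // a WAL was already created for this peer: switch it to local-only mode
      opt.ifPresent(wal -> System.out.println("skip remote for " + wal + ", markerEditOnly=" + toStandby));
    } else {
      // no WAL yet: leave a placeholder so later getWAL calls skip the dual WAL
      peerId2WAL.put(peerId, Optional.empty());
    }
  }
}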
@@ -97,11 +97,11 @@ class DualAsyncFSWALForTest extends DualAsyncFSWAL {
     }
   }
 
-  public DualAsyncFSWALForTest(FileSystem fs, FileSystem remoteFs, Path rootDir, Path remoteWalDir,
+  public DualAsyncFSWALForTest(FileSystem fs, FileSystem remoteFs, Path rootDir, Path remoteWALDir,
     String logDir, String archiveDir, Configuration conf, List<WALActionsListener> listeners,
     boolean failIfWALExists, String prefix, String suffix, EventLoopGroup eventLoopGroup,
     Class<? extends Channel> channelClass) throws FailedLogCloseException, IOException {
-    super(fs, remoteFs, rootDir, remoteWalDir, logDir, archiveDir, conf, listeners, failIfWALExists,
+    super(fs, remoteFs, rootDir, remoteWALDir, logDir, archiveDir, conf, listeners, failIfWALExists,
       prefix, suffix, eventLoopGroup, channelClass);
   }
@@ -23,6 +23,7 @@ import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.regex.Pattern;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -32,6 +33,7 @@ import org.apache.hadoop.hbase.HBaseZKTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
+import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Get;
@@ -127,10 +129,26 @@ public class SyncReplicationTestBase {
       .setRemoteWALDir(REMOTE_WAL_DIR1.toUri().toString()).build());
   }
 
+  private static void shutdown(HBaseTestingUtility util) throws Exception {
+    if (util.getHBaseCluster() == null) {
+      return;
+    }
+    Admin admin = util.getAdmin();
+    if (!admin.listReplicationPeers(Pattern.compile(PEER_ID)).isEmpty()) {
+      if (admin
+        .getReplicationPeerSyncReplicationState(PEER_ID) != SyncReplicationState.DOWNGRADE_ACTIVE) {
+        admin.transitReplicationPeerSyncReplicationState(PEER_ID,
+          SyncReplicationState.DOWNGRADE_ACTIVE);
+      }
+      admin.removeReplicationPeer(PEER_ID);
+    }
+    util.shutdownMiniCluster();
+  }
+
   @AfterClass
   public static void tearDown() throws Exception {
-    UTIL1.shutdownMiniCluster();
-    UTIL2.shutdownMiniCluster();
+    shutdown(UTIL1);
+    shutdown(UTIL2);
     ZK_UTIL.shutdownMiniZKCluster();
   }
@@ -207,7 +225,7 @@ public class SyncReplicationTestBase {
   protected void verifyRemovedPeer(String peerId, Path remoteWALDir, HBaseTestingUtility utility)
       throws Exception {
     ReplicationPeerStorage rps = ReplicationStorageFactory
-        .getReplicationPeerStorage(utility.getZooKeeperWatcher(), utility.getConfiguration());
+      .getReplicationPeerStorage(utility.getZooKeeperWatcher(), utility.getConfiguration());
     try {
       rps.getPeerSyncReplicationState(peerId);
       fail("Should throw exception when get the sync replication state of a removed peer.");
@@ -233,7 +251,7 @@ public class SyncReplicationTestBase {
     Entry[] entries = new Entry[10];
     for (int i = 0; i < entries.length; i++) {
       entries[i] =
-          new Entry(new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TABLE_NAME, 0), new WALEdit());
+        new Entry(new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TABLE_NAME, 0), new WALEdit());
     }
     if (!expectedRejection) {
       ReplicationProtbufUtil.replicateWALEntry(connection.getAdmin(regionServer.getServerName()),
@@ -17,13 +17,28 @@
  */
 package org.apache.hadoop.hbase.replication;
 
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WAL.Reader;
 import org.apache.hadoop.hbase.wal.WALFactory;
@@ -66,8 +81,27 @@ public class TestSyncReplicationActive extends SyncReplicationTestBase {
     // confirm that the data is there after we convert the peer to DA
     verify(UTIL2, 0, 100);
 
-    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
-      SyncReplicationState.STANDBY);
+    try (AsyncConnection conn =
+      ConnectionFactory.createAsyncConnection(UTIL1.getConfiguration()).get()) {
+      AsyncTable<?> table = conn.getTableBuilder(TABLE_NAME).setMaxAttempts(1).build();
+      CompletableFuture<Void> future =
+        table.put(new Put(Bytes.toBytes(1000)).addColumn(CF, CQ, Bytes.toBytes(1000)));
+      Thread.sleep(2000);
+      // should hang on rolling
+      assertFalse(future.isDone());
+      UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+        SyncReplicationState.STANDBY);
+      try {
+        future.get();
+        fail("should fail because the wal is closing");
+      } catch (ExecutionException e) {
+        // expected
+        assertThat(e.getCause().getMessage(), containsString("only marker edit is allowed"));
+      }
+    }
+    // confirm that the data has not been persisted
+    HRegion region = UTIL1.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0);
+    assertTrue(region.get(new Get(Bytes.toBytes(1000))).isEmpty());
 
     UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
       SyncReplicationState.ACTIVE);
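The try/fail/catch pattern in the test hinges on CompletableFuture semantics: a future that completes exceptionally rethrows its cause wrapped in ExecutionException from get(). A self-contained sketch of just that assertion idiom (JUnit 4 and Hamcrest on the classpath assumed; the failing put is simulated):

import static org.hamcrest.CoreMatchers.containsString;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class FutureFailureSketch {
  public static void main(String[] args) throws InterruptedException {
    CompletableFuture<Void> future = new CompletableFuture<>();
    // stand-in for the WAL failing the pending put after the transition to STANDBY
    future.completeExceptionally(new IOException("WAL is closing, only marker edit is allowed"));
    try {
      future.get();
      fail("should fail because the wal is closing");
    } catch (ExecutionException e) {
      // get() wraps the original IOException as the cause
      assertThat(e.getCause().getMessage(), containsString("only marker edit is allowed"));
    }
  }
}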
@@ -89,8 +123,8 @@ public class TestSyncReplicationActive extends SyncReplicationTestBase {
     FileStatus[] files = fs2.listStatus(new Path(remoteDir, peerId));
     Assert.assertTrue(files.length > 0);
     for (FileStatus file : files) {
-      try (Reader reader =
-        WALFactory.createReader(fs2, file.getPath(), utility.getConfiguration())) {
+      try (
+        Reader reader = WALFactory.createReader(fs2, file.getPath(), utility.getConfiguration())) {
         Entry entry = reader.next();
         Assert.assertTrue(entry != null);
         while (entry != null) {