From 05295abd5b5573a3237fe849c95b9e45f22c5560 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Tue, 29 May 2018 20:38:20 +0800 Subject: [PATCH] HBASE-20637 Polish the WAL switching when transiting from A to S --- .../hbase/regionserver/wal/AsyncFSWAL.java | 52 +++++++++++++- .../regionserver/wal/DualAsyncFSWAL.java | 71 ++++++++++++++----- .../apache/hadoop/hbase/util/FSHDFSUtils.java | 16 +++-- .../hbase/wal/SyncReplicationWALProvider.java | 2 +- .../replication/DualAsyncFSWALForTest.java | 4 +- .../replication/SyncReplicationTestBase.java | 26 +++++-- .../TestSyncReplicationActive.java | 42 +++++++++-- 7 files changed, 176 insertions(+), 37 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java index 9b4ce9ce278..7f3e30b402c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java @@ -52,12 +52,12 @@ import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.htrace.core.TraceScope; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hbase.thirdparty.io.netty.channel.Channel; import org.apache.hbase.thirdparty.io.netty.channel.EventLoop; @@ -470,6 +470,44 @@ public class AsyncFSWAL extends AbstractFSWAL { // whether to issue a sync in the caller method. } + private void drainNonMarkerEditsAndFailSyncs() { + if (toWriteAppends.isEmpty()) { + return; + } + boolean hasNonMarkerEdits = false; + Iterator iter = toWriteAppends.descendingIterator(); + while (iter.hasNext()) { + FSWALEntry entry = iter.next(); + if (!entry.getEdit().isMetaEdit()) { + hasNonMarkerEdits = true; + break; + } + } + if (hasNonMarkerEdits) { + for (;;) { + iter.remove(); + if (!iter.hasNext()) { + break; + } + iter.next(); + } + unackedAppends.clear(); + // fail the sync futures which are under the txid of the first remaining edit, if none, fail + // all the sync futures. + long txid = toWriteAppends.isEmpty() ? Long.MAX_VALUE : toWriteAppends.peek().getTxid(); + IOException error = new IOException("WAL is closing, only marker edit is allowed"); + for (Iterator syncIter = syncFutures.iterator(); syncIter.hasNext();) { + SyncFuture future = syncIter.next(); + if (future.getTxid() < txid) { + future.done(future.getTxid(), error); + syncIter.remove(); + } else { + break; + } + } + } + } + private void consume() { consumeLock.lock(); try { @@ -512,6 +550,9 @@ public class AsyncFSWAL extends AbstractFSWAL { } waitingConsumePayloadsGatingSequence.set(nextCursor); } + if (markerEditOnly()) { + drainNonMarkerEditsAndFailSyncs(); + } appendAndSync(); if (hasConsumerTask.get()) { return; @@ -553,9 +594,18 @@ public class AsyncFSWAL extends AbstractFSWAL { return consumerScheduled.compareAndSet(false, true); } + // This is used by sync replication, where we are going to close the wal soon after we reopen all + // the regions. Will be overridden by sub classes. + protected boolean markerEditOnly() { + return false; + } + @Override public long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore) throws IOException { + if (markerEditOnly() && !edits.isMetaEdit()) { + throw new IOException("WAL is closing, only marker edit is allowed"); + } long txid = stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, waitingConsumePayloads); if (shouldScheduleConsumer()) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java index 3967e78b1cf..bf5b96dfce1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java @@ -18,14 +18,19 @@ package org.apache.hadoop.hbase.regionserver.wal; import java.io.IOException; +import java.io.InterruptedIOException; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.client.ConnectionUtils; import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter; import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hbase.thirdparty.com.google.common.io.Closeables; import org.apache.hbase.thirdparty.io.netty.channel.Channel; import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; @@ -35,20 +40,24 @@ import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; @InterfaceAudience.Private public class DualAsyncFSWAL extends AsyncFSWAL { + private static final Logger LOG = LoggerFactory.getLogger(DualAsyncFSWAL.class); + private final FileSystem remoteFs; - private final Path remoteWalDir; + private final Path remoteWALDir; - private volatile boolean skipRemoteWal = false; + private volatile boolean skipRemoteWAL = false; - public DualAsyncFSWAL(FileSystem fs, FileSystem remoteFs, Path rootDir, Path remoteWalDir, + private volatile boolean markerEditOnly = false; + + public DualAsyncFSWAL(FileSystem fs, FileSystem remoteFs, Path rootDir, Path remoteWALDir, String logDir, String archiveDir, Configuration conf, List listeners, boolean failIfWALExists, String prefix, String suffix, EventLoopGroup eventLoopGroup, Class channelClass) throws FailedLogCloseException, IOException { super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix, - eventLoopGroup, channelClass); + eventLoopGroup, channelClass); this.remoteFs = remoteFs; - this.remoteWalDir = remoteWalDir; + this.remoteWALDir = remoteWALDir; } // will be overridden in testcase @@ -61,20 +70,37 @@ public class DualAsyncFSWAL extends AsyncFSWAL { @Override protected AsyncWriter createWriterInstance(Path path) throws IOException { AsyncWriter localWriter = super.createWriterInstance(path); - if (skipRemoteWal) { - return localWriter; - } - AsyncWriter remoteWriter; - boolean succ = false; - try { - remoteWriter = createAsyncWriter(remoteFs, new Path(remoteWalDir, path.getName())); - succ = true; - } finally { - if (!succ) { - closeWriter(localWriter); + // retry forever if we can not create the remote writer to prevent aborting the RS due to log + // rolling error, unless the skipRemoteWal is set to true. + // TODO: since for now we only have one thread doing log rolling, this may block the rolling for + // other wals + Path remoteWAL = new Path(remoteWALDir, path.getName()); + for (int retry = 0;; retry++) { + if (skipRemoteWAL) { + return localWriter; } + AsyncWriter remoteWriter; + try { + remoteWriter = createAsyncWriter(remoteFs, remoteWAL); + } catch (IOException e) { + LOG.warn("create remote writer {} failed, retry = {}", remoteWAL, retry, e); + try { + Thread.sleep(ConnectionUtils.getPauseTime(100, retry)); + } catch (InterruptedException ie) { + // restore the interrupt state + Thread.currentThread().interrupt(); + Closeables.close(localWriter, true); + throw (IOException) new InterruptedIOException().initCause(ie); + } + continue; + } + return createCombinedAsyncWriter(localWriter, remoteWriter); } - return createCombinedAsyncWriter(localWriter, remoteWriter); + } + + @Override + protected boolean markerEditOnly() { + return markerEditOnly; } // Allow temporarily skipping the creation of remote writer. When failing to write to the remote @@ -82,7 +108,14 @@ public class DualAsyncFSWAL extends AsyncFSWAL { // need to write a close marker when closing a region, and if it fails, the whole rs will abort. // So here we need to skip the creation of remote writer and make it possible to write the region // close marker. - public void skipRemoteWal() { - this.skipRemoteWal = true; + // Setting markerEdit only to true is for transiting from A to S, where we need to give up writing + // any pending wal entries as they will be discarded. The remote cluster will replicated the + // correct data back later. We still need to allow writing marker edits such as close region event + // to allow closing a region. + public void skipRemoteWAL(boolean markerEditOnly) { + if (markerEditOnly) { + this.markerEditOnly = true; + } + this.skipRemoteWAL = true; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java index 301d158177e..a49ee0227a9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java @@ -28,9 +28,9 @@ import java.util.Collection; import java.util.HashSet; import java.util.Map; import java.util.Set; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FilterFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException; @@ -155,12 +155,16 @@ public class FSHDFSUtils extends FSUtils { * Recover the lease from HDFS, retrying multiple times. */ @Override - public void recoverFileLease(final FileSystem fs, final Path p, - Configuration conf, CancelableProgressable reporter) - throws IOException { + public void recoverFileLease(FileSystem fs, Path p, Configuration conf, + CancelableProgressable reporter) throws IOException { + if (fs instanceof FilterFileSystem) { + fs = ((FilterFileSystem) fs).getRawFileSystem(); + } // lease recovery not needed for local file system case. - if (!(fs instanceof DistributedFileSystem)) return; - recoverDFSFileLease((DistributedFileSystem)fs, p, conf, reporter); + if (!(fs instanceof DistributedFileSystem)) { + return; + } + recoverDFSFileLease((DistributedFileSystem) fs, p, conf, reporter); } /* diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java index 82f8a89ca4e..b9fffcff601 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java @@ -291,7 +291,7 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen try { Optional opt = peerId2WAL.get(peerId); if (opt != null) { - opt.ifPresent(DualAsyncFSWAL::skipRemoteWal); + opt.ifPresent(w -> w.skipRemoteWAL(to == SyncReplicationState.STANDBY)); } else { // add a place holder to tell the getWAL caller do not use DualAsyncFSWAL any more. peerId2WAL.put(peerId, Optional.empty()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DualAsyncFSWALForTest.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DualAsyncFSWALForTest.java index fb3daf2bd10..62000b4cd72 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DualAsyncFSWALForTest.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DualAsyncFSWALForTest.java @@ -97,11 +97,11 @@ class DualAsyncFSWALForTest extends DualAsyncFSWAL { } } - public DualAsyncFSWALForTest(FileSystem fs, FileSystem remoteFs, Path rootDir, Path remoteWalDir, + public DualAsyncFSWALForTest(FileSystem fs, FileSystem remoteFs, Path rootDir, Path remoteWALDir, String logDir, String archiveDir, Configuration conf, List listeners, boolean failIfWALExists, String prefix, String suffix, EventLoopGroup eventLoopGroup, Class channelClass) throws FailedLogCloseException, IOException { - super(fs, remoteFs, rootDir, remoteWalDir, logDir, archiveDir, conf, listeners, failIfWALExists, + super(fs, remoteFs, rootDir, remoteWALDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix, eventLoopGroup, channelClass); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java index 095be9007b6..a20edd3b264 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java @@ -23,6 +23,7 @@ import static org.junit.Assert.fail; import java.io.IOException; import java.util.ArrayList; +import java.util.regex.Pattern; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -32,6 +33,7 @@ import org.apache.hadoop.hbase.HBaseZKTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; +import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.Get; @@ -127,10 +129,26 @@ public class SyncReplicationTestBase { .setRemoteWALDir(REMOTE_WAL_DIR1.toUri().toString()).build()); } + private static void shutdown(HBaseTestingUtility util) throws Exception { + if (util.getHBaseCluster() == null) { + return; + } + Admin admin = util.getAdmin(); + if (!admin.listReplicationPeers(Pattern.compile(PEER_ID)).isEmpty()) { + if (admin + .getReplicationPeerSyncReplicationState(PEER_ID) != SyncReplicationState.DOWNGRADE_ACTIVE) { + admin.transitReplicationPeerSyncReplicationState(PEER_ID, + SyncReplicationState.DOWNGRADE_ACTIVE); + } + admin.removeReplicationPeer(PEER_ID); + } + util.shutdownMiniCluster(); + } + @AfterClass public static void tearDown() throws Exception { - UTIL1.shutdownMiniCluster(); - UTIL2.shutdownMiniCluster(); + shutdown(UTIL1); + shutdown(UTIL2); ZK_UTIL.shutdownMiniZKCluster(); } @@ -207,7 +225,7 @@ public class SyncReplicationTestBase { protected void verifyRemovedPeer(String peerId, Path remoteWALDir, HBaseTestingUtility utility) throws Exception { ReplicationPeerStorage rps = ReplicationStorageFactory - .getReplicationPeerStorage(utility.getZooKeeperWatcher(), utility.getConfiguration()); + .getReplicationPeerStorage(utility.getZooKeeperWatcher(), utility.getConfiguration()); try { rps.getPeerSyncReplicationState(peerId); fail("Should throw exception when get the sync replication state of a removed peer."); @@ -233,7 +251,7 @@ public class SyncReplicationTestBase { Entry[] entries = new Entry[10]; for (int i = 0; i < entries.length; i++) { entries[i] = - new Entry(new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TABLE_NAME, 0), new WALEdit()); + new Entry(new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TABLE_NAME, 0), new WALEdit()); } if (!expectedRejection) { ReplicationProtbufUtil.replicateWALEntry(connection.getAdmin(regionServer.getServerName()), diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java index fce0cdff0dd..42adab60b5c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java @@ -17,13 +17,28 @@ */ package org.apache.hadoop.hbase.replication; +import static org.hamcrest.CoreMatchers.containsString; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.client.AsyncConnection; +import org.apache.hadoop.hbase.client.AsyncTable; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WAL.Reader; import org.apache.hadoop.hbase.wal.WALFactory; @@ -66,8 +81,27 @@ public class TestSyncReplicationActive extends SyncReplicationTestBase { // confirm that the data is there after we convert the peer to DA verify(UTIL2, 0, 100); - UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, - SyncReplicationState.STANDBY); + try (AsyncConnection conn = + ConnectionFactory.createAsyncConnection(UTIL1.getConfiguration()).get()) { + AsyncTable table = conn.getTableBuilder(TABLE_NAME).setMaxAttempts(1).build(); + CompletableFuture future = + table.put(new Put(Bytes.toBytes(1000)).addColumn(CF, CQ, Bytes.toBytes(1000))); + Thread.sleep(2000); + // should hang on rolling + assertFalse(future.isDone()); + UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, + SyncReplicationState.STANDBY); + try { + future.get(); + fail("should fail because of the wal is closing"); + } catch (ExecutionException e) { + // expected + assertThat(e.getCause().getMessage(), containsString("only marker edit is allowed")); + } + } + // confirm that the data has not been persisted + HRegion region = UTIL1.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0); + assertTrue(region.get(new Get(Bytes.toBytes(1000))).isEmpty()); UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, SyncReplicationState.ACTIVE); @@ -89,8 +123,8 @@ public class TestSyncReplicationActive extends SyncReplicationTestBase { FileStatus[] files = fs2.listStatus(new Path(remoteDir, peerId)); Assert.assertTrue(files.length > 0); for (FileStatus file : files) { - try (Reader reader = - WALFactory.createReader(fs2, file.getPath(), utility.getConfiguration())) { + try ( + Reader reader = WALFactory.createReader(fs2, file.getPath(), utility.getConfiguration())) { Entry entry = reader.next(); Assert.assertTrue(entry != null); while (entry != null) {