From 6d46b8d256bcd63349ea83e4a588b879a122854a Mon Sep 17 00:00:00 2001 From: zhangduo Date: Tue, 6 Nov 2018 22:06:04 +0800 Subject: [PATCH] HBASE-21441 NPE if RS restarts between REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN and TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE --- .../replication/regionserver/Replication.java | 11 ++ ...cReplicationNewRSJoinBetweenRefreshes.java | 125 ++++++++++++++++++ 2 files changed, 136 insertions(+) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationNewRSJoinBetweenRefreshes.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index b04f0cbc656..799d9750eda 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.replication.ReplicationTracker; import org.apache.hadoop.hbase.replication.ReplicationUtils; +import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider; import org.apache.hadoop.hbase.wal.WALProvider; @@ -137,6 +138,16 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer SyncReplicationWALProvider syncWALProvider = (SyncReplicationWALProvider) walProvider; peerActionListener = syncWALProvider; syncWALProvider.setPeerInfoProvider(syncReplicationPeerInfoProvider); + // for sync replication state change, we need to reload the state twice, you can see the + // code in PeerProcedureHandlerImpl, so here we need to go over the sync replication peers + // to see if 
any of them are in the middle of the two refreshes, if so, we need to manually + // repeat the action we have done in the first refresh, otherwise when the second refresh + // comes we will be in trouble, such as NPE. + replicationPeers.getAllPeerIds().stream().map(replicationPeers::getPeer) + .filter(p -> p.getPeerConfig().isSyncReplication()) + .filter(p -> p.getNewSyncReplicationState() != SyncReplicationState.NONE) + .forEach(p -> syncWALProvider.peerSyncReplicationStateChange(p.getId(), + p.getSyncReplicationState(), p.getNewSyncReplicationState(), 0)); } } this.statsThreadPeriod = diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationNewRSJoinBetweenRefreshes.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationNewRSJoinBetweenRefreshes.java new file mode 100644 index 00000000000..86ad8c0c3f0 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationNewRSJoinBetweenRefreshes.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.replication; + +import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER_VALUE; +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor; +import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.RegionServerObserver; +import org.apache.hadoop.hbase.master.replication.TransitPeerSyncReplicationStateProcedure; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Testcase for HBASE-21441. 
+ * <p>
+ * A {@link TransitPeerSyncReplicationStateProcedure} is paused once it has reached
+ * {@code REOPEN_ALL_REGIONS_IN_PEER}, a new region server is started while it is paused, and
+ * the transition is then expected to finish with the peer in {@code DOWNGRADE_ACTIVE}.
+ */
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestSyncReplicationNewRSJoinBetweenRefreshes extends SyncReplicationTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestSyncReplicationNewRSJoinBetweenRefreshes.class);
+
+  // The three fields below are written by the test thread and read by the threads that invoke
+  // HaltCP, so they are volatile to make the writes visible across threads.
+
+  /** When true, {@link HaltCP} is allowed to block the procedure; reset once it has done so. */
+  private static volatile boolean HALT;
+
+  /** Counted down by {@link HaltCP} when the procedure has reached the state to pause at. */
+  private static volatile CountDownLatch ARRIVE;
+
+  /** Counted down by the test once the new region server has been started. */
+  private static volatile CountDownLatch RESUME;
+
+  /**
+   * Region server coprocessor that blocks in {@link #postExecuteProcedures(ObserverContext)}
+   * while {@link #HALT} is set and the master has an unfinished
+   * {@link TransitPeerSyncReplicationStateProcedure} whose current state is
+   * {@code REOPEN_ALL_REGIONS_IN_PEER}.
+   */
+  public static final class HaltCP implements RegionServerObserver, RegionServerCoprocessor {
+
+    @Override
+    public Optional<RegionServerObserver> getRegionServerObserver() {
+      return Optional.of(this);
+    }
+
+    /**
+     * Signals {@link #ARRIVE} and then waits on {@link #RESUME} the first time the pause
+     * condition is met; does nothing when {@link #HALT} is false or no matching procedure exists.
+     * @param ctx the observer context, unused here
+     * @throws RuntimeException if the waiting thread is interrupted
+     */
+    @Override
+    public void postExecuteProcedures(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
+      throws IOException {
+      // The class-wide lock is held for the whole wait, so any other caller of this hook that
+      // synchronizes on HaltCP.class waits until the halted one has been resumed.
+      synchronized (HaltCP.class) {
+        if (!HALT) {
+          return;
+        }
+        UTIL1.getMiniHBaseCluster().getMaster().getProcedures().stream()
+          .filter(p -> p instanceof TransitPeerSyncReplicationStateProcedure)
+          .filter(p -> !p.isFinished()).map(p -> (TransitPeerSyncReplicationStateProcedure) p)
+          .findFirst().ifPresent(proc -> {
+            // this is the next state of REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN_VALUE
+            if (proc.getCurrentStateId() == REOPEN_ALL_REGIONS_IN_PEER_VALUE) {
+              // tell the main thread to start a new region server
+              ARRIVE.countDown();
+              try {
+                // wait until the main thread has started the new region server
+                RESUME.await();
+              } catch (InterruptedException e) {
+                // restore the interrupt status before propagating, so callers can still see it
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
+              }
+              // only halt once
+              HALT = false;
+            }
+          });
+      }
+    }
+  }
+
+  /**
+   * Registers {@link HaltCP} as a region server coprocessor in UTIL1's configuration and then
+   * runs the base class set up.
+   */
+  @BeforeClass
+  public static void setUp() throws Exception {
+    UTIL1.getConfiguration().setClass(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY,
+      HaltCP.class, RegionServerObserver.class);
+    SyncReplicationTestBase.setUp();
+  }
+
+  /**
+   * Moves the peer to STANDBY on UTIL2 and ACTIVE on UTIL1, then transits UTIL1 to
+   * DOWNGRADE_ACTIVE on a background thread, starts a new region server while that transition
+   * is halted by {@link HaltCP}, and asserts that the transition still completes.
+   */
+  @Test
+  public void test() throws IOException, InterruptedException {
+    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.STANDBY);
+    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.ACTIVE);
+
+    // Create the latches before arming HALT, so HaltCP never sees HALT set without them.
+    ARRIVE = new CountDownLatch(1);
+    RESUME = new CountDownLatch(1);
+    HALT = true;
+    Thread t = new Thread(() -> {
+      try {
+        UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+          SyncReplicationState.DOWNGRADE_ACTIVE);
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+    });
+    t.start();
+    // wait until HaltCP reports that the procedure has reached the state to pause at
+    ARRIVE.await();
+    UTIL1.getMiniHBaseCluster().startRegionServer();
+    // release the halted procedure
+    RESUME.countDown();
+    t.join();
+    assertEquals(SyncReplicationState.DOWNGRADE_ACTIVE,
+      UTIL1.getAdmin().getReplicationPeerSyncReplicationState(PEER_ID));
+  }
+}