diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/SyncReplicationState.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/SyncReplicationState.java index 448603cc703..de9576caebd 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/SyncReplicationState.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/SyncReplicationState.java @@ -68,10 +68,7 @@ public enum SyncReplicationState { } public static SyncReplicationState parseFrom(byte[] bytes) throws InvalidProtocolBufferException { - if (bytes == null) { - return SyncReplicationState.NONE; - } return ReplicationPeerConfigUtil.toSyncReplicationState(ReplicationProtos.SyncReplicationState - .parseFrom(Arrays.copyOfRange(bytes, ProtobufUtil.lengthOfPBMagic(), bytes.length))); + .parseFrom(Arrays.copyOfRange(bytes, ProtobufUtil.lengthOfPBMagic(), bytes.length))); } } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationState.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationState.java deleted file mode 100644 index 692125218e8..00000000000 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationState.java +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.replication; - -import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.testclassification.ClientTests; -import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.junit.Assert; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -@Category({ ClientTests.class, SmallTests.class }) -public class TestSyncReplicationState { - - @ClassRule - public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestSyncReplicationState.class); - - @Test - public void testSyncReplicationStateParseFrom() throws Exception { - Assert.assertEquals(SyncReplicationState.parseFrom(null), SyncReplicationState.NONE); - for (SyncReplicationState state : SyncReplicationState.values()) { - byte[] data = SyncReplicationState.toByteArray(state); - SyncReplicationState actualState = SyncReplicationState.parseFrom(data); - Assert.assertEquals(state, actualState); - } - } -} diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java index a2cdfdf822a..7a943c4035c 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java @@ -93,7 +93,8 @@ public class ZKReplicationPeerStorage extends ZKReplicationStorageBase return ZNodePaths.joinZNode(getPeerNode(peerId), SYNC_REPLICATION_STATE_ZNODE); } - private String getNewSyncReplicationStateNode(String peerId) { + @VisibleForTesting + public String getNewSyncReplicationStateNode(String peerId) { return ZNodePaths.joinZNode(getPeerNode(peerId), NEW_SYNC_REPLICATION_STATE_ZNODE); } @@ -221,6 +222,16 @@ public class ZKReplicationPeerStorage extends ZKReplicationStorageBase throws ReplicationException { try { byte[] data = ZKUtil.getData(zookeeper, path); + if (data == null || data.length == 0) { + if (ZKUtil.checkExists(zookeeper, getPeerNode(peerId)) != -1) { + // should be a peer from previous version, set the sync replication state for it. + ZKUtil.createSetData(zookeeper, path, NONE_STATE_ZNODE_BYTES); + return SyncReplicationState.NONE; + } else { + throw new ReplicationException( + "Replication peer sync state shouldn't be empty, peerId=" + peerId); + } + } return SyncReplicationState.parseFrom(data); } catch (KeeperException | InterruptedException | IOException e) { throw new ReplicationException( diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java index 12586956c79..0e7cd74048c 100644 --- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java +++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java @@ -21,6 +21,7 @@ import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toSet; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -38,6 +39,8 @@ import org.apache.hadoop.hbase.HBaseZKTestingUtility; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.zookeeper.KeeperException; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.ClassRule; @@ -177,8 +180,39 @@ public class TestZKReplicationPeerStorage { try { STORAGE.getPeerConfig(toRemove); - fail("Should throw a ReplicationException when get peer config of a peerId"); + fail("Should throw a ReplicationException when getting peer config of a removed peer"); } catch (ReplicationException e) { } } + + @Test + public void testNoSyncReplicationState() + throws ReplicationException, KeeperException, IOException { + // This could happen for a peer created before we introduce sync replication. + String peerId = "testNoSyncReplicationState"; + try { + STORAGE.getPeerSyncReplicationState(peerId); + fail("Should throw a ReplicationException when getting state of inexist peer"); + } catch (ReplicationException e) { + // expected + } + try { + STORAGE.getPeerNewSyncReplicationState(peerId); + fail("Should throw a ReplicationException when getting state of inexist peer"); + } catch (ReplicationException e) { + // expected + } + STORAGE.addPeer(peerId, getConfig(0), true, SyncReplicationState.NONE); + // delete the sync replication state node to simulate + ZKUtil.deleteNode(UTIL.getZooKeeperWatcher(), STORAGE.getSyncReplicationStateNode(peerId)); + ZKUtil.deleteNode(UTIL.getZooKeeperWatcher(), STORAGE.getNewSyncReplicationStateNode(peerId)); + // should not throw exception as the peer exists + assertEquals(SyncReplicationState.NONE, STORAGE.getPeerSyncReplicationState(peerId)); + assertEquals(SyncReplicationState.NONE, STORAGE.getPeerNewSyncReplicationState(peerId)); + // make sure we create the node for the old format peer + assertNotEquals(-1, + ZKUtil.checkExists(UTIL.getZooKeeperWatcher(), STORAGE.getSyncReplicationStateNode(peerId))); + assertNotEquals(-1, ZKUtil.checkExists(UTIL.getZooKeeperWatcher(), + STORAGE.getNewSyncReplicationStateNode(peerId))); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java index f765139099a..e251fb4e4e9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.replication; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -50,7 +51,6 @@ import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.junit.AfterClass; -import org.junit.Assert; import org.junit.BeforeClass; import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; @@ -229,18 +229,18 @@ public class SyncReplicationTestBase { try { rps.getPeerSyncReplicationState(peerId); fail("Should throw exception when get the sync replication state of a removed peer."); - } catch (NullPointerException e) { + } catch (ReplicationException e) { // ignore. } try { rps.getPeerNewSyncReplicationState(peerId); fail("Should throw exception when get the new sync replication state of a removed peer"); - } catch (NullPointerException e) { + } catch (ReplicationException e) { // ignore. } try (FileSystem fs = utility.getTestFileSystem()) { - Assert.assertFalse(fs.exists(getRemoteWALDir(remoteWALDir, peerId))); - Assert.assertFalse(fs.exists(getReplayRemoteWALs(remoteWALDir, peerId))); + assertFalse(fs.exists(getRemoteWALDir(remoteWALDir, peerId))); + assertFalse(fs.exists(getReplayRemoteWALs(remoteWALDir, peerId))); } } @@ -260,10 +260,10 @@ public class SyncReplicationTestBase { try { ReplicationProtbufUtil.replicateWALEntry(connection.getAdmin(regionServer.getServerName()), entries, null, null, null); - Assert.fail("Should throw IOException when sync-replication state is in A or DA"); + fail("Should throw IOException when sync-replication state is in A or DA"); } catch (DoNotRetryIOException e) { - Assert.assertTrue(e.getMessage().contains("Reject to apply to sink cluster")); - Assert.assertTrue(e.getMessage().contains(TABLE_NAME.toString())); + assertTrue(e.getMessage().contains("Reject to apply to sink cluster")); + assertTrue(e.getMessage().contains(TABLE_NAME.toString())); } } }