HBASE-20783 Addendum fix broken TestSyncReplicationStandBy
This commit is contained in:
parent
f1806a11e6
commit
a84cdbd579
|
@ -68,10 +68,7 @@ public enum SyncReplicationState {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static SyncReplicationState parseFrom(byte[] bytes) throws InvalidProtocolBufferException {
|
public static SyncReplicationState parseFrom(byte[] bytes) throws InvalidProtocolBufferException {
|
||||||
if (bytes == null) {
|
|
||||||
return SyncReplicationState.NONE;
|
|
||||||
}
|
|
||||||
return ReplicationPeerConfigUtil.toSyncReplicationState(ReplicationProtos.SyncReplicationState
|
return ReplicationPeerConfigUtil.toSyncReplicationState(ReplicationProtos.SyncReplicationState
|
||||||
.parseFrom(Arrays.copyOfRange(bytes, ProtobufUtil.lengthOfPBMagic(), bytes.length)));
|
.parseFrom(Arrays.copyOfRange(bytes, ProtobufUtil.lengthOfPBMagic(), bytes.length)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,45 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.hadoop.hbase.replication;
|
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
|
||||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
|
||||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
|
||||||
import org.junit.Assert;
|
|
||||||
import org.junit.ClassRule;
|
|
||||||
import org.junit.Test;
|
|
||||||
import org.junit.experimental.categories.Category;
|
|
||||||
|
|
||||||
@Category({ ClientTests.class, SmallTests.class })
|
|
||||||
public class TestSyncReplicationState {
|
|
||||||
|
|
||||||
@ClassRule
|
|
||||||
public static final HBaseClassTestRule CLASS_RULE =
|
|
||||||
HBaseClassTestRule.forClass(TestSyncReplicationState.class);
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testSyncReplicationStateParseFrom() throws Exception {
|
|
||||||
Assert.assertEquals(SyncReplicationState.parseFrom(null), SyncReplicationState.NONE);
|
|
||||||
for (SyncReplicationState state : SyncReplicationState.values()) {
|
|
||||||
byte[] data = SyncReplicationState.toByteArray(state);
|
|
||||||
SyncReplicationState actualState = SyncReplicationState.parseFrom(data);
|
|
||||||
Assert.assertEquals(state, actualState);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -93,7 +93,8 @@ public class ZKReplicationPeerStorage extends ZKReplicationStorageBase
|
||||||
return ZNodePaths.joinZNode(getPeerNode(peerId), SYNC_REPLICATION_STATE_ZNODE);
|
return ZNodePaths.joinZNode(getPeerNode(peerId), SYNC_REPLICATION_STATE_ZNODE);
|
||||||
}
|
}
|
||||||
|
|
||||||
private String getNewSyncReplicationStateNode(String peerId) {
|
@VisibleForTesting
|
||||||
|
public String getNewSyncReplicationStateNode(String peerId) {
|
||||||
return ZNodePaths.joinZNode(getPeerNode(peerId), NEW_SYNC_REPLICATION_STATE_ZNODE);
|
return ZNodePaths.joinZNode(getPeerNode(peerId), NEW_SYNC_REPLICATION_STATE_ZNODE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -221,6 +222,16 @@ public class ZKReplicationPeerStorage extends ZKReplicationStorageBase
|
||||||
throws ReplicationException {
|
throws ReplicationException {
|
||||||
try {
|
try {
|
||||||
byte[] data = ZKUtil.getData(zookeeper, path);
|
byte[] data = ZKUtil.getData(zookeeper, path);
|
||||||
|
if (data == null || data.length == 0) {
|
||||||
|
if (ZKUtil.checkExists(zookeeper, getPeerNode(peerId)) != -1) {
|
||||||
|
// should be a peer from previous version, set the sync replication state for it.
|
||||||
|
ZKUtil.createSetData(zookeeper, path, NONE_STATE_ZNODE_BYTES);
|
||||||
|
return SyncReplicationState.NONE;
|
||||||
|
} else {
|
||||||
|
throw new ReplicationException(
|
||||||
|
"Replication peer sync state shouldn't be empty, peerId=" + peerId);
|
||||||
|
}
|
||||||
|
}
|
||||||
return SyncReplicationState.parseFrom(data);
|
return SyncReplicationState.parseFrom(data);
|
||||||
} catch (KeeperException | InterruptedException | IOException e) {
|
} catch (KeeperException | InterruptedException | IOException e) {
|
||||||
throw new ReplicationException(
|
throw new ReplicationException(
|
||||||
|
|
|
@ -21,6 +21,7 @@ import static java.util.stream.Collectors.toList;
|
||||||
import static java.util.stream.Collectors.toSet;
|
import static java.util.stream.Collectors.toSet;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertNotEquals;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
@ -38,6 +39,8 @@ import org.apache.hadoop.hbase.HBaseZKTestingUtility;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
||||||
|
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||||
|
import org.apache.zookeeper.KeeperException;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.ClassRule;
|
import org.junit.ClassRule;
|
||||||
|
@ -177,8 +180,39 @@ public class TestZKReplicationPeerStorage {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
STORAGE.getPeerConfig(toRemove);
|
STORAGE.getPeerConfig(toRemove);
|
||||||
fail("Should throw a ReplicationException when get peer config of a peerId");
|
fail("Should throw a ReplicationException when getting peer config of a removed peer");
|
||||||
} catch (ReplicationException e) {
|
} catch (ReplicationException e) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNoSyncReplicationState()
|
||||||
|
throws ReplicationException, KeeperException, IOException {
|
||||||
|
// This could happen for a peer created before we introduce sync replication.
|
||||||
|
String peerId = "testNoSyncReplicationState";
|
||||||
|
try {
|
||||||
|
STORAGE.getPeerSyncReplicationState(peerId);
|
||||||
|
fail("Should throw a ReplicationException when getting state of inexist peer");
|
||||||
|
} catch (ReplicationException e) {
|
||||||
|
// expected
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
STORAGE.getPeerNewSyncReplicationState(peerId);
|
||||||
|
fail("Should throw a ReplicationException when getting state of inexist peer");
|
||||||
|
} catch (ReplicationException e) {
|
||||||
|
// expected
|
||||||
|
}
|
||||||
|
STORAGE.addPeer(peerId, getConfig(0), true, SyncReplicationState.NONE);
|
||||||
|
// delete the sync replication state node to simulate
|
||||||
|
ZKUtil.deleteNode(UTIL.getZooKeeperWatcher(), STORAGE.getSyncReplicationStateNode(peerId));
|
||||||
|
ZKUtil.deleteNode(UTIL.getZooKeeperWatcher(), STORAGE.getNewSyncReplicationStateNode(peerId));
|
||||||
|
// should not throw exception as the peer exists
|
||||||
|
assertEquals(SyncReplicationState.NONE, STORAGE.getPeerSyncReplicationState(peerId));
|
||||||
|
assertEquals(SyncReplicationState.NONE, STORAGE.getPeerNewSyncReplicationState(peerId));
|
||||||
|
// make sure we create the node for the old format peer
|
||||||
|
assertNotEquals(-1,
|
||||||
|
ZKUtil.checkExists(UTIL.getZooKeeperWatcher(), STORAGE.getSyncReplicationStateNode(peerId)));
|
||||||
|
assertNotEquals(-1, ZKUtil.checkExists(UTIL.getZooKeeperWatcher(),
|
||||||
|
STORAGE.getNewSyncReplicationStateNode(peerId)));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
package org.apache.hadoop.hbase.replication;
|
package org.apache.hadoop.hbase.replication;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
@ -50,7 +51,6 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
|
||||||
import org.apache.hadoop.hbase.wal.WALEdit;
|
import org.apache.hadoop.hbase.wal.WALEdit;
|
||||||
import org.apache.hadoop.hbase.wal.WALKeyImpl;
|
import org.apache.hadoop.hbase.wal.WALKeyImpl;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.Assert;
|
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
|
|
||||||
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
|
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
|
||||||
|
@ -229,18 +229,18 @@ public class SyncReplicationTestBase {
|
||||||
try {
|
try {
|
||||||
rps.getPeerSyncReplicationState(peerId);
|
rps.getPeerSyncReplicationState(peerId);
|
||||||
fail("Should throw exception when get the sync replication state of a removed peer.");
|
fail("Should throw exception when get the sync replication state of a removed peer.");
|
||||||
} catch (NullPointerException e) {
|
} catch (ReplicationException e) {
|
||||||
// ignore.
|
// ignore.
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
rps.getPeerNewSyncReplicationState(peerId);
|
rps.getPeerNewSyncReplicationState(peerId);
|
||||||
fail("Should throw exception when get the new sync replication state of a removed peer");
|
fail("Should throw exception when get the new sync replication state of a removed peer");
|
||||||
} catch (NullPointerException e) {
|
} catch (ReplicationException e) {
|
||||||
// ignore.
|
// ignore.
|
||||||
}
|
}
|
||||||
try (FileSystem fs = utility.getTestFileSystem()) {
|
try (FileSystem fs = utility.getTestFileSystem()) {
|
||||||
Assert.assertFalse(fs.exists(getRemoteWALDir(remoteWALDir, peerId)));
|
assertFalse(fs.exists(getRemoteWALDir(remoteWALDir, peerId)));
|
||||||
Assert.assertFalse(fs.exists(getReplayRemoteWALs(remoteWALDir, peerId)));
|
assertFalse(fs.exists(getReplayRemoteWALs(remoteWALDir, peerId)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -260,10 +260,10 @@ public class SyncReplicationTestBase {
|
||||||
try {
|
try {
|
||||||
ReplicationProtbufUtil.replicateWALEntry(connection.getAdmin(regionServer.getServerName()),
|
ReplicationProtbufUtil.replicateWALEntry(connection.getAdmin(regionServer.getServerName()),
|
||||||
entries, null, null, null);
|
entries, null, null, null);
|
||||||
Assert.fail("Should throw IOException when sync-replication state is in A or DA");
|
fail("Should throw IOException when sync-replication state is in A or DA");
|
||||||
} catch (DoNotRetryIOException e) {
|
} catch (DoNotRetryIOException e) {
|
||||||
Assert.assertTrue(e.getMessage().contains("Reject to apply to sink cluster"));
|
assertTrue(e.getMessage().contains("Reject to apply to sink cluster"));
|
||||||
Assert.assertTrue(e.getMessage().contains(TABLE_NAME.toString()));
|
assertTrue(e.getMessage().contains(TABLE_NAME.toString()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue