HBASE-20425 Do not write the cluster id of the current active cluster when writing remote WAL

This commit is contained in:
huzheng 2018-04-23 17:20:55 +08:00 committed by zhangduo
parent fe339860b5
commit 66cced16dc
1 changed files with 32 additions and 0 deletions

View File

@ -17,9 +17,17 @@
*/
package org.apache.hadoop.hbase.replication;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -49,6 +57,9 @@ public class TestSyncReplicationActive extends SyncReplicationTestBase {
// peer is disabled so no data have been replicated
verifyNotReplicatedThroughRegion(UTIL2, 0, 100);
// Ensure that there's no cluster id in remote log entries.
verifyNoClusterIdInRemoteLog(UTIL2, PEER_ID);
UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
SyncReplicationState.DOWNGRADE_ACTIVE);
// confirm that peer with state DA will reject replication request.
@ -72,4 +83,25 @@ public class TestSyncReplicationActive extends SyncReplicationTestBase {
verifyReplicationRequestRejection(UTIL2, true);
write(UTIL2, 200, 300);
}
private void verifyNoClusterIdInRemoteLog(HBaseTestingUtility utility, String peerId)
throws Exception {
FileSystem fs2 = utility.getTestFileSystem();
Path remoteDir =
new Path(utility.getMiniHBaseCluster().getMaster().getMasterFileSystem().getRootDir(),
"remoteWALs").makeQualified(fs2.getUri(), fs2.getWorkingDirectory());
FileStatus[] files = fs2.listStatus(new Path(remoteDir, peerId));
Assert.assertTrue(files.length > 0);
for (FileStatus file : files) {
try (Reader reader =
WALFactory.createReader(fs2, file.getPath(), utility.getConfiguration())) {
Entry entry = reader.next();
Assert.assertTrue(entry != null);
while (entry != null) {
Assert.assertEquals(entry.getKey().getClusterIds().size(), 0);
entry = reader.next();
}
}
}
}
}