HBASE-19935 Only allow table replication for sync replication for now

This commit is contained in:
Guanghao Zhang 2018-02-06 16:00:59 +08:00 committed by zhangduo
parent 1481bd9481
commit 00e54aae24
5 changed files with 106 additions and 30 deletions

View File

@ -25,6 +25,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
@ -220,6 +222,13 @@ public class ReplicationPeerConfig {
return this.remoteWALDir;
}
/**
* Use remote wal dir to decide whether a peer is sync replication peer
*/
public boolean isSyncReplication() {
return !StringUtils.isBlank(this.remoteWALDir);
}
public static ReplicationPeerConfigBuilder newBuilder() {
return new ReplicationPeerConfigBuilderImpl();
}

View File

@ -170,7 +170,7 @@ public class ReplicationPeerManager {
" does not match new remote wal dir '" + peerConfig.getRemoteWALDir() + "'");
}
if (oldPeerConfig.getRemoteWALDir() != null) {
if (oldPeerConfig.isSyncReplication()) {
if (!ReplicationUtils.isNamespacesAndTableCFsEqual(oldPeerConfig, peerConfig)) {
throw new DoNotRetryIOException(
"Changing the replicated namespace/table config on a synchronous replication " +
@ -199,8 +199,8 @@ public class ReplicationPeerManager {
}
ReplicationPeerConfig copiedPeerConfig = ReplicationPeerConfig.newBuilder(peerConfig).build();
SyncReplicationState syncReplicationState =
StringUtils.isBlank(peerConfig.getRemoteWALDir()) ? SyncReplicationState.NONE
: SyncReplicationState.DOWNGRADE_ACTIVE;
copiedPeerConfig.isSyncReplication() ? SyncReplicationState.DOWNGRADE_ACTIVE
: SyncReplicationState.NONE;
peerStorage.addPeer(peerId, copiedPeerConfig, enabled, syncReplicationState);
peers.put(peerId,
new ReplicationPeerDescription(peerId, enabled, copiedPeerConfig, syncReplicationState));
@ -324,9 +324,37 @@ public class ReplicationPeerManager {
peerConfig.getTableCFsMap());
}
if (peerConfig.isSyncReplication()) {
checkPeerConfigForSyncReplication(peerConfig);
}
checkConfiguredWALEntryFilters(peerConfig);
}
private void checkPeerConfigForSyncReplication(ReplicationPeerConfig peerConfig)
throws DoNotRetryIOException {
// This is used to reduce the difficulty for implementing the sync replication state transition
// as we need to reopen all the related regions.
// TODO: Add namespace, replicat_all flag back
if (peerConfig.replicateAllUserTables()) {
throw new DoNotRetryIOException(
"Only support replicated table config for sync replication peer");
}
if (peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) {
throw new DoNotRetryIOException(
"Only support replicated table config for sync replication peer");
}
if (peerConfig.getTableCFsMap() == null || peerConfig.getTableCFsMap().isEmpty()) {
throw new DoNotRetryIOException("Need config replicated tables for sync replication peer");
}
for (List<String> cfs : peerConfig.getTableCFsMap().values()) {
if (cfs != null && !cfs.isEmpty()) {
throw new DoNotRetryIOException(
"Only support replicated table config for sync replication peer");
}
}
}
/**
* Set a namespace in the peer config means that all tables in this namespace will be replicated
* to the peer cluster.

View File

@ -26,6 +26,7 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -910,6 +911,8 @@ public class TestReplicationAdmin {
@Test
public void testPeerRemoteWALDir() throws Exception {
TableName tableName = TableName.valueOf(name.getMethodName());
String rootDir = "hdfs://srv1:9999/hbase";
ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
builder.setClusterKey(KEY_ONE);
@ -929,8 +932,47 @@ public class TestReplicationAdmin {
builder = ReplicationPeerConfig.newBuilder();
builder.setClusterKey(KEY_SECOND);
builder.setRemoteWALDir(rootDir);
hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build());
try {
hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build());
fail("Only support replicated table config for sync replication");
} catch (Exception e) {
// OK
}
builder.setReplicateAllUserTables(false);
try {
Set<String> namespaces = new HashSet<String>();
namespaces.add("ns1");
builder.setNamespaces(namespaces);
hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build());
fail("Only support replicated table config for sync replication");
} catch (Exception e) {
// OK
}
builder.setNamespaces(null);
try {
hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build());
fail("Only support replicated table config for sync replication, and tables can't be empty");
} catch (Exception e) {
// OK
}
Map<TableName, List<String>> tableCfs = new HashMap<>();
try {
tableCfs.put(tableName, Arrays.asList("cf1"));
builder.setTableCFsMap(tableCfs);
hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build());
fail("Only support replicated table config for sync replication");
} catch (Exception e) {
// OK
}
tableCfs = new HashMap<>();
tableCfs.put(tableName, new ArrayList<>());
builder.setTableCFsMap(tableCfs);
hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build());
rpc = hbaseAdmin.getReplicationPeerConfig(ID_SECOND);
assertEquals(rootDir, rpc.getRemoteWALDir());
@ -952,34 +994,12 @@ public class TestReplicationAdmin {
try {
builder = ReplicationPeerConfig.newBuilder(rpc);
builder.setReplicateAllUserTables(false);
tableCfs = new HashMap<>();
tableCfs.put(TableName.valueOf("ns1:" + name.getMethodName()), new ArrayList<>());
builder.setTableCFsMap(tableCfs);
hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build());
fail(
"Change replicated namespace/table config on an existing synchronous peer is not allowed");
} catch (Exception e) {
// OK
}
try {
builder = ReplicationPeerConfig.newBuilder(rpc);
Set<String> namespaces = new HashSet<>();
namespaces.add("ns1");
builder.setExcludeNamespaces(namespaces);
hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build());
fail(
"Change replicated namespace/table config on an existing synchronous peer is not allowed");
} catch (Exception e) {
// OK
}
try {
builder = ReplicationPeerConfig.newBuilder(rpc);
Map<TableName, List<String>> tableCfs = new HashMap<>();
tableCfs.put(TableName.valueOf(name.getMethodName()), new ArrayList<>());
builder.setExcludeTableCFsMap(tableCfs);
hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build());
fail(
"Change replicated namespace/table config on an existing synchronous peer is not allowed");
"Change replicated table config on an existing synchronous peer is not allowed");
} catch (Exception e) {
// OK
}
@ -987,8 +1007,11 @@ public class TestReplicationAdmin {
@Test
public void testTransitSyncReplicationPeerState() throws Exception {
TableName tableName = TableName.valueOf(name.getMethodName());
ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
builder.setClusterKey(KEY_ONE);
builder.setReplicateAllUserTables(false);
hbaseAdmin.addReplicationPeer(ID_ONE, builder.build());
assertEquals(SyncReplicationState.NONE,
hbaseAdmin.getReplicationPeerSyncReplicationState(ID_ONE));
@ -1005,6 +1028,10 @@ public class TestReplicationAdmin {
builder = ReplicationPeerConfig.newBuilder();
builder.setClusterKey(KEY_SECOND);
builder.setRemoteWALDir(rootDir);
builder.setReplicateAllUserTables(false);
Map<TableName, List<String>> tableCfs = new HashMap<>();
tableCfs.put(tableName, new ArrayList<>());
builder.setTableCFsMap(tableCfs);
hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build());
assertEquals(SyncReplicationState.DOWNGRADE_ACTIVE,
hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND));

View File

@ -23,6 +23,7 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@ -33,6 +34,7 @@ import org.apache.hadoop.hbase.wal.AsyncFSWALProvider.AsyncWriter;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -51,6 +53,10 @@ import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
@Category({ RegionServerTests.class, MediumTests.class })
public class TestCombinedAsyncWriter {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestCombinedAsyncWriter.class);
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static EventLoopGroup EVENT_LOOP_GROUP;

View File

@ -25,6 +25,7 @@ import static org.junit.Assert.assertThat;
import java.io.IOException;
import java.util.Optional;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
@ -41,12 +42,17 @@ import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ RegionServerTests.class, MediumTests.class })
public class TestSyncReplicationWALProvider {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestSyncReplicationWALProvider.class);
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static String PEER_ID = "1";