HBASE-19734 Fix IntegrationTestReplication and related impl changes
Adds (client-side) validation to ReplicationPeerConfigBuilder and javadoc to builder methods in addition to the test fix. Signed-off-by: Guanghao Zhang <zghao@apache.org> Signed-off-by: Michael Stack <stack@apache.org>
This commit is contained in:
parent
b31b386f7f
commit
efa911f56f
|
@ -315,6 +315,8 @@ public class ReplicationPeerConfig {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ReplicationPeerConfig build() {
|
public ReplicationPeerConfig build() {
|
||||||
|
// It would be nice to validate the configuration, but we have to work with "old" data
|
||||||
|
// from ZK which makes it much more difficult.
|
||||||
return new ReplicationPeerConfig(this);
|
return new ReplicationPeerConfig(this);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,32 +43,103 @@ public interface ReplicationPeerConfigBuilder {
|
||||||
*/
|
*/
|
||||||
ReplicationPeerConfigBuilder setReplicationEndpointImpl(String replicationEndpointImpl);
|
ReplicationPeerConfigBuilder setReplicationEndpointImpl(String replicationEndpointImpl);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets a "raw" configuration property for this replication peer. For experts only.
|
||||||
|
* @param key Configuration property key
|
||||||
|
* @param value Configuration property value
|
||||||
|
* @return {@code this}
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
ReplicationPeerConfigBuilder putConfiguration(String key, String value);
|
ReplicationPeerConfigBuilder putConfiguration(String key, String value);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Adds all of the provided "raw" configuration entries to {@code this}.
|
||||||
|
* @param configuration A collection of raw configuration entries
|
||||||
|
* @return {@code this}
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
default ReplicationPeerConfigBuilder putAllConfiguration(Map<String, String> configuration) {
|
default ReplicationPeerConfigBuilder putAllConfiguration(Map<String, String> configuration) {
|
||||||
configuration.forEach(this::putConfiguration);
|
configuration.forEach(this::putConfiguration);
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the serialized peer configuration data
|
||||||
|
* @return {@code this}
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
ReplicationPeerConfigBuilder putPeerData(byte[] key, byte[] value);
|
ReplicationPeerConfigBuilder putPeerData(byte[] key, byte[] value);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets all of the provided serialized peer configuration data.
|
||||||
|
* @return {@code this}
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
default ReplicationPeerConfigBuilder putAllPeerData(Map<byte[], byte[]> peerData) {
|
default ReplicationPeerConfigBuilder putAllPeerData(Map<byte[], byte[]> peerData) {
|
||||||
peerData.forEach(this::putPeerData);
|
peerData.forEach(this::putPeerData);
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets an explicit map of tables and column families in those tables that should be replicated
|
||||||
|
* to the given peer. Use {@link #setReplicateAllUserTables(boolean)} to replicate all tables
|
||||||
|
* to a peer.
|
||||||
|
*
|
||||||
|
* @param tableCFsMap A map from tableName to column family names. An empty collection can be
|
||||||
|
* passed to indicate replicating all column families.
|
||||||
|
* @return {@code this}
|
||||||
|
* @see #setReplicateAllUserTables(boolean)
|
||||||
|
*/
|
||||||
ReplicationPeerConfigBuilder
|
ReplicationPeerConfigBuilder
|
||||||
setTableCFsMap(Map<TableName, List<String>> tableCFsMap);
|
setTableCFsMap(Map<TableName, List<String>> tableCFsMap);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets a unique collection of HBase namespaces that should be replicated to this peer.
|
||||||
|
* @param namespaces A set of namespaces to be replicated to this peer.
|
||||||
|
* @return {@code this}
|
||||||
|
*/
|
||||||
ReplicationPeerConfigBuilder setNamespaces(Set<String> namespaces);
|
ReplicationPeerConfigBuilder setNamespaces(Set<String> namespaces);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the speed, in bytes per second, for any one RegionServer to replicate data to the peer.
|
||||||
|
* @param bandwidth Bytes per second
|
||||||
|
* @return {@code this}.
|
||||||
|
*/
|
||||||
ReplicationPeerConfigBuilder setBandwidth(long bandwidth);
|
ReplicationPeerConfigBuilder setBandwidth(long bandwidth);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Configures HBase to replicate all user tables (not system tables) to the peer. Default is
|
||||||
|
* {@code true}.
|
||||||
|
* @param replicateAllUserTables True if all user tables should be replicated, else false.
|
||||||
|
* @return {@code this}
|
||||||
|
*/
|
||||||
ReplicationPeerConfigBuilder setReplicateAllUserTables(boolean replicateAllUserTables);
|
ReplicationPeerConfigBuilder setReplicateAllUserTables(boolean replicateAllUserTables);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the mapping of table name to column families which should not be replicated. This
|
||||||
|
* method sets state which is mutually exclusive to {@link #setTableCFsMap(Map)}. Invoking this
|
||||||
|
* method is only relevant when all user tables are being replicated.
|
||||||
|
*
|
||||||
|
* @param tableCFsMap A mapping of table names to column families which should not be
|
||||||
|
* replicated. An empty list of column families implies all families for the table.
|
||||||
|
* @return {@code this}.
|
||||||
|
*/
|
||||||
ReplicationPeerConfigBuilder setExcludeTableCFsMap(Map<TableName, List<String>> tableCFsMap);
|
ReplicationPeerConfigBuilder setExcludeTableCFsMap(Map<TableName, List<String>> tableCFsMap);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the collection of namespaces which should not be replicated when all user tables are
|
||||||
|
* configured to be replicated. This method sets state which is mutually exclusive to
|
||||||
|
* {@link #setNamespaces(Set)}. Invoking this method is only relevant when all user tables are
|
||||||
|
* being replicated.
|
||||||
|
*
|
||||||
|
* @param namespaces A set of namespaces whose tables should not be replicated.
|
||||||
|
* @return {@code this}
|
||||||
|
*/
|
||||||
ReplicationPeerConfigBuilder setExcludeNamespaces(Set<String> namespaces);
|
ReplicationPeerConfigBuilder setExcludeNamespaces(Set<String> namespaces);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Builds the configuration object from the current state of {@code this}.
|
||||||
|
* @return A {@link ReplicationPeerConfig} instance.
|
||||||
|
*/
|
||||||
ReplicationPeerConfig build();
|
ReplicationPeerConfig build();
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,5 +43,4 @@ public class TestReplicationPeerConfig {
|
||||||
|
|
||||||
BuilderStyleTest.assertClassesAreBuilderStyle(ReplicationPeerConfig.class);
|
BuilderStyleTest.assertClassesAreBuilderStyle(ReplicationPeerConfig.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,16 +31,17 @@ import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.client.Connection;
|
import org.apache.hadoop.hbase.client.Connection;
|
||||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||||
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
|
|
||||||
import org.apache.hadoop.hbase.client.Admin;
|
import org.apache.hadoop.hbase.client.Admin;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
||||||
|
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
|
||||||
import org.apache.hadoop.util.Tool;
|
import org.apache.hadoop.util.Tool;
|
||||||
import org.apache.hadoop.util.ToolRunner;
|
import org.apache.hadoop.util.ToolRunner;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.TreeSet;
|
import java.util.TreeSet;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
@ -222,24 +223,25 @@ public class IntegrationTestReplication extends IntegrationTestBigLinkedList {
|
||||||
|
|
||||||
// setup the replication on the source
|
// setup the replication on the source
|
||||||
if (!source.equals(sink)) {
|
if (!source.equals(sink)) {
|
||||||
ReplicationAdmin replicationAdmin = new ReplicationAdmin(source.getConfiguration());
|
try (final Admin admin = source.getConnection().getAdmin()) {
|
||||||
// remove any old replication peers
|
// remove any old replication peers
|
||||||
for (String oldPeer : replicationAdmin.listPeerConfigs().keySet()) {
|
for (ReplicationPeerDescription peer : admin.listReplicationPeers()) {
|
||||||
replicationAdmin.removePeer(oldPeer);
|
admin.removeReplicationPeer(peer.getPeerId());
|
||||||
}
|
}
|
||||||
|
|
||||||
// set the sink to be the target
|
|
||||||
ReplicationPeerConfig peerConfig = new ReplicationPeerConfig();
|
|
||||||
peerConfig.setClusterKey(sink.toString());
|
|
||||||
|
|
||||||
// set the test table to be the table to replicate
|
// set the test table to be the table to replicate
|
||||||
HashMap<TableName, ArrayList<String>> toReplicate = new HashMap<>();
|
HashMap<TableName, List<String>> toReplicate = new HashMap<>();
|
||||||
toReplicate.put(tableName, new ArrayList<>(0));
|
toReplicate.put(tableName, Collections.emptyList());
|
||||||
|
|
||||||
replicationAdmin.addPeer("TestPeer", peerConfig, toReplicate);
|
// set the sink to be the target
|
||||||
|
final ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
|
||||||
|
.setClusterKey(sink.toString())
|
||||||
|
.setReplicateAllUserTables(false)
|
||||||
|
.setTableCFsMap(toReplicate).build();
|
||||||
|
|
||||||
replicationAdmin.enableTableRep(tableName);
|
admin.addReplicationPeer("TestPeer", peerConfig);
|
||||||
replicationAdmin.close();
|
admin.enableTableReplication(tableName);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for (ClusterID cluster : clusters) {
|
for (ClusterID cluster : clusters) {
|
||||||
|
|
Loading…
Reference in New Issue