HBASE-19621 Revisit the methods in ReplicationPeerConfigBuilder

This commit is contained in:
Guanghao Zhang 2017-12-25 14:29:39 +08:00
parent 65159dc256
commit 2abf7b508c
5 changed files with 38 additions and 40 deletions

View File

@ -26,7 +26,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
@ -277,17 +276,13 @@ public final class ReplicationPeerConfigUtil {
builder.setReplicationEndpointImpl(peer.getReplicationEndpointImpl());
}
Map<byte[], byte[]> peerData = new TreeMap<>(Bytes.BYTES_COMPARATOR);
for (HBaseProtos.BytesBytesPair pair : peer.getDataList()) {
peerData.put(pair.getFirst().toByteArray(), pair.getSecond().toByteArray());
builder.putPeerData(pair.getFirst().toByteArray(), pair.getSecond().toByteArray());
}
builder.setPeerData(peerData);
Map<String, String> configuration = new HashMap<>();
for (HBaseProtos.NameStringPair pair : peer.getConfigurationList()) {
configuration.put(pair.getName(), pair.getValue());
builder.putConfiguration(pair.getName(), pair.getValue());
}
builder.setConfiguration(configuration);
Map<TableName, List<String>> tableCFsMap = convert2Map(
peer.getTableCfsList().toArray(new ReplicationProtos.TableCF[peer.getTableCfsCount()]));

View File

@ -219,7 +219,7 @@ public class ReplicationPeerConfig {
ReplicationPeerConfigBuilderImpl builder = new ReplicationPeerConfigBuilderImpl();
builder.setClusterKey(peerConfig.getClusterKey())
.setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl())
.setPeerData(peerConfig.getPeerData()).setConfiguration(peerConfig.getConfiguration())
.putAllPeerData(peerConfig.getPeerData()).putAllConfiguration(peerConfig.getConfiguration())
.setTableCFsMap(peerConfig.getTableCFsMap()).setNamespaces(peerConfig.getNamespaces())
.setReplicateAllUserTables(peerConfig.replicateAllUserTables())
.setExcludeTableCFsMap(peerConfig.getExcludeTableCFsMap())
@ -264,14 +264,14 @@ public class ReplicationPeerConfig {
}
@Override
public ReplicationPeerConfigBuilder setPeerData(Map<byte[], byte[]> peerData) {
this.peerData = peerData;
public ReplicationPeerConfigBuilder putConfiguration(String key, String value) {
this.configuration.put(key, value);
return this;
}
@Override
public ReplicationPeerConfigBuilder setConfiguration(Map<String, String> configuration) {
this.configuration = configuration;
public ReplicationPeerConfigBuilder putPeerData(byte[] key, byte[] value) {
this.peerData.put(key, value);
return this;
}

View File

@ -43,9 +43,19 @@ public interface ReplicationPeerConfigBuilder {
*/
ReplicationPeerConfigBuilder setReplicationEndpointImpl(String replicationEndpointImpl);
ReplicationPeerConfigBuilder setPeerData(Map<byte[], byte[]> peerData);
ReplicationPeerConfigBuilder putConfiguration(String key, String value);
ReplicationPeerConfigBuilder setConfiguration(Map<String, String> configuration);
default ReplicationPeerConfigBuilder putAllConfiguration(Map<String, String> configuration) {
configuration.forEach(this::putConfiguration);
return this;
}
ReplicationPeerConfigBuilder putPeerData(byte[] key, byte[] value);
default ReplicationPeerConfigBuilder putAllPeerData(Map<byte[], byte[]> peerData) {
peerData.forEach(this::putPeerData);
return this;
}
ReplicationPeerConfigBuilder
setTableCFsMap(Map<TableName, List<String>> tableCFsMap);

View File

@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.replication;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -38,7 +37,6 @@ import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@ -363,17 +361,11 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
+ existingConfig.getReplicationEndpointImpl()
+ "' does not match new class '" + newConfig.getReplicationEndpointImpl() + "'");
}
//Update existingConfig's peer config and peer data with the new values, but don't touch config
// Update existingConfig's peer config and peer data with the new values, but don't touch config
// or data that weren't explicitly changed
ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(newConfig);
Map<byte[], byte[]> peerData = new TreeMap<>(Bytes.BYTES_COMPARATOR);
existingConfig.getPeerData().forEach(peerData::put);
newConfig.getPeerData().forEach(peerData::put);
builder.setPeerData(peerData);
Map<String, String> configuration = new HashMap<>();
existingConfig.getConfiguration().forEach(configuration::put);
newConfig.getConfiguration().forEach(configuration::put);
builder.setConfiguration(configuration);
builder.putAllConfiguration(existingConfig.getConfiguration());
builder.putAllPeerData(existingConfig.getPeerData());
try {
ZKUtil.setData(this.zookeeper, getPeerNode(id),

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
@ -121,39 +122,39 @@ public class TestReplicationAdmin {
*/
@Test
public void testAddRemovePeer() throws Exception {
ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
ReplicationPeerConfigBuilder rpc1 = ReplicationPeerConfig.newBuilder();
rpc1.setClusterKey(KEY_ONE);
ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
ReplicationPeerConfigBuilder rpc2 = ReplicationPeerConfig.newBuilder();
rpc2.setClusterKey(KEY_SECOND);
// Add a valid peer
admin.addPeer(ID_ONE, rpc1, null);
hbaseAdmin.addReplicationPeer(ID_ONE, rpc1.build());
// try adding the same (fails)
try {
admin.addPeer(ID_ONE, rpc1, null);
hbaseAdmin.addReplicationPeer(ID_ONE, rpc1.build());
} catch (Exception e) {
// OK!
}
assertEquals(1, admin.getPeersCount());
assertEquals(1, hbaseAdmin.listReplicationPeers().size());
// Try to remove an inexisting peer
try {
admin.removePeer(ID_SECOND);
hbaseAdmin.removeReplicationPeer(ID_SECOND);
fail();
} catch (Exception iae) {
} catch (Exception e) {
// OK!
}
assertEquals(1, admin.getPeersCount());
assertEquals(1, hbaseAdmin.listReplicationPeers().size());
// Add a second since multi-slave is supported
try {
admin.addPeer(ID_SECOND, rpc2, null);
} catch (Exception iae) {
hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2.build());
} catch (Exception e) {
fail();
}
assertEquals(2, admin.getPeersCount());
assertEquals(2, hbaseAdmin.listReplicationPeers().size());
// Remove the first peer we added
admin.removePeer(ID_ONE);
assertEquals(1, admin.getPeersCount());
admin.removePeer(ID_SECOND);
assertEquals(0, admin.getPeersCount());
hbaseAdmin.removeReplicationPeer(ID_ONE);
assertEquals(1, hbaseAdmin.listReplicationPeers().size());
hbaseAdmin.removeReplicationPeer(ID_SECOND);
assertEquals(0, hbaseAdmin.listReplicationPeers().size());
}
@Test