HBASE-19621 Revisit the methods in ReplicationPeerConfigBuilder
This commit is contained in:
parent
2ce5dc8927
commit
1556939236
|
@ -26,7 +26,6 @@ import java.util.HashSet;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
@ -277,17 +276,13 @@ public final class ReplicationPeerConfigUtil {
|
|||
builder.setReplicationEndpointImpl(peer.getReplicationEndpointImpl());
|
||||
}
|
||||
|
||||
Map<byte[], byte[]> peerData = new TreeMap<>(Bytes.BYTES_COMPARATOR);
|
||||
for (HBaseProtos.BytesBytesPair pair : peer.getDataList()) {
|
||||
peerData.put(pair.getFirst().toByteArray(), pair.getSecond().toByteArray());
|
||||
builder.putPeerData(pair.getFirst().toByteArray(), pair.getSecond().toByteArray());
|
||||
}
|
||||
builder.setPeerData(peerData);
|
||||
|
||||
Map<String, String> configuration = new HashMap<>();
|
||||
for (HBaseProtos.NameStringPair pair : peer.getConfigurationList()) {
|
||||
configuration.put(pair.getName(), pair.getValue());
|
||||
builder.putConfiguration(pair.getName(), pair.getValue());
|
||||
}
|
||||
builder.setConfiguration(configuration);
|
||||
|
||||
Map<TableName, List<String>> tableCFsMap = convert2Map(
|
||||
peer.getTableCfsList().toArray(new ReplicationProtos.TableCF[peer.getTableCfsCount()]));
|
||||
|
|
|
@ -219,7 +219,7 @@ public class ReplicationPeerConfig {
|
|||
ReplicationPeerConfigBuilderImpl builder = new ReplicationPeerConfigBuilderImpl();
|
||||
builder.setClusterKey(peerConfig.getClusterKey())
|
||||
.setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl())
|
||||
.setPeerData(peerConfig.getPeerData()).setConfiguration(peerConfig.getConfiguration())
|
||||
.putAllPeerData(peerConfig.getPeerData()).putAllConfiguration(peerConfig.getConfiguration())
|
||||
.setTableCFsMap(peerConfig.getTableCFsMap()).setNamespaces(peerConfig.getNamespaces())
|
||||
.setReplicateAllUserTables(peerConfig.replicateAllUserTables())
|
||||
.setExcludeTableCFsMap(peerConfig.getExcludeTableCFsMap())
|
||||
|
@ -264,14 +264,14 @@ public class ReplicationPeerConfig {
|
|||
}
|
||||
|
||||
@Override
|
||||
public ReplicationPeerConfigBuilder setPeerData(Map<byte[], byte[]> peerData) {
|
||||
this.peerData = peerData;
|
||||
public ReplicationPeerConfigBuilder putConfiguration(String key, String value) {
|
||||
this.configuration.put(key, value);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReplicationPeerConfigBuilder setConfiguration(Map<String, String> configuration) {
|
||||
this.configuration = configuration;
|
||||
public ReplicationPeerConfigBuilder putPeerData(byte[] key, byte[] value) {
|
||||
this.peerData.put(key, value);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
|
|
@ -43,9 +43,19 @@ public interface ReplicationPeerConfigBuilder {
|
|||
*/
|
||||
ReplicationPeerConfigBuilder setReplicationEndpointImpl(String replicationEndpointImpl);
|
||||
|
||||
ReplicationPeerConfigBuilder setPeerData(Map<byte[], byte[]> peerData);
|
||||
ReplicationPeerConfigBuilder putConfiguration(String key, String value);
|
||||
|
||||
ReplicationPeerConfigBuilder setConfiguration(Map<String, String> configuration);
|
||||
default ReplicationPeerConfigBuilder putAllConfiguration(Map<String, String> configuration) {
|
||||
configuration.forEach(this::putConfiguration);
|
||||
return this;
|
||||
}
|
||||
|
||||
ReplicationPeerConfigBuilder putPeerData(byte[] key, byte[] value);
|
||||
|
||||
default ReplicationPeerConfigBuilder putAllPeerData(Map<byte[], byte[]> peerData) {
|
||||
peerData.forEach(this::putPeerData);
|
||||
return this;
|
||||
}
|
||||
|
||||
ReplicationPeerConfigBuilder
|
||||
setTableCFsMap(Map<TableName, List<String>> tableCFsMap);
|
||||
|
|
|
@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.replication;
|
|||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
@ -38,7 +37,6 @@ import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
|
|||
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||
|
@ -363,17 +361,11 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
|
|||
+ existingConfig.getReplicationEndpointImpl()
|
||||
+ "' does not match new class '" + newConfig.getReplicationEndpointImpl() + "'");
|
||||
}
|
||||
//Update existingConfig's peer config and peer data with the new values, but don't touch config
|
||||
// Update existingConfig's peer config and peer data with the new values, but don't touch config
|
||||
// or data that weren't explicitly changed
|
||||
ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(newConfig);
|
||||
Map<byte[], byte[]> peerData = new TreeMap<>(Bytes.BYTES_COMPARATOR);
|
||||
existingConfig.getPeerData().forEach(peerData::put);
|
||||
newConfig.getPeerData().forEach(peerData::put);
|
||||
builder.setPeerData(peerData);
|
||||
Map<String, String> configuration = new HashMap<>();
|
||||
existingConfig.getConfiguration().forEach(configuration::put);
|
||||
newConfig.getConfiguration().forEach(configuration::put);
|
||||
builder.setConfiguration(configuration);
|
||||
builder.putAllConfiguration(existingConfig.getConfiguration());
|
||||
builder.putAllPeerData(existingConfig.getPeerData());
|
||||
|
||||
try {
|
||||
ZKUtil.setData(this.zookeeper, getPeerNode(id),
|
||||
|
|
|
@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.client.Admin;
|
|||
import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationFactory;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueues;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
|
||||
|
@ -121,39 +122,39 @@ public class TestReplicationAdmin {
|
|||
*/
|
||||
@Test
|
||||
public void testAddRemovePeer() throws Exception {
|
||||
ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
|
||||
ReplicationPeerConfigBuilder rpc1 = ReplicationPeerConfig.newBuilder();
|
||||
rpc1.setClusterKey(KEY_ONE);
|
||||
ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
|
||||
ReplicationPeerConfigBuilder rpc2 = ReplicationPeerConfig.newBuilder();
|
||||
rpc2.setClusterKey(KEY_SECOND);
|
||||
// Add a valid peer
|
||||
admin.addPeer(ID_ONE, rpc1, null);
|
||||
hbaseAdmin.addReplicationPeer(ID_ONE, rpc1.build());
|
||||
// try adding the same (fails)
|
||||
try {
|
||||
admin.addPeer(ID_ONE, rpc1, null);
|
||||
hbaseAdmin.addReplicationPeer(ID_ONE, rpc1.build());
|
||||
} catch (Exception e) {
|
||||
// OK!
|
||||
}
|
||||
assertEquals(1, admin.getPeersCount());
|
||||
assertEquals(1, hbaseAdmin.listReplicationPeers().size());
|
||||
// Try to remove an inexisting peer
|
||||
try {
|
||||
admin.removePeer(ID_SECOND);
|
||||
hbaseAdmin.removeReplicationPeer(ID_SECOND);
|
||||
fail();
|
||||
} catch (Exception iae) {
|
||||
} catch (Exception e) {
|
||||
// OK!
|
||||
}
|
||||
assertEquals(1, admin.getPeersCount());
|
||||
assertEquals(1, hbaseAdmin.listReplicationPeers().size());
|
||||
// Add a second since multi-slave is supported
|
||||
try {
|
||||
admin.addPeer(ID_SECOND, rpc2, null);
|
||||
} catch (Exception iae) {
|
||||
hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2.build());
|
||||
} catch (Exception e) {
|
||||
fail();
|
||||
}
|
||||
assertEquals(2, admin.getPeersCount());
|
||||
assertEquals(2, hbaseAdmin.listReplicationPeers().size());
|
||||
// Remove the first peer we added
|
||||
admin.removePeer(ID_ONE);
|
||||
assertEquals(1, admin.getPeersCount());
|
||||
admin.removePeer(ID_SECOND);
|
||||
assertEquals(0, admin.getPeersCount());
|
||||
hbaseAdmin.removeReplicationPeer(ID_ONE);
|
||||
assertEquals(1, hbaseAdmin.listReplicationPeers().size());
|
||||
hbaseAdmin.removeReplicationPeer(ID_SECOND);
|
||||
assertEquals(0, hbaseAdmin.listReplicationPeers().size());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
Loading…
Reference in New Issue