HBASE-19636 All rs should already start work with the new peer change when replication peer procedure is finished

Signed-off-by: zhangduo <zhangduo@apache.org>
Guanghao Zhang authored on 2018-01-04 16:58:01 +08:00, committed by zhangduo
parent 53b18fe0ac
commit d36aacdf9e
20 changed files with 660 additions and 547 deletions

View File

@ -25,7 +25,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;

View File

@ -1,5 +1,4 @@
/*
*
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -28,6 +27,7 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public class ReplicationPeerImpl implements ReplicationPeer {
private final Configuration conf;
private final String id;

View File

@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.ServerName;
/**
* This class is responsible for the parsing logic for a znode representing a queue.
* This class is responsible for the parsing logic for a queue id representing a queue.
* It will extract the peerId if it's recovered as well as the dead region servers
* that were part of the queue's history.
*/
@ -38,21 +38,20 @@ public class ReplicationQueueInfo {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationQueueInfo.class);
private final String peerId;
private final String peerClusterZnode;
private final String queueId;
private boolean queueRecovered;
// List of all the dead region servers that had this queue (if recovered)
private List<ServerName> deadRegionServers = new ArrayList<>();
/**
* The passed znode will be either the id of the peer cluster or
* the handling story of that queue in the form of id-servername-*
* The passed queueId will be either the id of the peer or the handling story of that queue
* in the form of id-servername-*
*/
public ReplicationQueueInfo(String znode) {
this.peerClusterZnode = znode;
String[] parts = znode.split("-", 2);
public ReplicationQueueInfo(String queueId) {
this.queueId = queueId;
String[] parts = queueId.split("-", 2);
this.queueRecovered = parts.length != 1;
this.peerId = this.queueRecovered ?
parts[0] : peerClusterZnode;
this.peerId = this.queueRecovered ? parts[0] : queueId;
if (parts.length >= 2) {
// extract dead servers
extractDeadServersFromZNodeString(parts[1], this.deadRegionServers);
@ -60,7 +59,7 @@ public class ReplicationQueueInfo {
}
/**
* Parse dead server names from znode string servername can contain "-" such as
* Parse dead server names from queue id. servername can contain "-" such as
* "ip-10-46-221-101.ec2.internal", so we need skip some "-" during parsing for the following
* cases: 2-ip-10-46-221-101.ec2.internal,52170,1364333181125-<server name>-...
*/
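
For context on what these ids look like: a queue id is either a bare peer id or, for a recovered queue, the peer id followed by the names of the dead region servers that previously owned the queue. A minimal sketch of both forms against the class above (the sample ids are invented for illustration):

ReplicationQueueInfo normal = new ReplicationQueueInfo("2");
// normal.getPeerId() returns "2"; normal.isQueueRecovered() returns false.
ReplicationQueueInfo recovered =
    new ReplicationQueueInfo("2-ip-10-46-221-101.ec2.internal,52170,1364333181125");
// recovered.getPeerId() is still "2"; recovered.isQueueRecovered() returns true,
// and the dead server list holds ip-10-46-221-101.ec2.internal,52170,1364333181125.
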
@ -119,8 +118,8 @@ public class ReplicationQueueInfo {
return this.peerId;
}
public String getPeerClusterZnode() {
return this.peerClusterZnode;
public String getQueueId() {
return this.queueId;
}
public boolean isQueueRecovered() {

View File

@ -18,12 +18,16 @@
package org.apache.hadoop.hbase.replication;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
/**
@ -76,4 +80,56 @@ public final class ReplicationUtils {
queueStorage.removeReplicatorIfQueueIsEmpty(replicator);
}
}
private static boolean isCollectionEqual(Collection<String> c1, Collection<String> c2) {
if (c1 == null) {
return c2 == null;
}
if (c2 == null) {
return false;
}
return c1.size() == c2.size() && c1.containsAll(c2);
}
private static boolean isNamespacesEqual(Set<String> ns1, Set<String> ns2) {
return isCollectionEqual(ns1, ns2);
}
private static boolean isTableCFsEqual(Map<TableName, List<String>> tableCFs1,
Map<TableName, List<String>> tableCFs2) {
if (tableCFs1 == null) {
return tableCFs2 == null;
}
if (tableCFs2 == null) {
return false;
}
if (tableCFs1.size() != tableCFs2.size()) {
return false;
}
for (Map.Entry<TableName, List<String>> entry1 : tableCFs1.entrySet()) {
TableName table = entry1.getKey();
if (!tableCFs2.containsKey(table)) {
return false;
}
List<String> cfs1 = entry1.getValue();
List<String> cfs2 = tableCFs2.get(table);
if (!isCollectionEqual(cfs1, cfs2)) {
return false;
}
}
return true;
}
public static boolean isKeyConfigEqual(ReplicationPeerConfig rpc1, ReplicationPeerConfig rpc2) {
if (rpc1.replicateAllUserTables() != rpc2.replicateAllUserTables()) {
return false;
}
if (rpc1.replicateAllUserTables()) {
return isNamespacesEqual(rpc1.getExcludeNamespaces(), rpc2.getExcludeNamespaces()) &&
isTableCFsEqual(rpc1.getExcludeTableCFsMap(), rpc2.getExcludeTableCFsMap());
} else {
return isNamespacesEqual(rpc1.getNamespaces(), rpc2.getNamespaces()) &&
isTableCFsEqual(rpc1.getTableCFsMap(), rpc2.getTableCFsMap());
}
}
}
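
A hedged sketch of how isKeyConfigEqual is meant to be used (rpc stands in for an existing peer config; the builder methods are the ones this commit itself uses, except setBandwidth, which is assumed here just to illustrate a non-key field):

ReplicationPeerConfig a = ReplicationPeerConfig.newBuilder(rpc)
    .setReplicateAllUserTables(false)
    .setNamespaces(Collections.singleton("ns1"))
    .build();
ReplicationPeerConfig b = ReplicationPeerConfig.newBuilder(a)
    .setBandwidth(1048576) // assumed non-key field, not compared above
    .build();
// true: only replicate-all, namespaces and table CFs (or their exclude
// variants) are compared, so a bandwidth-only change is not a key change.
boolean same = ReplicationUtils.isKeyConfigEqual(a, b);
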

View File

@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.replication;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseZKTestingUtility;
@ -38,8 +37,6 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Category({ ReplicationTests.class, MediumTests.class })
public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
@ -48,8 +45,6 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestReplicationStateZKImpl.class);
private static final Logger LOG = LoggerFactory.getLogger(TestReplicationStateZKImpl.class);
private static Configuration conf;
private static HBaseZKTestingUtility utility;
private static ZKWatcher zkw;
@ -97,20 +92,4 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
public static void tearDownAfterClass() throws Exception {
utility.shutdownMiniZKCluster();
}
private static class WarnOnlyAbortable implements Abortable {
@Override
public void abort(String why, Throwable e) {
LOG.warn("TestReplicationStateZKImpl received abort, ignoring. Reason: " + why);
if (LOG.isDebugEnabled()) {
LOG.debug(e.toString(), e);
}
}
@Override
public boolean isAborted() {
return false;
}
}
}

View File

@ -1,5 +1,4 @@
/*
*
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information

View File

@ -23,6 +23,9 @@ import java.io.IOException;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.yetus.audience.InterfaceAudience;
/**
* A handler for modifying replication peer in peer procedures.
*/
@InterfaceAudience.Private
public interface PeerProcedureHandler {
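// The rest of the interface body is not shown in this hunk. Its callbacks,
// inferred from PeerProcedureHandlerImpl further below (the signatures here
// are an assumption, not part of this diff):
//   void addPeer(String peerId) throws IOException;
//   void removePeer(String peerId) throws IOException;
//   void enablePeer(String peerId) throws ReplicationException, IOException;
//   void disablePeer(String peerId) throws ReplicationException, IOException;
//   void updatePeerConfig(String peerId) throws ReplicationException, IOException;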

View File

@ -15,21 +15,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import java.util.concurrent.locks.Lock;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.util.KeyLocker;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@InterfaceAudience.Private
public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
private static final Logger LOG = LoggerFactory.getLogger(PeerProcedureHandlerImpl.class);
private final ReplicationSourceManager replicationSourceManager;
private final KeyLocker<String> peersLock = new KeyLocker<>();
@ -39,7 +38,7 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
}
@Override
public void addPeer(String peerId) throws ReplicationException, IOException {
public void addPeer(String peerId) throws IOException {
Lock peerLock = peersLock.acquireLock(peerId);
try {
replicationSourceManager.addPeer(peerId);
@ -49,7 +48,7 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
}
@Override
public void removePeer(String peerId) throws ReplicationException, IOException {
public void removePeer(String peerId) throws IOException {
Lock peerLock = peersLock.acquireLock(peerId);
try {
if (replicationSourceManager.getReplicationPeers().getPeer(peerId) != null) {
@ -60,35 +59,50 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
}
}
@Override
public void disablePeer(String peerId) throws ReplicationException, IOException {
private void refreshPeerState(String peerId) throws ReplicationException, IOException {
PeerState newState;
Lock peerLock = peersLock.acquireLock(peerId);
try {
ReplicationPeerImpl peer = replicationSourceManager.getReplicationPeers().getPeer(peerId);
if (peer == null) {
throw new ReplicationException("Peer with id=" + peerId + " is not cached.");
}
PeerState oldState = peer.getPeerState();
newState = replicationSourceManager.getReplicationPeers().refreshPeerState(peerId);
// RS need to start work with the new replication state change
if (oldState.equals(PeerState.ENABLED) && newState.equals(PeerState.DISABLED)) {
replicationSourceManager.refreshSources(peerId);
}
} finally {
peerLock.unlock();
}
LOG.info("disable replication peer, id: {}, new state: {}", peerId, newState);
}
@Override
public void enablePeer(String peerId) throws ReplicationException, IOException {
PeerState newState;
Lock peerLock = peersLock.acquireLock(peerId);
try {
newState = replicationSourceManager.getReplicationPeers().refreshPeerState(peerId);
} finally {
peerLock.unlock();
}
LOG.info("enable replication peer, id: {}, new state: {}", peerId, newState);
refreshPeerState(peerId);
}
@Override
public void disablePeer(String peerId) throws ReplicationException, IOException {
refreshPeerState(peerId);
}
@Override
public void updatePeerConfig(String peerId) throws ReplicationException, IOException {
Lock peerLock = peersLock.acquireLock(peerId);
try {
replicationSourceManager.getReplicationPeers().refreshPeerConfig(peerId);
ReplicationPeerImpl peer = replicationSourceManager.getReplicationPeers().getPeer(peerId);
if (peer == null) {
throw new ReplicationException("Peer with id=" + peerId + " is not cached.");
}
ReplicationPeerConfig oldConfig = peer.getPeerConfig();
ReplicationPeerConfig newConfig =
replicationSourceManager.getReplicationPeers().refreshPeerConfig(peerId);
// RS need to start work with the new replication config change
if (!ReplicationUtils.isKeyConfigEqual(oldConfig, newConfig)) {
replicationSourceManager.refreshSources(peerId);
}
} finally {
peerLock.unlock();
}
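
This refresh logic is the heart of the commit title: a region server applies a peer change while handling the refresh call itself, so by the time the master-side peer procedure finishes, every RS is already working against the new peer state and config. Condensing the two triggers above into one sketch (names exactly as in the code):

// State refresh: only an ENABLED -> DISABLED flip restarts the sources.
if (oldState.equals(PeerState.ENABLED) && newState.equals(PeerState.DISABLED)) {
  replicationSourceManager.refreshSources(peerId);
}
// Config refresh: only a change to the key config (replicate-all,
// namespaces, table CFs) restarts the sources.
if (!ReplicationUtils.isKeyConfigEqual(oldConfig, newConfig)) {
  replicationSourceManager.refreshSources(peerId);
}
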

View File

@ -81,7 +81,7 @@ public class RecoveredReplicationSource extends ReplicationSource {
ReplicationSourceWALReader walReader = new RecoveredReplicationSourceWALReader(fs,
conf, queue, startPosition, walEntryFilter, this);
Threads.setDaemonThreadRunning(walReader, threadName
+ ".replicationSource.replicationWALReaderThread." + walGroupId + "," + peerClusterZnode,
+ ".replicationSource.replicationWALReaderThread." + walGroupId + "," + queueId,
getUncaughtExceptionHandler());
return walReader;
}
@ -178,8 +178,8 @@ public class RecoveredReplicationSource extends ReplicationSource {
}
}
if (allTasksDone) {
manager.closeRecoveredQueue(this);
LOG.info("Finished recovering queue " + peerClusterZnode + " with the following stats: "
manager.removeRecoveredSource(this);
LOG.info("Finished recovering queue " + queueId + " with the following stats: "
+ getStats());
}
}

View File

@ -76,7 +76,7 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
shipEdits(entryBatch);
if (entryBatch.getWalEntries().isEmpty()) {
LOG.debug("Finished recovering queue for group " + walGroupId + " of peer "
+ source.getPeerClusterZnode());
+ source.getQueueId());
source.getSourceMetrics().incrCompletedRecoveryQueue();
setWorkerState(WorkerState.FINISHED);
continue;
@ -113,7 +113,7 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
// normally has a position (unless the RS failed between 2 logs)
private long getRecoveredQueueStartPos() {
long startPosition = 0;
String peerClusterZnode = source.getPeerClusterZnode();
String peerClusterZnode = source.getQueueId();
try {
startPosition = this.replicationQueues.getWALPosition(source.getServerWALsBelongTo(),
peerClusterZnode, this.queue.peek().getName());
@ -129,8 +129,8 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
@Override
protected void updateLogPosition(long lastReadPosition) {
source.getSourceManager().logPositionAndCleanOldLogs(currentPath, source.getPeerClusterZnode(),
lastReadPosition, true, false);
source.getSourceManager().logPositionAndCleanOldLogs(currentPath, source.getQueueId(),
lastReadPosition, true);
lastLoggedPosition = lastReadPosition;
}

View File

@ -34,7 +34,6 @@ import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
@ -187,11 +186,7 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
*/
@Override
public void startReplicationService() throws IOException {
try {
this.replicationManager.init();
} catch (ReplicationException e) {
throw new IOException(e);
}
this.replicationManager.init();
this.replicationSink = new ReplicationSink(this.conf, this.server);
this.scheduleThreadPool.scheduleAtFixedRate(
new ReplicationStatisticsTask(this.replicationSink, this.replicationManager),
@ -210,9 +205,9 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
throws IOException {
try {
this.replicationManager.addHFileRefs(tableName, family, pairs);
} catch (ReplicationException e) {
} catch (IOException e) {
LOG.error("Failed to add hfile references in the replication queue.", e);
throw new IOException(e);
throw e;
}
}

View File

@ -105,7 +105,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
// total number of edits we replicated
private AtomicLong totalReplicatedEdits = new AtomicLong(0);
// The znode we currently play with
protected String peerClusterZnode;
protected String queueId;
// Maximum number of retries before taking bold actions
private int maxRetriesMultiplier;
// Indicates if this particular source is running
@ -141,14 +141,14 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
* @param fs file system to use
* @param manager replication manager to ping to
* @param server the server for this region server
* @param peerClusterZnode the name of our znode
* @param queueId the id of our replication queue
* @param clusterId unique UUID for the cluster
* @param metrics metrics for replication source
*/
@Override
public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
MetricsSource metrics) throws IOException {
this.server = server;
this.conf = HBaseConfiguration.create(conf);
@ -167,8 +167,8 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
this.metrics = metrics;
this.clusterId = clusterId;
this.peerClusterZnode = peerClusterZnode;
this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
this.queueId = queueId;
this.replicationQueueInfo = new ReplicationQueueInfo(queueId);
// ReplicationQueueInfo parses the peerId out of the znode for us
this.peerId = this.replicationQueueInfo.getPeerId();
this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
@ -178,7 +178,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
this.totalBufferUsed = manager.getTotalBufferUsed();
this.walFileLengthProvider = walFileLengthProvider;
LOG.info("peerClusterZnode=" + peerClusterZnode + ", ReplicationSource : " + peerId
LOG.info("queueId=" + queueId + ", ReplicationSource : " + peerId
+ ", currentBandwidth=" + this.currentBandwidth);
}
@ -216,12 +216,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
@Override
public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
throws ReplicationException {
String peerId = peerClusterZnode;
if (peerId.contains("-")) {
// peerClusterZnode will be in the form peerId + "-" + rsZNode.
// A peerId will not have "-" in its name, see HBASE-11394
peerId = peerClusterZnode.split("-")[0];
}
Map<TableName, List<String>> tableCFMap = replicationPeer.getTableCFs();
if (tableCFMap != null) {
List<String> tableCfs = tableCFMap.get(tableName);
@ -310,7 +304,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
+ peerClusterId + " which is not allowed by ReplicationEndpoint:"
+ replicationEndpoint.getClass().getName(), null, false);
this.manager.closeQueue(this);
this.manager.removeSource(this);
return;
}
LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
@ -355,7 +349,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
ReplicationSourceWALReader walReader =
new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this);
return (ReplicationSourceWALReader) Threads.setDaemonThreadRunning(walReader,
threadName + ".replicationSource.wal-reader." + walGroupId + "," + peerClusterZnode,
threadName + ".replicationSource.wal-reader." + walGroupId + "," + queueId,
getUncaughtExceptionHandler());
}
@ -449,7 +443,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
LOG.error("Unexpected exception in ReplicationSource", e);
}
};
Threads.setDaemonThreadRunning(this, n + ".replicationSource," + this.peerClusterZnode,
Threads.setDaemonThreadRunning(this, n + ".replicationSource," + this.queueId,
handler);
}
@ -465,9 +459,9 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
public void terminate(String reason, Exception cause, boolean join) {
if (cause == null) {
LOG.info("Closing source " + this.peerClusterZnode + " because: " + reason);
LOG.info("Closing source " + this.queueId + " because: " + reason);
} else {
LOG.error("Closing source " + this.peerClusterZnode + " because an error occurred: " + reason,
LOG.error("Closing source " + this.queueId + " because an error occurred: " + reason,
cause);
}
this.sourceRunning = false;
@ -491,7 +485,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
.awaitTerminated(sleepForRetries * maxRetriesMultiplier, TimeUnit.MILLISECONDS);
} catch (TimeoutException te) {
LOG.warn("Got exception while waiting for endpoint to shutdown for replication source :"
+ this.peerClusterZnode,
+ this.queueId,
te);
}
}
@ -499,8 +493,8 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
}
@Override
public String getPeerClusterZnode() {
return this.peerClusterZnode;
public String getQueueId() {
return this.queueId;
}
@Override

View File

@ -32,8 +32,8 @@ public class ReplicationSourceFactory {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceFactory.class);
static ReplicationSourceInterface create(Configuration conf, String peerId) {
ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
static ReplicationSourceInterface create(Configuration conf, String queueId) {
ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queueId);
boolean isQueueRecovered = replicationQueueInfo.isQueueRecovered();
ReplicationSourceInterface src;
try {

View File

@ -51,7 +51,7 @@ public interface ReplicationSourceInterface {
*/
void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
MetricsSource metrics) throws IOException;
/**
@ -96,11 +96,11 @@ public interface ReplicationSourceInterface {
Path getCurrentPath();
/**
* Get the id that the source is replicating to
* Get the queue id that the source is replicating to
*
* @return peer cluster id
* @return queue id
*/
String getPeerClusterZnode();
String getQueueId();
/**
* Get the id that the source is replicating to.

View File

@ -223,15 +223,15 @@ public class ReplicationSourceShipper extends Thread {
}
protected void updateLogPosition(long lastReadPosition) {
source.getSourceManager().logPositionAndCleanOldLogs(currentPath, source.getPeerClusterZnode(),
lastReadPosition, false, false);
source.getSourceManager().logPositionAndCleanOldLogs(currentPath, source.getQueueId(),
lastReadPosition, false);
lastLoggedPosition = lastReadPosition;
}
public void startup(UncaughtExceptionHandler handler) {
String name = Thread.currentThread().getName();
Threads.setDaemonThreadRunning(this, name + ".replicationSource." + walGroupId + ","
+ source.getPeerClusterZnode(), handler);
+ source.getQueueId(), handler);
}
public PriorityBlockingQueue<Path> getLogQueue() {

View File

@ -111,7 +111,7 @@ public class ReplicationSourceWALReader extends Thread {
this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
this.eofAutoRecovery = conf.getBoolean("replication.source.eof.autorecovery", false);
this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount);
LOG.info("peerClusterZnode=" + source.getPeerClusterZnode()
LOG.info("peerClusterZnode=" + source.getQueueId()
+ ", ReplicationSourceWALReaderThread : " + source.getPeerId()
+ " inited, replicationBatchSizeCapacity=" + replicationBatchSizeCapacity
+ ", replicationBatchCountCapacity=" + replicationBatchCountCapacity

View File

@ -89,7 +89,7 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
}
@Override
public String getPeerClusterZnode() {
public String getQueueId() {
return peerClusterId;
}

View File

@ -28,12 +28,11 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
@ -41,6 +40,8 @@ import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
@ -71,9 +72,6 @@ public class TestNamespaceReplication extends TestReplicationBase {
private static final byte[] val = Bytes.toBytes("myval");
private static HTableDescriptor tabA;
private static HTableDescriptor tabB;
private static Connection connection1;
private static Connection connection2;
private static Admin admin1;
@ -93,23 +91,21 @@ public class TestNamespaceReplication extends TestReplicationBase {
admin2.createNamespace(NamespaceDescriptor.create(ns1).build());
admin2.createNamespace(NamespaceDescriptor.create(ns2).build());
tabA = new HTableDescriptor(tabAName);
HColumnDescriptor fam = new HColumnDescriptor(f1Name);
fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
tabA.addFamily(fam);
fam = new HColumnDescriptor(f2Name);
fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
tabA.addFamily(fam);
TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tabAName);
builder.addColumnFamily(ColumnFamilyDescriptorBuilder
.newBuilder(f1Name).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build());
builder.addColumnFamily(ColumnFamilyDescriptorBuilder
.newBuilder(f2Name).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build());
TableDescriptor tabA = builder.build();
admin1.createTable(tabA);
admin2.createTable(tabA);
tabB = new HTableDescriptor(tabBName);
fam = new HColumnDescriptor(f1Name);
fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
tabB.addFamily(fam);
fam = new HColumnDescriptor(f2Name);
fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
tabB.addFamily(fam);
builder = TableDescriptorBuilder.newBuilder(tabBName);
builder.addColumnFamily(ColumnFamilyDescriptorBuilder
.newBuilder(f1Name).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build());
builder.addColumnFamily(ColumnFamilyDescriptorBuilder
.newBuilder(f2Name).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build());
TableDescriptor tabB = builder.build();
admin1.createTable(tabB);
admin2.createTable(tabB);
}
@ -137,22 +133,24 @@ public class TestNamespaceReplication extends TestReplicationBase {
@Test
public void testNamespaceReplication() throws Exception {
String peerId = "2";
Table htab1A = connection1.getTable(tabAName);
Table htab2A = connection2.getTable(tabAName);
Table htab1B = connection1.getTable(tabBName);
Table htab2B = connection2.getTable(tabBName);
ReplicationPeerConfig rpc = admin.getPeerConfig("2");
rpc.setReplicateAllUserTables(false);
admin.updatePeerConfig("2", rpc);
ReplicationPeerConfig rpc = admin1.getReplicationPeerConfig(peerId);
admin1.updateReplicationPeerConfig(peerId,
ReplicationPeerConfig.newBuilder(rpc).setReplicateAllUserTables(false).build());
// add ns1 to peer config which replicate to cluster2
rpc = admin.getPeerConfig("2");
rpc = admin1.getReplicationPeerConfig(peerId);
Set<String> namespaces = new HashSet<>();
namespaces.add(ns1);
rpc.setNamespaces(namespaces);
admin.updatePeerConfig("2", rpc);
admin1.updateReplicationPeerConfig(peerId,
ReplicationPeerConfig.newBuilder(rpc).setNamespaces(namespaces).build());
LOG.info("update peer config");
// Table A can be replicated to cluster2
@ -166,15 +164,14 @@ public class TestNamespaceReplication extends TestReplicationBase {
ensureRowNotExisted(htab2B, row, f1Name, f2Name);
// add ns1:TA => 'f1' and ns2 to peer config which replicate to cluster2
rpc = admin.getPeerConfig("2");
rpc = admin1.getReplicationPeerConfig(peerId);
namespaces = new HashSet<>();
namespaces.add(ns2);
rpc.setNamespaces(namespaces);
Map<TableName, List<String>> tableCfs = new HashMap<>();
tableCfs.put(tabAName, new ArrayList<>());
tableCfs.get(tabAName).add("f1");
rpc.setTableCFsMap(tableCfs);
admin.updatePeerConfig("2", rpc);
admin1.updateReplicationPeerConfig(peerId, ReplicationPeerConfig.newBuilder(rpc)
.setNamespaces(namespaces).setTableCFsMap(tableCfs).build());
LOG.info("update peer config");
// Only family f1 of Table A can replicated to cluster2
@ -189,7 +186,7 @@ public class TestNamespaceReplication extends TestReplicationBase {
delete(htab1B, row, f1Name, f2Name);
ensureRowNotExisted(htab2B, row, f1Name, f2Name);
admin.removePeer("2");
admin1.removeReplicationPeer(peerId);
}
private void put(Table source, byte[] row, byte[]... families)

View File

@ -1,5 +1,4 @@
/*
*
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -322,7 +321,7 @@ public abstract class TestReplicationSourceManager {
wal.rollWriter();
manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(),
"1", 0, false, false);
"1", 0, false);
wal.append(hri,
new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes),