HBASE-19623 Create replication endpoint asynchronously when adding a replication source

This commit is contained in:
zhangduo 2018-01-02 13:25:58 +08:00
parent 525fef572e
commit 5d922a5e0c
15 changed files with 116 additions and 170 deletions

View File

@ -53,6 +53,14 @@ public interface ReplicationPeer {
*/ */
PeerState getPeerState(); PeerState getPeerState();
/**
* Test whether the peer is enabled.
* @return {@code true} if enabled, otherwise {@code false}.
*/
default boolean isPeerEnabled() {
return getPeerState() == PeerState.ENABLED;
}
/** /**
* Get the peer config object * Get the peer config object
* @return the ReplicationPeerConfig for this peer * @return the ReplicationPeerConfig for this peer

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.replication; package org.apache.hadoop.hbase.replication;
import java.io.IOException; import java.io.IOException;
import java.util.Collections;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
@ -85,21 +86,6 @@ public class ReplicationPeers {
peerCache.remove(peerId); peerCache.remove(peerId);
} }
/**
* Get the peer state for the specified connected remote slave cluster. The value might be read
* from cache, so it is recommended to use {@link #peerStorage } to read storage directly if
* reading the state after enabling or disabling it.
* @param peerId a short that identifies the cluster
* @return true if replication is enabled, false otherwise.
*/
public boolean isPeerEnabled(String peerId) {
ReplicationPeer replicationPeer = this.peerCache.get(peerId);
if (replicationPeer == null) {
throw new IllegalArgumentException("Peer with id= " + peerId + " is not cached");
}
return replicationPeer.getPeerState() == PeerState.ENABLED;
}
/** /**
* Returns the ReplicationPeerImpl for the specified cached peer. This ReplicationPeer will * Returns the ReplicationPeerImpl for the specified cached peer. This ReplicationPeer will
* continue to track changes to the Peer's state and config. This method returns null if no peer * continue to track changes to the Peer's state and config. This method returns null if no peer
@ -117,7 +103,7 @@ public class ReplicationPeers {
* @return a Set of Strings for peerIds * @return a Set of Strings for peerIds
*/ */
public Set<String> getAllPeerIds() { public Set<String> getAllPeerIds() {
return peerCache.keySet(); return Collections.unmodifiableSet(peerCache.keySet());
} }
public static Configuration getPeerClusterConfiguration(ReplicationPeerConfig peerConfig, public static Configuration getPeerClusterConfiguration(ReplicationPeerConfig peerConfig,

View File

@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.replication;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.exceptions.DeserializationException;
@ -30,8 +29,6 @@ import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths; import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
@ -41,8 +38,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
@InterfaceAudience.Private @InterfaceAudience.Private
class ZKReplicationPeerStorage extends ZKReplicationStorageBase implements ReplicationPeerStorage { class ZKReplicationPeerStorage extends ZKReplicationStorageBase implements ReplicationPeerStorage {
private static final Logger LOG = LoggerFactory.getLogger(ZKReplicationPeerStorage.class);
public static final byte[] ENABLED_ZNODE_BYTES = public static final byte[] ENABLED_ZNODE_BYTES =
toByteArray(ReplicationProtos.ReplicationState.State.ENABLED); toByteArray(ReplicationProtos.ReplicationState.State.ENABLED);
public static final byte[] DISABLED_ZNODE_BYTES = public static final byte[] DISABLED_ZNODE_BYTES =
@ -126,7 +121,7 @@ class ZKReplicationPeerStorage extends ZKReplicationStorageBase implements Repli
@Override @Override
public List<String> listPeerIds() throws ReplicationException { public List<String> listPeerIds() throws ReplicationException {
try { try {
return CollectionUtils.nullToEmpty(ZKUtil.listChildrenAndWatchThem(zookeeper, peersZNode)); return CollectionUtils.nullToEmpty(ZKUtil.listChildrenNoWatch(zookeeper, peersZNode));
} catch (KeeperException e) { } catch (KeeperException e) {
throw new ReplicationException("Cannot get the list of peers", e); throw new ReplicationException("Cannot get the list of peers", e);
} }

View File

@ -225,11 +225,6 @@ public abstract class TestReplicationStateBasic {
fail("Should have thrown an IllegalArgumentException when passed a bogus peerId"); fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
} catch (ReplicationException e) { } catch (ReplicationException e) {
} }
try {
rp.isPeerEnabled("bogus");
fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
} catch (IllegalArgumentException e) {
}
try { try {
assertFalse(rp.addPeer("bogus")); assertFalse(rp.addPeer("bogus"));
@ -245,12 +240,6 @@ public abstract class TestReplicationStateBasic {
rp.getPeerStorage().addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO), true); rp.getPeerStorage().addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO), true);
assertNumberOfPeers(2); assertNumberOfPeers(2);
// Test methods with a peer that is added but not connected
try {
rp.isPeerEnabled(ID_ONE);
fail("There are no connected peers, should have thrown an IllegalArgumentException");
} catch (IllegalArgumentException e) {
}
assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(ReplicationPeers assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(ReplicationPeers
.getPeerClusterConfiguration(rp.getPeerStorage().getPeerConfig(ID_ONE), rp.getConf()))); .getPeerClusterConfiguration(rp.getPeerStorage().getPeerConfig(ID_ONE), rp.getConf())));
rp.getPeerStorage().removePeer(ID_ONE); rp.getPeerStorage().removePeer(ID_ONE);
@ -261,7 +250,7 @@ public abstract class TestReplicationStateBasic {
rp.getPeerStorage().addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), true); rp.getPeerStorage().addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), true);
rp.addPeer(ID_ONE); rp.addPeer(ID_ONE);
assertNumberOfPeers(2); assertNumberOfPeers(2);
assertTrue(rp.isPeerEnabled(ID_ONE)); assertTrue(rp.getPeer(ID_ONE).isPeerEnabled());
rp.getPeerStorage().setPeerState(ID_ONE, false); rp.getPeerStorage().setPeerState(ID_ONE, false);
// now we do not rely on zk watcher to trigger the state change so we need to trigger it // now we do not rely on zk watcher to trigger the state change so we need to trigger it
// manually... // manually...
@ -279,11 +268,6 @@ public abstract class TestReplicationStateBasic {
// Disconnect peer // Disconnect peer
rp.removePeer(ID_ONE); rp.removePeer(ID_ONE);
assertNumberOfPeers(2); assertNumberOfPeers(2);
try {
rp.isPeerEnabled(ID_ONE);
fail("There are no connected peers, should have thrown an IllegalArgumentException");
} catch (IllegalArgumentException e) {
}
} }
protected void assertConnectedPeerStatus(boolean status, String peerId) throws Exception { protected void assertConnectedPeerStatus(boolean status, String peerId) throws Exception {
@ -292,7 +276,7 @@ public abstract class TestReplicationStateBasic {
fail("ConnectedPeerStatus was " + !status + " but expected " + status + " in ZK"); fail("ConnectedPeerStatus was " + !status + " but expected " + status + " in ZK");
} }
while (true) { while (true) {
if (status == rp.isPeerEnabled(peerId)) { if (status == rp.getPeer(peerId).isPeerEnabled()) {
return; return;
} }
if (zkTimeoutCount < ZK_MAX_COUNT) { if (zkTimeoutCount < ZK_MAX_COUNT) {

View File

@ -80,15 +80,11 @@ public class TestZKReplicationPeerStorage {
private ReplicationPeerConfig getConfig(int seed) { private ReplicationPeerConfig getConfig(int seed) {
Random rand = new Random(seed); Random rand = new Random(seed);
ReplicationPeerConfig config = new ReplicationPeerConfig(); return ReplicationPeerConfig.newBuilder().setClusterKey(Long.toHexString(rand.nextLong()))
config.setClusterKey(Long.toHexString(rand.nextLong())); .setReplicationEndpointImpl(Long.toHexString(rand.nextLong()))
config.setReplicationEndpointImpl(Long.toHexString(rand.nextLong())); .setNamespaces(randNamespaces(rand)).setExcludeNamespaces(randNamespaces(rand))
config.setNamespaces(randNamespaces(rand)); .setTableCFsMap(randTableCFs(rand)).setReplicateAllUserTables(rand.nextBoolean())
config.setExcludeNamespaces(randNamespaces(rand)); .setBandwidth(rand.nextInt(1000)).build();
config.setTableCFsMap(randTableCFs(rand));
config.setReplicateAllUserTables(rand.nextBoolean());
config.setBandwidth(rand.nextInt(1000));
return config;
} }
private void assertSetEquals(Set<String> expected, Set<String> actual) { private void assertSetEquals(Set<String> expected, Set<String> actual) {

View File

@ -18,8 +18,6 @@
package org.apache.hadoop.hbase.replication.regionserver; package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import java.io.IOException; import java.io.IOException;
import java.net.ConnectException; import java.net.ConnectException;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
@ -39,7 +37,6 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -48,22 +45,24 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil; import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint; import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
/** /**
* A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint}
@ -416,7 +415,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
} }
protected boolean isPeerEnabled() { protected boolean isPeerEnabled() {
return ctx.getReplicationPeer().getPeerState() == PeerState.ENABLED; return ctx.getReplicationPeer().isPeerEnabled();
} }
@Override @Override

View File

@ -28,8 +28,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Threads;
@ -51,11 +50,11 @@ public class RecoveredReplicationSource extends ReplicationSource {
@Override @Override
public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
ReplicationQueueStorage queueStorage, ReplicationPeers replicationPeers, Server server, ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
String peerClusterZnode, UUID clusterId, ReplicationEndpoint replicationEndpoint, String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException { MetricsSource metrics) throws IOException {
super.init(conf, fs, manager, queueStorage, replicationPeers, server, peerClusterZnode, super.init(conf, fs, manager, queueStorage, replicationPeer, server, peerClusterZnode,
clusterId, replicationEndpoint, walFileLengthProvider, metrics); clusterId, walFileLengthProvider, metrics);
this.actualPeerId = this.replicationQueueInfo.getPeerId(); this.actualPeerId = this.replicationQueueInfo.getPeerId();
} }

View File

@ -38,14 +38,16 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RSRpcServices; import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.replication.ChainWALEntryFilter; import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
import org.apache.hadoop.hbase.replication.ClusterMarkingEntryFilter; import org.apache.hadoop.hbase.replication.ClusterMarkingEntryFilter;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter; import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
@ -82,7 +84,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
// per group queue size, keep no more than this number of logs in each wal group // per group queue size, keep no more than this number of logs in each wal group
protected int queueSizePerGroup; protected int queueSizePerGroup;
protected ReplicationQueueStorage queueStorage; protected ReplicationQueueStorage queueStorage;
private ReplicationPeers replicationPeers; private ReplicationPeer replicationPeer;
protected Configuration conf; protected Configuration conf;
protected ReplicationQueueInfo replicationQueueInfo; protected ReplicationQueueInfo replicationQueueInfo;
@ -112,6 +114,8 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
private MetricsSource metrics; private MetricsSource metrics;
// WARN threshold for the number of queued logs, defaults to 2 // WARN threshold for the number of queued logs, defaults to 2
private int logQueueWarnThreshold; private int logQueueWarnThreshold;
// whether the replication endpoint has been initialized
private volatile boolean endpointInitialized = false;
// ReplicationEndpoint which will handle the actual replication // ReplicationEndpoint which will handle the actual replication
private ReplicationEndpoint replicationEndpoint; private ReplicationEndpoint replicationEndpoint;
// A filter (or a chain of filters) for the WAL entries. // A filter (or a chain of filters) for the WAL entries.
@ -133,22 +137,19 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
/** /**
* Instantiation method used by region servers * Instantiation method used by region servers
*
* @param conf configuration to use * @param conf configuration to use
* @param fs file system to use * @param fs file system to use
* @param manager replication manager to ping to * @param manager replication manager to ping to
* @param server the server for this region server * @param server the server for this region server
* @param peerClusterZnode the name of our znode * @param peerClusterZnode the name of our znode
* @param clusterId unique UUID for the cluster * @param clusterId unique UUID for the cluster
* @param replicationEndpoint the replication endpoint implementation
* @param metrics metrics for replication source * @param metrics metrics for replication source
* @throws IOException
*/ */
@Override @Override
public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
ReplicationQueueStorage queueStorage, ReplicationPeers replicationPeers, Server server, ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
String peerClusterZnode, UUID clusterId, ReplicationEndpoint replicationEndpoint, String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException { MetricsSource metrics) throws IOException {
this.server = server; this.server = server;
this.conf = HBaseConfiguration.create(conf); this.conf = HBaseConfiguration.create(conf);
this.waitOnEndpointSeconds = this.waitOnEndpointSeconds =
@ -160,7 +161,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32); this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
this.queueStorage = queueStorage; this.queueStorage = queueStorage;
this.replicationPeers = replicationPeers; this.replicationPeer = replicationPeer;
this.manager = manager; this.manager = manager;
this.fs = fs; this.fs = fs;
this.metrics = metrics; this.metrics = metrics;
@ -171,7 +172,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
// ReplicationQueueInfo parses the peerId out of the znode for us // ReplicationQueueInfo parses the peerId out of the znode for us
this.peerId = this.replicationQueueInfo.getPeerId(); this.peerId = this.replicationQueueInfo.getPeerId();
this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2); this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
this.replicationEndpoint = replicationEndpoint;
defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0); defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
currentBandwidth = getCurrentBandwidth(); currentBandwidth = getCurrentBandwidth();
@ -196,7 +196,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
if (queue == null) { if (queue == null) {
queue = new PriorityBlockingQueue<>(queueSizePerGroup, new LogsComparator()); queue = new PriorityBlockingQueue<>(queueSizePerGroup, new LogsComparator());
queues.put(logPrefix, queue); queues.put(logPrefix, queue);
if (this.sourceRunning) { if (this.isSourceActive() && this.endpointInitialized) {
// new wal group observed after source startup, start a new worker thread to track it // new wal group observed after source startup, start a new worker thread to track it
// notice: it's possible that log enqueued when this.running is set but worker thread // notice: it's possible that log enqueued when this.running is set but worker thread
// still not launched, so it's necessary to check workerThreads before start the worker // still not launched, so it's necessary to check workerThreads before start the worker
@ -222,7 +222,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
// A peerId will not have "-" in its name, see HBASE-11394 // A peerId will not have "-" in its name, see HBASE-11394
peerId = peerClusterZnode.split("-")[0]; peerId = peerClusterZnode.split("-")[0];
} }
Map<TableName, List<String>> tableCFMap = replicationPeers.getPeer(peerId).getTableCFs(); Map<TableName, List<String>> tableCFMap = replicationPeer.getTableCFs();
if (tableCFMap != null) { if (tableCFMap != null) {
List<String> tableCfs = tableCFMap.get(tableName); List<String> tableCfs = tableCFMap.get(tableName);
if (tableCFMap.containsKey(tableName) if (tableCFMap.containsKey(tableName)
@ -241,21 +241,59 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
} }
} }
private void initAndStartReplicationEndpoint() throws Exception {
RegionServerCoprocessorHost rsServerHost = null;
TableDescriptors tableDescriptors = null;
if (server instanceof HRegionServer) {
rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
tableDescriptors = ((HRegionServer) server).getTableDescriptors();
}
String replicationEndpointImpl = replicationPeer.getPeerConfig().getReplicationEndpointImpl();
if (replicationEndpointImpl == null) {
// Default to HBase inter-cluster replication endpoint
replicationEndpointImpl = HBaseInterClusterReplicationEndpoint.class.getName();
}
replicationEndpoint =
Class.forName(replicationEndpointImpl).asSubclass(ReplicationEndpoint.class).newInstance();
if (rsServerHost != null) {
ReplicationEndpoint newReplicationEndPoint =
rsServerHost.postCreateReplicationEndPoint(replicationEndpoint);
if (newReplicationEndPoint != null) {
// Override the newly created endpoint from the hook with configured end point
replicationEndpoint = newReplicationEndPoint;
}
}
replicationEndpoint
.init(new ReplicationEndpoint.Context(conf, replicationPeer.getConfiguration(), fs, peerId,
clusterId, replicationPeer, metrics, tableDescriptors, server));
replicationEndpoint.start();
replicationEndpoint.awaitRunning(waitOnEndpointSeconds, TimeUnit.SECONDS);
}
@Override @Override
public void run() { public void run() {
// mark we are running now // mark we are running now
this.sourceRunning = true; this.sourceRunning = true;
try {
// start the endpoint, connect to the cluster
this.replicationEndpoint.start();
this.replicationEndpoint.awaitRunning(this.waitOnEndpointSeconds, TimeUnit.SECONDS);
} catch (Exception ex) {
LOG.warn("Error starting ReplicationEndpoint, exiting", ex);
uninitialize();
throw new RuntimeException(ex);
}
int sleepMultiplier = 1; int sleepMultiplier = 1;
while (this.isSourceActive()) {
try {
initAndStartReplicationEndpoint();
break;
} catch (Exception e) {
LOG.warn("Error starting ReplicationEndpoint, retrying", e);
if (replicationEndpoint != null) {
replicationEndpoint.stop();
replicationEndpoint = null;
}
if (sleepForRetries("Error starting ReplicationEndpoint", sleepMultiplier)) {
sleepMultiplier++;
}
}
}
this.endpointInitialized = true;
sleepMultiplier = 1;
// delay this until we are in an asynchronous thread // delay this until we are in an asynchronous thread
while (this.isSourceActive() && this.peerClusterId == null) { while (this.isSourceActive() && this.peerClusterId == null) {
this.peerClusterId = replicationEndpoint.getPeerUUID(); this.peerClusterId = replicationEndpoint.getPeerUUID();
@ -288,8 +326,8 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
private void initializeWALEntryFilter() { private void initializeWALEntryFilter() {
// get the WALEntryFilter from ReplicationEndpoint and add it to default filters // get the WALEntryFilter from ReplicationEndpoint and add it to default filters
ArrayList<WALEntryFilter> filters = Lists.newArrayList( ArrayList<WALEntryFilter> filters =
(WALEntryFilter)new SystemTableWALEntryFilter()); Lists.<WALEntryFilter> newArrayList(new SystemTableWALEntryFilter());
WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter(); WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
if (filterFromEndpoint != null) { if (filterFromEndpoint != null) {
filters.add(filterFromEndpoint); filters.add(filterFromEndpoint);
@ -309,7 +347,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
worker.startup(getUncaughtExceptionHandler()); worker.startup(getUncaughtExceptionHandler());
worker.setWALReader(startNewWALReader(worker.getName(), walGroupId, queue, worker.setWALReader(startNewWALReader(worker.getName(), walGroupId, queue,
worker.getStartPosition())); worker.getStartPosition()));
workerThreads.put(walGroupId, worker);
} }
} }
@ -370,25 +407,11 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
} }
private long getCurrentBandwidth() { private long getCurrentBandwidth() {
ReplicationPeer replicationPeer = this.replicationPeers.getPeer(peerId); long peerBandwidth = replicationPeer.getPeerBandwidth();
long peerBandwidth = replicationPeer != null ? replicationPeer.getPeerBandwidth() : 0;
// user can set peer bandwidth to 0 to use default bandwidth // user can set peer bandwidth to 0 to use default bandwidth
return peerBandwidth != 0 ? peerBandwidth : defaultBandwidth; return peerBandwidth != 0 ? peerBandwidth : defaultBandwidth;
} }
private void uninitialize() {
LOG.debug("Source exiting " + this.peerId);
metrics.clear();
if (this.replicationEndpoint.isRunning() || this.replicationEndpoint.isStarting()) {
this.replicationEndpoint.stop();
try {
this.replicationEndpoint.awaitTerminated(this.waitOnEndpointSeconds, TimeUnit.SECONDS);
} catch (TimeoutException e) {
LOG.warn("Failed termination after " + this.waitOnEndpointSeconds + " seconds.");
}
}
}
/** /**
* Do the sleeping logic * Do the sleeping logic
* @param msg Why we sleep * @param msg Why we sleep
@ -410,12 +433,11 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
/** /**
* check whether the peer is enabled or not * check whether the peer is enabled or not
*
* @return true if the peer is enabled, otherwise false * @return true if the peer is enabled, otherwise false
*/ */
@Override @Override
public boolean isPeerEnabled() { public boolean isPeerEnabled() {
return this.replicationPeers.isPeerEnabled(this.peerId); return replicationPeer.isPeerEnabled();
} }
@Override @Override
@ -427,8 +449,8 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
LOG.error("Unexpected exception in ReplicationSource", e); LOG.error("Unexpected exception in ReplicationSource", e);
} }
}; };
Threads Threads.setDaemonThreadRunning(this, n + ".replicationSource," + this.peerClusterZnode,
.setDaemonThreadRunning(this, n + ".replicationSource," + this.peerClusterZnode, handler); handler);
} }
@Override @Override

View File

@ -30,7 +30,7 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WAL.Entry;
@ -50,9 +50,9 @@ public interface ReplicationSourceInterface {
* @param server the server for this region server * @param server the server for this region server
*/ */
void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
ReplicationQueueStorage queueStorage, ReplicationPeers replicationPeers, Server server, ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
String peerClusterZnode, UUID clusterId, ReplicationEndpoint replicationEndpoint, String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException; MetricsSource metrics) throws IOException;
/** /**
* Add a log to the list of logs to replicate * Add a log to the list of logs to replicate

View File

@ -47,11 +47,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationListener; import org.apache.hadoop.hbase.replication.ReplicationListener;
import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationPeer;
@ -491,49 +487,14 @@ public class ReplicationSourceManager implements ReplicationListener {
* @param peerId the id of the peer cluster * @param peerId the id of the peer cluster
* @return the created source * @return the created source
*/ */
private ReplicationSourceInterface getReplicationSource(String peerId, ReplicationPeer peer) private ReplicationSourceInterface getReplicationSource(String peerId,
throws IOException { ReplicationPeer replicationPeer) throws IOException {
RegionServerCoprocessorHost rsServerHost = null;
TableDescriptors tableDescriptors = null;
if (server instanceof HRegionServer) {
rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
tableDescriptors = ((HRegionServer) server).getTableDescriptors();
}
ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, peerId); ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, peerId);
ReplicationEndpoint replicationEndpoint = null;
try {
String replicationEndpointImpl = peer.getPeerConfig().getReplicationEndpointImpl();
if (replicationEndpointImpl == null) {
// Default to HBase inter-cluster replication endpoint
replicationEndpointImpl = HBaseInterClusterReplicationEndpoint.class.getName();
}
replicationEndpoint = Class.forName(replicationEndpointImpl)
.asSubclass(ReplicationEndpoint.class).newInstance();
if (rsServerHost != null) {
ReplicationEndpoint newReplicationEndPoint =
rsServerHost.postCreateReplicationEndPoint(replicationEndpoint);
if (newReplicationEndPoint != null) {
// Override the newly created endpoint from the hook with configured end point
replicationEndpoint = newReplicationEndPoint;
}
}
} catch (Exception e) {
LOG.warn("Passed replication endpoint implementation throws errors" +
" while initializing ReplicationSource for peer: " + peerId, e);
throw new IOException(e);
}
MetricsSource metrics = new MetricsSource(peerId); MetricsSource metrics = new MetricsSource(peerId);
// init replication source // init replication source
src.init(conf, fs, this, queueStorage, replicationPeers, server, peerId, clusterId, src.init(conf, fs, this, queueStorage, replicationPeer, server, peerId, clusterId,
replicationEndpoint, walFileLengthProvider, metrics); walFileLengthProvider, metrics);
// init replication endpoint
replicationEndpoint.init(new ReplicationEndpoint.Context(conf, peer.getConfiguration(), fs,
peerId, clusterId, peer, metrics, tableDescriptors, server));
return src; return src;
} }

View File

@ -44,7 +44,6 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient;
import org.junit.After; import org.junit.After;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.ClassRule; import org.junit.ClassRule;
@ -75,7 +74,6 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000); TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000);
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0); TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
TEST_UTIL.getConfiguration().setInt(ReadOnlyZKClient.RECOVERY_RETRY, 1);
TEST_UTIL.startMiniCluster(); TEST_UTIL.startMiniCluster();
ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
} }

View File

@ -50,7 +50,6 @@ import org.apache.hadoop.hbase.replication.TestReplicationEndpoint.InterClusterR
import org.apache.hadoop.hbase.replication.TestReplicationEndpoint.ReplicationEndpointForTest; import org.apache.hadoop.hbase.replication.TestReplicationEndpoint.ReplicationEndpointForTest;
import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient;
import org.junit.After; import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
@ -90,7 +89,6 @@ public class TestReplicationAdmin {
@BeforeClass @BeforeClass
public static void setUpBeforeClass() throws Exception { public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
TEST_UTIL.getConfiguration().setInt(ReadOnlyZKClient.RECOVERY_RETRY, 1);
TEST_UTIL.startMiniCluster(); TEST_UTIL.startMiniCluster();
admin = new ReplicationAdmin(TEST_UTIL.getConfiguration()); admin = new ReplicationAdmin(TEST_UTIL.getConfiguration());
hbaseAdmin = TEST_UTIL.getAdmin(); hbaseAdmin = TEST_UTIL.getAdmin();

View File

@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -48,9 +49,9 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
@Override @Override
public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
ReplicationQueueStorage rq, ReplicationPeers rp, Server server, String peerClusterId, ReplicationQueueStorage rq, ReplicationPeer rp, Server server, String peerClusterId,
UUID clusterId, ReplicationEndpoint replicationEndpoint, UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException { throws IOException {
this.manager = manager; this.manager = manager;
this.peerClusterId = peerClusterId; this.peerClusterId = peerClusterId;
this.metrics = metrics; this.metrics = metrics;

View File

@ -162,15 +162,14 @@ public class TestReplicationSource {
} }
}; };
replicationEndpoint.start(); replicationEndpoint.start();
ReplicationPeers mockPeers = Mockito.mock(ReplicationPeers.class);
ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class); ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
Configuration testConf = HBaseConfiguration.create(); Configuration testConf = HBaseConfiguration.create();
testConf.setInt("replication.source.maxretriesmultiplier", 1); testConf.setInt("replication.source.maxretriesmultiplier", 1);
ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
source.init(testConf, null, manager, null, mockPeers, null, "testPeer", null, source.init(testConf, null, manager, null, mockPeer, null, "testPeer", null,
replicationEndpoint, p -> OptionalLong.empty(), null); p -> OptionalLong.empty(), null);
ExecutorService executor = Executors.newSingleThreadExecutor(); ExecutorService executor = Executors.newSingleThreadExecutor();
Future<?> future = executor.submit(new Runnable() { Future<?> future = executor.submit(new Runnable() {

View File

@ -64,8 +64,8 @@ import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
@ -736,9 +736,9 @@ public abstract class TestReplicationSourceManager {
@Override @Override
public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
ReplicationQueueStorage rq, ReplicationPeers rp, Server server, String peerClusterId, ReplicationQueueStorage rq, ReplicationPeer rp, Server server, String peerClusterId,
UUID clusterId, ReplicationEndpoint replicationEndpoint, UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException { throws IOException {
throw new IOException("Failing deliberately"); throw new IOException("Failing deliberately");
} }
} }