diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java index 48460186b31..2da3cce9402 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java @@ -53,6 +53,14 @@ public interface ReplicationPeer { */ PeerState getPeerState(); + /** + * Test whether the peer is enabled. + * @return {@code true} if enabled, otherwise {@code false}. + */ + default boolean isPeerEnabled() { + return getPeerState() == PeerState.ENABLED; + } + /** * Get the peer config object * @return the ReplicationPeerConfig for this peer diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java index 422801b4939..45940a5454f 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.replication; import java.io.IOException; +import java.util.Collections; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -85,21 +86,6 @@ public class ReplicationPeers { peerCache.remove(peerId); } - /** - * Get the peer state for the specified connected remote slave cluster. The value might be read - * from cache, so it is recommended to use {@link #peerStorage } to read storage directly if - * reading the state after enabling or disabling it. - * @param peerId a short that identifies the cluster - * @return true if replication is enabled, false otherwise. - */ - public boolean isPeerEnabled(String peerId) { - ReplicationPeer replicationPeer = this.peerCache.get(peerId); - if (replicationPeer == null) { - throw new IllegalArgumentException("Peer with id= " + peerId + " is not cached"); - } - return replicationPeer.getPeerState() == PeerState.ENABLED; - } - /** * Returns the ReplicationPeerImpl for the specified cached peer. This ReplicationPeer will * continue to track changes to the Peer's state and config. This method returns null if no peer @@ -117,7 +103,7 @@ public class ReplicationPeers { * @return a Set of Strings for peerIds */ public Set getAllPeerIds() { - return peerCache.keySet(); + return Collections.unmodifiableSet(peerCache.keySet()); } public static Configuration getPeerClusterConfiguration(ReplicationPeerConfig peerConfig, diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java index bf448e87f23..42d4b3f5e2c 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.replication; import java.util.Arrays; import java.util.List; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.exceptions.DeserializationException; @@ -30,8 +29,6 @@ import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.hbase.zookeeper.ZNodePaths; import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; @@ -41,8 +38,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; @InterfaceAudience.Private class ZKReplicationPeerStorage extends ZKReplicationStorageBase implements ReplicationPeerStorage { - private static final Logger LOG = LoggerFactory.getLogger(ZKReplicationPeerStorage.class); - public static final byte[] ENABLED_ZNODE_BYTES = toByteArray(ReplicationProtos.ReplicationState.State.ENABLED); public static final byte[] DISABLED_ZNODE_BYTES = @@ -126,7 +121,7 @@ class ZKReplicationPeerStorage extends ZKReplicationStorageBase implements Repli @Override public List listPeerIds() throws ReplicationException { try { - return CollectionUtils.nullToEmpty(ZKUtil.listChildrenAndWatchThem(zookeeper, peersZNode)); + return CollectionUtils.nullToEmpty(ZKUtil.listChildrenNoWatch(zookeeper, peersZNode)); } catch (KeeperException e) { throw new ReplicationException("Cannot get the list of peers", e); } diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java index 07c6c154c94..f3eecccf981 100644 --- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java +++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java @@ -225,11 +225,6 @@ public abstract class TestReplicationStateBasic { fail("Should have thrown an IllegalArgumentException when passed a bogus peerId"); } catch (ReplicationException e) { } - try { - rp.isPeerEnabled("bogus"); - fail("Should have thrown an IllegalArgumentException when passed a bogus peerId"); - } catch (IllegalArgumentException e) { - } try { assertFalse(rp.addPeer("bogus")); @@ -245,12 +240,6 @@ public abstract class TestReplicationStateBasic { rp.getPeerStorage().addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO), true); assertNumberOfPeers(2); - // Test methods with a peer that is added but not connected - try { - rp.isPeerEnabled(ID_ONE); - fail("There are no connected peers, should have thrown an IllegalArgumentException"); - } catch (IllegalArgumentException e) { - } assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(ReplicationPeers .getPeerClusterConfiguration(rp.getPeerStorage().getPeerConfig(ID_ONE), rp.getConf()))); rp.getPeerStorage().removePeer(ID_ONE); @@ -261,7 +250,7 @@ public abstract class TestReplicationStateBasic { rp.getPeerStorage().addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), true); rp.addPeer(ID_ONE); assertNumberOfPeers(2); - assertTrue(rp.isPeerEnabled(ID_ONE)); + assertTrue(rp.getPeer(ID_ONE).isPeerEnabled()); rp.getPeerStorage().setPeerState(ID_ONE, false); // now we do not rely on zk watcher to trigger the state change so we need to trigger it // manually... @@ -279,11 +268,6 @@ public abstract class TestReplicationStateBasic { // Disconnect peer rp.removePeer(ID_ONE); assertNumberOfPeers(2); - try { - rp.isPeerEnabled(ID_ONE); - fail("There are no connected peers, should have thrown an IllegalArgumentException"); - } catch (IllegalArgumentException e) { - } } protected void assertConnectedPeerStatus(boolean status, String peerId) throws Exception { @@ -292,7 +276,7 @@ public abstract class TestReplicationStateBasic { fail("ConnectedPeerStatus was " + !status + " but expected " + status + " in ZK"); } while (true) { - if (status == rp.isPeerEnabled(peerId)) { + if (status == rp.getPeer(peerId).isPeerEnabled()) { return; } if (zkTimeoutCount < ZK_MAX_COUNT) { diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java index e8098c8bb43..3eb11da54cf 100644 --- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java +++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java @@ -80,15 +80,11 @@ public class TestZKReplicationPeerStorage { private ReplicationPeerConfig getConfig(int seed) { Random rand = new Random(seed); - ReplicationPeerConfig config = new ReplicationPeerConfig(); - config.setClusterKey(Long.toHexString(rand.nextLong())); - config.setReplicationEndpointImpl(Long.toHexString(rand.nextLong())); - config.setNamespaces(randNamespaces(rand)); - config.setExcludeNamespaces(randNamespaces(rand)); - config.setTableCFsMap(randTableCFs(rand)); - config.setReplicateAllUserTables(rand.nextBoolean()); - config.setBandwidth(rand.nextInt(1000)); - return config; + return ReplicationPeerConfig.newBuilder().setClusterKey(Long.toHexString(rand.nextLong())) + .setReplicationEndpointImpl(Long.toHexString(rand.nextLong())) + .setNamespaces(randNamespaces(rand)).setExcludeNamespaces(randNamespaces(rand)) + .setTableCFsMap(randTableCFs(rand)).setReplicateAllUserTables(rand.nextBoolean()) + .setBandwidth(rand.nextInt(1000)).build(); } private void assertSetEquals(Set expected, Set actual) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java index 5467de01882..fd3c671220b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java @@ -18,8 +18,6 @@ package org.apache.hadoop.hbase.replication.regionserver; -import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; - import java.io.IOException; import java.net.ConnectException; import java.net.SocketTimeoutException; @@ -39,7 +37,6 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; - import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -48,22 +45,24 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface; import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint; -import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.ipc.RemoteException; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface; /** * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} @@ -416,7 +415,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi } protected boolean isPeerEnabled() { - return ctx.getReplicationPeer().getPeerState() == PeerState.ENABLED; + return ctx.getReplicationPeer().isPeerEnabled(); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java index e0c45d5c56a..7bceb78da04 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java @@ -28,8 +28,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.replication.ReplicationEndpoint; -import org.apache.hadoop.hbase.replication.ReplicationPeers; +import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Threads; @@ -51,11 +50,11 @@ public class RecoveredReplicationSource extends ReplicationSource { @Override public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, - ReplicationQueueStorage queueStorage, ReplicationPeers replicationPeers, Server server, - String peerClusterZnode, UUID clusterId, ReplicationEndpoint replicationEndpoint, - WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException { - super.init(conf, fs, manager, queueStorage, replicationPeers, server, peerClusterZnode, - clusterId, replicationEndpoint, walFileLengthProvider, metrics); + ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server, + String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider, + MetricsSource metrics) throws IOException { + super.init(conf, fs, manager, queueStorage, replicationPeer, server, peerClusterZnode, + clusterId, walFileLengthProvider, metrics); this.actualPeerId = this.replicationQueueInfo.getPeerId(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 8250992d761..ffed88d1fc5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -38,14 +38,16 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableDescriptors; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.RSRpcServices; +import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost; import org.apache.hadoop.hbase.replication.ChainWALEntryFilter; import org.apache.hadoop.hbase.replication.ClusterMarkingEntryFilter; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeer; -import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter; @@ -82,7 +84,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf // per group queue size, keep no more than this number of logs in each wal group protected int queueSizePerGroup; protected ReplicationQueueStorage queueStorage; - private ReplicationPeers replicationPeers; + private ReplicationPeer replicationPeer; protected Configuration conf; protected ReplicationQueueInfo replicationQueueInfo; @@ -110,8 +112,10 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf private volatile boolean sourceRunning = false; // Metrics for this source private MetricsSource metrics; - //WARN threshold for the number of queued logs, defaults to 2 + // WARN threshold for the number of queued logs, defaults to 2 private int logQueueWarnThreshold; + // whether the replication endpoint has been initialized + private volatile boolean endpointInitialized = false; // ReplicationEndpoint which will handle the actual replication private ReplicationEndpoint replicationEndpoint; // A filter (or a chain of filters) for the WAL entries. @@ -133,22 +137,19 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf /** * Instantiation method used by region servers - * * @param conf configuration to use * @param fs file system to use * @param manager replication manager to ping to * @param server the server for this region server * @param peerClusterZnode the name of our znode * @param clusterId unique UUID for the cluster - * @param replicationEndpoint the replication endpoint implementation * @param metrics metrics for replication source - * @throws IOException */ @Override public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, - ReplicationQueueStorage queueStorage, ReplicationPeers replicationPeers, Server server, - String peerClusterZnode, UUID clusterId, ReplicationEndpoint replicationEndpoint, - WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException { + ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server, + String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider, + MetricsSource metrics) throws IOException { this.server = server; this.conf = HBaseConfiguration.create(conf); this.waitOnEndpointSeconds = @@ -160,7 +161,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32); this.queueStorage = queueStorage; - this.replicationPeers = replicationPeers; + this.replicationPeer = replicationPeer; this.manager = manager; this.fs = fs; this.metrics = metrics; @@ -171,7 +172,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf // ReplicationQueueInfo parses the peerId out of the znode for us this.peerId = this.replicationQueueInfo.getPeerId(); this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2); - this.replicationEndpoint = replicationEndpoint; defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0); currentBandwidth = getCurrentBandwidth(); @@ -196,7 +196,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf if (queue == null) { queue = new PriorityBlockingQueue<>(queueSizePerGroup, new LogsComparator()); queues.put(logPrefix, queue); - if (this.sourceRunning) { + if (this.isSourceActive() && this.endpointInitialized) { // new wal group observed after source startup, start a new worker thread to track it // notice: it's possible that log enqueued when this.running is set but worker thread // still not launched, so it's necessary to check workerThreads before start the worker @@ -222,7 +222,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf // A peerId will not have "-" in its name, see HBASE-11394 peerId = peerClusterZnode.split("-")[0]; } - Map> tableCFMap = replicationPeers.getPeer(peerId).getTableCFs(); + Map> tableCFMap = replicationPeer.getTableCFs(); if (tableCFMap != null) { List tableCfs = tableCFMap.get(tableName); if (tableCFMap.containsKey(tableName) @@ -241,21 +241,59 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf } } + private void initAndStartReplicationEndpoint() throws Exception { + RegionServerCoprocessorHost rsServerHost = null; + TableDescriptors tableDescriptors = null; + if (server instanceof HRegionServer) { + rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost(); + tableDescriptors = ((HRegionServer) server).getTableDescriptors(); + } + String replicationEndpointImpl = replicationPeer.getPeerConfig().getReplicationEndpointImpl(); + if (replicationEndpointImpl == null) { + // Default to HBase inter-cluster replication endpoint + replicationEndpointImpl = HBaseInterClusterReplicationEndpoint.class.getName(); + } + replicationEndpoint = + Class.forName(replicationEndpointImpl).asSubclass(ReplicationEndpoint.class).newInstance(); + if (rsServerHost != null) { + ReplicationEndpoint newReplicationEndPoint = + rsServerHost.postCreateReplicationEndPoint(replicationEndpoint); + if (newReplicationEndPoint != null) { + // Override the newly created endpoint from the hook with configured end point + replicationEndpoint = newReplicationEndPoint; + } + } + replicationEndpoint + .init(new ReplicationEndpoint.Context(conf, replicationPeer.getConfiguration(), fs, peerId, + clusterId, replicationPeer, metrics, tableDescriptors, server)); + replicationEndpoint.start(); + replicationEndpoint.awaitRunning(waitOnEndpointSeconds, TimeUnit.SECONDS); + } + @Override public void run() { // mark we are running now this.sourceRunning = true; - try { - // start the endpoint, connect to the cluster - this.replicationEndpoint.start(); - this.replicationEndpoint.awaitRunning(this.waitOnEndpointSeconds, TimeUnit.SECONDS); - } catch (Exception ex) { - LOG.warn("Error starting ReplicationEndpoint, exiting", ex); - uninitialize(); - throw new RuntimeException(ex); - } int sleepMultiplier = 1; + while (this.isSourceActive()) { + try { + initAndStartReplicationEndpoint(); + break; + } catch (Exception e) { + LOG.warn("Error starting ReplicationEndpoint, retrying", e); + if (replicationEndpoint != null) { + replicationEndpoint.stop(); + replicationEndpoint = null; + } + if (sleepForRetries("Error starting ReplicationEndpoint", sleepMultiplier)) { + sleepMultiplier++; + } + } + } + this.endpointInitialized = true; + + sleepMultiplier = 1; // delay this until we are in an asynchronous thread while (this.isSourceActive() && this.peerClusterId == null) { this.peerClusterId = replicationEndpoint.getPeerUUID(); @@ -288,8 +326,8 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf private void initializeWALEntryFilter() { // get the WALEntryFilter from ReplicationEndpoint and add it to default filters - ArrayList filters = Lists.newArrayList( - (WALEntryFilter)new SystemTableWALEntryFilter()); + ArrayList filters = + Lists. newArrayList(new SystemTableWALEntryFilter()); WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter(); if (filterFromEndpoint != null) { filters.add(filterFromEndpoint); @@ -309,7 +347,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf worker.startup(getUncaughtExceptionHandler()); worker.setWALReader(startNewWALReader(worker.getName(), walGroupId, queue, worker.getStartPosition())); - workerThreads.put(walGroupId, worker); } } @@ -370,25 +407,11 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf } private long getCurrentBandwidth() { - ReplicationPeer replicationPeer = this.replicationPeers.getPeer(peerId); - long peerBandwidth = replicationPeer != null ? replicationPeer.getPeerBandwidth() : 0; + long peerBandwidth = replicationPeer.getPeerBandwidth(); // user can set peer bandwidth to 0 to use default bandwidth return peerBandwidth != 0 ? peerBandwidth : defaultBandwidth; } - private void uninitialize() { - LOG.debug("Source exiting " + this.peerId); - metrics.clear(); - if (this.replicationEndpoint.isRunning() || this.replicationEndpoint.isStarting()) { - this.replicationEndpoint.stop(); - try { - this.replicationEndpoint.awaitTerminated(this.waitOnEndpointSeconds, TimeUnit.SECONDS); - } catch (TimeoutException e) { - LOG.warn("Failed termination after " + this.waitOnEndpointSeconds + " seconds."); - } - } - } - /** * Do the sleeping logic * @param msg Why we sleep @@ -410,12 +433,11 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf /** * check whether the peer is enabled or not - * * @return true if the peer is enabled, otherwise false */ @Override public boolean isPeerEnabled() { - return this.replicationPeers.isPeerEnabled(this.peerId); + return replicationPeer.isPeerEnabled(); } @Override @@ -427,8 +449,8 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf LOG.error("Unexpected exception in ReplicationSource", e); } }; - Threads - .setDaemonThreadRunning(this, n + ".replicationSource," + this.peerClusterZnode, handler); + Threads.setDaemonThreadRunning(this, n + ".replicationSource," + this.peerClusterZnode, + handler); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java index 4b9ed7413a8..4f10c731df2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java @@ -30,7 +30,7 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationException; -import org.apache.hadoop.hbase.replication.ReplicationPeers; +import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WAL.Entry; @@ -50,9 +50,9 @@ public interface ReplicationSourceInterface { * @param server the server for this region server */ void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, - ReplicationQueueStorage queueStorage, ReplicationPeers replicationPeers, Server server, - String peerClusterZnode, UUID clusterId, ReplicationEndpoint replicationEndpoint, - WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException; + ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server, + String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider, + MetricsSource metrics) throws IOException; /** * Add a log to the list of logs to replicate diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index d4d837cc667..968b3fb878c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -47,11 +47,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.TableDescriptors; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.regionserver.HRegionServer; -import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost; -import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationListener; import org.apache.hadoop.hbase.replication.ReplicationPeer; @@ -491,49 +487,14 @@ public class ReplicationSourceManager implements ReplicationListener { * @param peerId the id of the peer cluster * @return the created source */ - private ReplicationSourceInterface getReplicationSource(String peerId, ReplicationPeer peer) - throws IOException { - RegionServerCoprocessorHost rsServerHost = null; - TableDescriptors tableDescriptors = null; - if (server instanceof HRegionServer) { - rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost(); - tableDescriptors = ((HRegionServer) server).getTableDescriptors(); - } - + private ReplicationSourceInterface getReplicationSource(String peerId, + ReplicationPeer replicationPeer) throws IOException { ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, peerId); - ReplicationEndpoint replicationEndpoint = null; - try { - String replicationEndpointImpl = peer.getPeerConfig().getReplicationEndpointImpl(); - if (replicationEndpointImpl == null) { - // Default to HBase inter-cluster replication endpoint - replicationEndpointImpl = HBaseInterClusterReplicationEndpoint.class.getName(); - } - replicationEndpoint = Class.forName(replicationEndpointImpl) - .asSubclass(ReplicationEndpoint.class).newInstance(); - if (rsServerHost != null) { - ReplicationEndpoint newReplicationEndPoint = - rsServerHost.postCreateReplicationEndPoint(replicationEndpoint); - if (newReplicationEndPoint != null) { - // Override the newly created endpoint from the hook with configured end point - replicationEndpoint = newReplicationEndPoint; - } - } - } catch (Exception e) { - LOG.warn("Passed replication endpoint implementation throws errors" + - " while initializing ReplicationSource for peer: " + peerId, e); - throw new IOException(e); - } - MetricsSource metrics = new MetricsSource(peerId); // init replication source - src.init(conf, fs, this, queueStorage, replicationPeers, server, peerId, clusterId, - replicationEndpoint, walFileLengthProvider, metrics); - - // init replication endpoint - replicationEndpoint.init(new ReplicationEndpoint.Context(conf, peer.getConfiguration(), fs, - peerId, clusterId, peer, metrics, tableDescriptors, server)); - + src.init(conf, fs, this, queueStorage, replicationPeer, server, peerId, clusterId, + walFileLengthProvider, metrics); return src; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java index cc57dfb3e27..b5a50c0b4dc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java @@ -44,7 +44,6 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.LargeTests; -import org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient; import org.junit.After; import org.junit.BeforeClass; import org.junit.ClassRule; @@ -75,7 +74,6 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase { TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000); TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0); - TEST_UTIL.getConfiguration().setInt(ReadOnlyZKClient.RECOVERY_RETRY, 1); TEST_UTIL.startMiniCluster(); ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java index e38b9bdc27a..772a9d67daf 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java @@ -50,7 +50,6 @@ import org.apache.hadoop.hbase.replication.TestReplicationEndpoint.InterClusterR import org.apache.hadoop.hbase.replication.TestReplicationEndpoint.ReplicationEndpointForTest; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient; import org.junit.After; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -90,7 +89,6 @@ public class TestReplicationAdmin { @BeforeClass public static void setUpBeforeClass() throws Exception { TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); - TEST_UTIL.getConfiguration().setInt(ReadOnlyZKClient.RECOVERY_RETRY, 1); TEST_UTIL.startMiniCluster(); admin = new ReplicationAdmin(TEST_UTIL.getConfiguration()); hbaseAdmin = TEST_UTIL.getAdmin(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java index 14c5e56add0..38ec598c879 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.List; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -48,9 +49,9 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface { @Override public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, - ReplicationQueueStorage rq, ReplicationPeers rp, Server server, String peerClusterId, - UUID clusterId, ReplicationEndpoint replicationEndpoint, - WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException { + ReplicationQueueStorage rq, ReplicationPeer rp, Server server, String peerClusterId, + UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) + throws IOException { this.manager = manager; this.peerClusterId = peerClusterId; this.metrics = metrics; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java index ed181ddb914..b98ca7f00f7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java @@ -162,15 +162,14 @@ public class TestReplicationSource { } }; replicationEndpoint.start(); - ReplicationPeers mockPeers = Mockito.mock(ReplicationPeers.class); ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class); Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); Configuration testConf = HBaseConfiguration.create(); testConf.setInt("replication.source.maxretriesmultiplier", 1); ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); - source.init(testConf, null, manager, null, mockPeers, null, "testPeer", null, - replicationEndpoint, p -> OptionalLong.empty(), null); + source.init(testConf, null, manager, null, mockPeer, null, "testPeer", null, + p -> OptionalLong.empty(), null); ExecutorService executor = Executors.newSingleThreadExecutor(); Future future = executor.submit(new Runnable() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index 77b2fb29ff4..ffa889a35ba 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -64,8 +64,8 @@ import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; -import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationFactory; +import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; @@ -736,9 +736,9 @@ public abstract class TestReplicationSourceManager { @Override public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, - ReplicationQueueStorage rq, ReplicationPeers rp, Server server, String peerClusterId, - UUID clusterId, ReplicationEndpoint replicationEndpoint, - WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException { + ReplicationQueueStorage rq, ReplicationPeer rp, Server server, String peerClusterId, + UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) + throws IOException { throw new IOException("Failing deliberately"); } }