diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java index ae4e7ccb45b..5b9cef72db8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java @@ -24,15 +24,16 @@ import java.util.ArrayList; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.AbstractService; + /** - * A Base implementation for {@link ReplicationEndpoint}s. Users should consider extending this - * class rather than implementing {@link ReplicationEndpoint} directly for better backwards - * compatibility. + * A Base implementation for {@link ReplicationEndpoint}s. For internal use. Uses our internal + * Guava. */ -@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) +// This class has been made InterfaceAudience.Private in 2.0.0. It used to be +// LimitedPrivate. See HBASE-15982. 
+@InterfaceAudience.Private public abstract class BaseReplicationEndpoint extends AbstractService implements ReplicationEndpoint { @@ -109,4 +110,9 @@ public abstract class BaseReplicationEndpoint extends AbstractService public boolean canReplicateToSameCluster() { return false; } + + @Override + public boolean isStarting() { + return state() == State.STARTING; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java index 1bc18a9dfe8..42667e16a34 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java @@ -78,6 +78,16 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint } } + @Override + public void start() { + startAsync(); + } + + @Override + public void stop() { + stopAsync(); + } + @Override protected void doStart() { try { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java index 6bf696b2c06..f23276cc687 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java @@ -21,6 +21,8 @@ package org.apache.hadoop.hbase.replication; import java.io.IOException; import java.util.List; import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -31,8 +33,6 @@ import org.apache.hadoop.hbase.TableDescriptors; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; 
-import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.Service; - /** * ReplicationEndpoint is a plugin which implements replication * to other HBase clusters, or other systems. ReplicationEndpoint implementation @@ -47,7 +47,9 @@ import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.Service; * and persisting of the WAL entries in the other cluster. */ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) -public interface ReplicationEndpoint extends Service, ReplicationPeerConfigListener { +public interface ReplicationEndpoint extends ReplicationPeerConfigListener { + // TODO: This class needs doc. Has a Context and a ReplicateContext. Then has #start, #stop. + // How they relate? Do we #start before #init(Context)? We fail fast if you don't? @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) class Context { @@ -176,4 +178,82 @@ public interface ReplicationEndpoint extends Service, ReplicationPeerConfigListe * parameters can be obtained. */ boolean replicate(ReplicateContext replicateContext); -} + + + // The below methods are inspired by Guava Service. See + // https://github.com/google/guava/wiki/ServiceExplained for overview of Guava Service. + // Below we implement a subset only with different names on some methods so we can implement + // the below internally using Guava (without exposing our implementation to + // ReplicationEndpoint implementors). + + /** + * Returns {@code true} if this service is RUNNING. + */ + boolean isRunning(); + + /** + * @return {@code true} if this service is STARTING (but not yet RUNNING). + */ + boolean isStarting(); + + /** + * Initiates service startup and returns immediately. A stopped service may not be restarted. + * Equivalent of startAsync call in Guava Service. + * @throws IllegalStateException if the service is not new, if it has been run already. + */ + void start(); + + /** + * Waits for the {@link ReplicationEndpoint} to be up and running. 
+ * + * @throws IllegalStateException if the service reaches a state from which it is not possible to + * enter the (internal) running state. e.g. if the state is terminated when this method is + * called then this will throw an IllegalStateException. + */ + void awaitRunning(); + + /** + * Waits for the {@link ReplicationEndpoint} to be up and running for no more + * than the given time. + * + * @param timeout the maximum time to wait + * @param unit the time unit of the timeout argument + * @throws TimeoutException if the service has not reached the given state within the deadline + * @throws IllegalStateException if the service reaches a state from which it is not possible to + * enter the (internal) running state. e.g. if the state is terminated when this method is + * called then this will throw an IllegalStateException. + */ + void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException; + + /** + * If the service is starting or running, this initiates service shutdown and returns immediately. + * If the service has already been stopped, this method returns immediately without taking action. + * Equivalent of stopAsync call in Guava Service. + */ + void stop(); + + /** + * Waits for the {@link ReplicationEndpoint} to reach the terminated (internal) state. + * + * @throws IllegalStateException if the service FAILED. + */ + void awaitTerminated(); + + /** + * Waits for the {@link ReplicationEndpoint} to reach a terminal state for no + * more than the given time. + * + * @param timeout the maximum time to wait + * @param unit the time unit of the timeout argument + * @throws TimeoutException if the service has not reached the given state within the deadline + * @throws IllegalStateException if the service FAILED. + */ + void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException; + + /** + * Returns the {@link Throwable} that caused this service to fail. + * + * @throws IllegalStateException if this service's state isn't FAILED. 
+ */ + Throwable failureCause(); +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 1d3e4fb3c8d..f3a37ddc5d8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -19,8 +19,6 @@ package org.apache.hadoop.hbase.replication.regionserver; import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; -import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ListenableFuture; -import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.Service; import java.io.IOException; import java.util.ArrayList; @@ -130,6 +128,11 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf private AtomicLong totalBufferUsed; + public static final String WAIT_ON_ENDPOINT_SECONDS = + "hbase.replication.wait.on.endpoint.seconds"; + public static final int DEFAULT_WAIT_ON_ENDPOINT_SECONDS = 30; + private int waitOnEndpointSeconds = -1; + /** * Instantiation method used by region servers * @@ -152,6 +155,8 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf throws IOException { this.stopper = stopper; this.conf = HBaseConfiguration.create(conf); + this.waitOnEndpointSeconds = + this.conf.getInt(WAIT_ON_ENDPOINT_SECONDS, DEFAULT_WAIT_ON_ENDPOINT_SECONDS); decorateConf(); this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second @@ -245,17 +250,11 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf this.sourceRunning = true; try { // start the endpoint, connect to the cluster - Service service = replicationEndpoint.startAsync(); - final int waitTime = 10; - 
service.awaitRunning(waitTime, TimeUnit.SECONDS); - if (!service.isRunning()) { - LOG.warn("ReplicationEndpoint was not started after waiting " + waitTime + - " + seconds. Exiting"); - uninitialize(); - return; - } + this.replicationEndpoint.start(); + this.replicationEndpoint.awaitRunning(this.waitOnEndpointSeconds, TimeUnit.SECONDS); } catch (Exception ex) { LOG.warn("Error starting ReplicationEndpoint, exiting", ex); + uninitialize(); throw new RuntimeException(ex); } @@ -383,14 +382,12 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf private void uninitialize() { LOG.debug("Source exiting " + this.peerId); metrics.clear(); - if (replicationEndpoint.state() == Service.State.STARTING - || replicationEndpoint.state() == Service.State.RUNNING) { - replicationEndpoint.stopAsync(); - final int waitTime = 10; + if (this.replicationEndpoint.isRunning() || this.replicationEndpoint.isStarting()) { + this.replicationEndpoint.stop(); try { - replicationEndpoint.awaitTerminated(waitTime, TimeUnit.SECONDS); + this.replicationEndpoint.awaitTerminated(this.waitOnEndpointSeconds, TimeUnit.SECONDS); } catch (TimeoutException e) { - LOG.warn("Failed termination after " + waitTime + " seconds."); + LOG.warn("Failed termination after " + this.waitOnEndpointSeconds + " seconds."); } } } @@ -463,18 +460,17 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf worker.entryReader.interrupt(); worker.interrupt(); } - Service service = null; if (this.replicationEndpoint != null) { - service = this.replicationEndpoint.stopAsync(); + this.replicationEndpoint.stop(); } if (join) { for (ReplicationSourceShipper worker : workers) { Threads.shutdown(worker, this.sleepForRetries); LOG.info("ReplicationSourceWorker " + worker.getName() + " terminated"); } - if (service != null) { + if (this.replicationEndpoint != null) { try { - service.awaitTerminated(sleepForRetries * maxRetriesMultiplier, TimeUnit.MILLISECONDS); + 
this.replicationEndpoint.awaitTerminated(sleepForRetries * maxRetriesMultiplier, TimeUnit.MILLISECONDS); } catch (TimeoutException te) { LOG.warn("Got exception while waiting for endpoint to shutdown for replication source :" + this.peerClusterZnode, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java index 51655a1d744..1ce2b3d1157 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java @@ -46,8 +46,9 @@ import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.Service; public class VisibilityReplicationEndpoint implements ReplicationEndpoint { private static final Log LOG = LogFactory.getLog(VisibilityReplicationEndpoint.class); - private ReplicationEndpoint delegator; - private VisibilityLabelService visibilityLabelsService; + + private final ReplicationEndpoint delegator; + private final VisibilityLabelService visibilityLabelsService; public VisibilityReplicationEndpoint(ReplicationEndpoint endpoint, VisibilityLabelService visibilityLabelsService) { @@ -62,7 +63,7 @@ public class VisibilityReplicationEndpoint implements ReplicationEndpoint { @Override public void peerConfigUpdated(ReplicationPeerConfig rpc){ - + delegator.peerConfigUpdated(rpc); } @Override @@ -137,24 +138,17 @@ public class VisibilityReplicationEndpoint implements ReplicationEndpoint { return delegator.getWALEntryfilter(); } - @Override - public Service startAsync() { - return this.delegator.startAsync(); - } - @Override public boolean isRunning() { - return delegator.isRunning(); + return this.delegator.isRunning(); } @Override - public State state() { - return delegator.state(); - } + public boolean isStarting() {return 
this.delegator.isStarting();} @Override - public Service stopAsync() { - return this.delegator.stopAsync(); + public void start() { + this.delegator.start(); } @Override @@ -163,8 +157,13 @@ public class VisibilityReplicationEndpoint implements ReplicationEndpoint { } @Override - public void awaitRunning(long l, TimeUnit timeUnit) throws TimeoutException { - this.delegator.awaitRunning(l, timeUnit); + public void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException { + this.delegator.awaitRunning(timeout, unit); + } + + @Override + public void stop() { + this.delegator.stop(); } @Override @@ -173,17 +172,12 @@ public class VisibilityReplicationEndpoint implements ReplicationEndpoint { } @Override - public void awaitTerminated(long l, TimeUnit timeUnit) throws TimeoutException { - this.delegator.awaitTerminated(l, timeUnit); + public void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException { + this.delegator.awaitTerminated(timeout, unit); } @Override public Throwable failureCause() { return this.delegator.failureCause(); } - - @Override - public void addListener(Listener listener, Executor executor) { - this.delegator.addListener(listener, executor); - } } \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java index 3b5522bc47b..26103138f0f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java @@ -249,6 +249,16 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase { notifyAll(); } + @Override + public void start() { + startAsync(); + } + + @Override + public void stop() { + stopAsync(); + } + @Override protected void doStart() { 
notifyStarted(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java index c63a69b023d..a0562bfa7a7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java @@ -395,6 +395,16 @@ public class TestReplicationEndpoint extends TestReplicationBase { return true; } + @Override + public void start() { + startAsync(); + } + + @Override + public void stop() { + stopAsync(); + } + @Override protected void doStart() { startedCount.incrementAndGet(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java index 7ea698c5b8e..c3b7eafdd2f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java @@ -158,7 +158,7 @@ public class TestReplicationSource { // completes } }; - replicationEndpoint.startAsync(); + replicationEndpoint.start(); ReplicationPeers mockPeers = Mockito.mock(ReplicationPeers.class); ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class); Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);