HBASE-15982 Interface ReplicationEndpoint extends Guava's Service

Breaking change to our ReplicationEndpoint and BaseReplicationEndpoint.

ReplicationEndpoint extended the Guava 0.12 Service Interface. An
abstract subclass, BaseReplicationEndpoint, provided default
implementations and facilities, among other things, by extending
Guava's AbstractService class.

Both of these HBase classes were marked LimitedPrivate for
REPLICATION so these classes were semi-public and made it so
Guava 0.12 was part of our API.

Having Guava in our API was a mistake. It anchors us and the
implementation of the Interface to Guava 0.12. This is untenable
given Guava changes and that the Service Interface in particular
has had extensive revamp and improvement done. We can't hold to
the Guava Interface. It changed. We can't stay on Guava 0.12;
implementors and others on our CLASSPATH won't abide being stuck
on an old Guava.

So this patch makes breaking changes. The unhitching of our Interface
from Guava could only be done in a breaking manner. It undoes the
LimitedPrivate on BaseReplicationEndpoint while keeping it for the RE
Interface. It means consumers will have to copy/paste the
AbstractService-based BRE into their own codebase, also supplying their
own Guava; HBase no longer 'supplies' this (our Guava usage has
been internalized, relocated).

This patch then adds into RE the basic methods RE needs of the old
Guava Service rather than return a Service to start/stop only to go
back to the RE instance to do actual work. A few method names had to
be changed so that we could make implementations with Guava Service
internally and not have RE method names and types clash. Semantics
remained the same otherwise. For example, startAsync and stopAsync in
Guava are start and stop in RE.
This commit is contained in:
Michael Stack 2017-08-08 21:55:47 +08:00
parent 1ae9a39011
commit ec7bca1769
8 changed files with 160 additions and 54 deletions

View File

@ -24,15 +24,16 @@ import java.util.ArrayList;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.AbstractService; import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.AbstractService;
/** /**
* A Base implementation for {@link ReplicationEndpoint}s. Users should consider extending this * A Base implementation for {@link ReplicationEndpoint}s. For internal use. Uses our internal
* class rather than implementing {@link ReplicationEndpoint} directly for better backwards * Guava.
* compatibility.
*/ */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) // This class has been made InterfaceAudience.Private in 2.0.0. It used to be
// LimitedPrivate. See HBASE-15982.
@InterfaceAudience.Private
public abstract class BaseReplicationEndpoint extends AbstractService public abstract class BaseReplicationEndpoint extends AbstractService
implements ReplicationEndpoint { implements ReplicationEndpoint {
@ -109,4 +110,9 @@ public abstract class BaseReplicationEndpoint extends AbstractService
public boolean canReplicateToSameCluster() { public boolean canReplicateToSameCluster() {
return false; return false;
} }
@Override
public boolean isStarting() {
return state() == State.STARTING;
}
} }

View File

@ -78,6 +78,16 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
} }
} }
@Override
public void start() {
startAsync();
}
@Override
public void stop() {
stopAsync();
}
@Override @Override
protected void doStart() { protected void doStart() {
try { try {

View File

@ -21,6 +21,8 @@ package org.apache.hadoop.hbase.replication;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
@ -31,8 +33,6 @@ import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.Service;
/** /**
* ReplicationEndpoint is a plugin which implements replication * ReplicationEndpoint is a plugin which implements replication
* to other HBase clusters, or other systems. ReplicationEndpoint implementation * to other HBase clusters, or other systems. ReplicationEndpoint implementation
@ -47,7 +47,9 @@ import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.Service;
* and persisting of the WAL entries in the other cluster. * and persisting of the WAL entries in the other cluster.
*/ */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
public interface ReplicationEndpoint extends Service, ReplicationPeerConfigListener { public interface ReplicationEndpoint extends ReplicationPeerConfigListener {
// TODO: This class needs doc. Has a Context and a ReplicateContext. Then has #start, #stop.
// How do they relate? Do we #start before #init(Context)? Do we fail fast if you don't?
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
class Context { class Context {
@ -176,4 +178,82 @@ public interface ReplicationEndpoint extends Service, ReplicationPeerConfigListe
* parameters can be obtained. * parameters can be obtained.
*/ */
boolean replicate(ReplicateContext replicateContext); boolean replicate(ReplicateContext replicateContext);
// The below methods are inspired by Guava Service. See
// https://github.com/google/guava/wiki/ServiceExplained for overview of Guava Service.
// Below we implement a subset only with different names on some methods so we can implement
// the below internally using Guava (without exposing our implementation to
// ReplicationEndpoint implementors).
/**
* Returns {@code true} if this service is RUNNING.
*/
boolean isRunning();
/**
* @return {@code true} if this service is STARTING (but not yet RUNNING).
*/
boolean isStarting();
/**
* Initiates service startup and returns immediately. A stopped service may not be restarted.
* Equivalent of startAsync call in Guava Service.
* @throws IllegalStateException if the service is not new, if it has been run already.
*/
void start();
/**
* Waits for the {@link ReplicationEndpoint} to be up and running.
*
* @throws IllegalStateException if the service reaches a state from which it is not possible to
* enter the (internal) running state. e.g. if the state is terminated when this method is
* called then this will throw an IllegalStateException.
*/
void awaitRunning();
/**
* Waits for the {@link ReplicationEndpoint} to be up and running for no more
* than the given time.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @throws TimeoutException if the service has not reached the given state within the deadline
* @throws IllegalStateException if the service reaches a state from which it is not possible to
* enter the (internal) running state. e.g. if the state is terminated when this method is
* called then this will throw an IllegalStateException.
*/
void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException;
/**
* If the service is starting or running, this initiates service shutdown and returns immediately.
* If the service has already been stopped, this method returns immediately without taking action.
* Equivalent of stopAsync call in Guava Service.
*/
void stop();
/**
* Waits for the {@link ReplicationEndpoint} to reach the terminated (internal) state.
*
* @throws IllegalStateException if the service FAILED.
*/
void awaitTerminated();
/**
* Waits for the {@link ReplicationEndpoint} to reach a terminal state for no
* more than the given time.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @throws TimeoutException if the service has not reached the given state within the deadline
* @throws IllegalStateException if the service FAILED.
*/
void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException;
/**
* Returns the {@link Throwable} that caused this service to fail.
*
* @throws IllegalStateException if this service's state isn't FAILED.
*/
Throwable failureCause();
} }

View File

@ -19,8 +19,6 @@
package org.apache.hadoop.hbase.replication.regionserver; package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ListenableFuture;
import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.Service;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
@ -130,6 +128,11 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
private AtomicLong totalBufferUsed; private AtomicLong totalBufferUsed;
public static final String WAIT_ON_ENDPOINT_SECONDS =
"hbase.replication.wait.on.endpoint.seconds";
public static final int DEFAULT_WAIT_ON_ENDPOINT_SECONDS = 30;
private int waitOnEndpointSeconds = -1;
/** /**
* Instantiation method used by region servers * Instantiation method used by region servers
* *
@ -152,6 +155,8 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
throws IOException { throws IOException {
this.stopper = stopper; this.stopper = stopper;
this.conf = HBaseConfiguration.create(conf); this.conf = HBaseConfiguration.create(conf);
this.waitOnEndpointSeconds =
this.conf.getInt(WAIT_ON_ENDPOINT_SECONDS, DEFAULT_WAIT_ON_ENDPOINT_SECONDS);
decorateConf(); decorateConf();
this.sleepForRetries = this.sleepForRetries =
this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second
@ -245,17 +250,11 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
this.sourceRunning = true; this.sourceRunning = true;
try { try {
// start the endpoint, connect to the cluster // start the endpoint, connect to the cluster
Service service = replicationEndpoint.startAsync(); this.replicationEndpoint.start();
final int waitTime = 10; this.replicationEndpoint.awaitRunning(this.waitOnEndpointSeconds, TimeUnit.SECONDS);
service.awaitRunning(waitTime, TimeUnit.SECONDS);
if (!service.isRunning()) {
LOG.warn("ReplicationEndpoint was not started after waiting " + waitTime +
" + seconds. Exiting");
uninitialize();
return;
}
} catch (Exception ex) { } catch (Exception ex) {
LOG.warn("Error starting ReplicationEndpoint, exiting", ex); LOG.warn("Error starting ReplicationEndpoint, exiting", ex);
uninitialize();
throw new RuntimeException(ex); throw new RuntimeException(ex);
} }
@ -383,14 +382,12 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
private void uninitialize() { private void uninitialize() {
LOG.debug("Source exiting " + this.peerId); LOG.debug("Source exiting " + this.peerId);
metrics.clear(); metrics.clear();
if (replicationEndpoint.state() == Service.State.STARTING if (this.replicationEndpoint.isRunning() || this.replicationEndpoint.isStarting()) {
|| replicationEndpoint.state() == Service.State.RUNNING) { this.replicationEndpoint.stop();
replicationEndpoint.stopAsync();
final int waitTime = 10;
try { try {
replicationEndpoint.awaitTerminated(waitTime, TimeUnit.SECONDS); this.replicationEndpoint.awaitTerminated(this.waitOnEndpointSeconds, TimeUnit.SECONDS);
} catch (TimeoutException e) { } catch (TimeoutException e) {
LOG.warn("Failed termination after " + waitTime + " seconds."); LOG.warn("Failed termination after " + this.waitOnEndpointSeconds + " seconds.");
} }
} }
} }
@ -463,18 +460,17 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
worker.entryReader.interrupt(); worker.entryReader.interrupt();
worker.interrupt(); worker.interrupt();
} }
Service service = null;
if (this.replicationEndpoint != null) { if (this.replicationEndpoint != null) {
service = this.replicationEndpoint.stopAsync(); this.replicationEndpoint.stop();
} }
if (join) { if (join) {
for (ReplicationSourceShipper worker : workers) { for (ReplicationSourceShipper worker : workers) {
Threads.shutdown(worker, this.sleepForRetries); Threads.shutdown(worker, this.sleepForRetries);
LOG.info("ReplicationSourceWorker " + worker.getName() + " terminated"); LOG.info("ReplicationSourceWorker " + worker.getName() + " terminated");
} }
if (service != null) { if (this.replicationEndpoint != null) {
try { try {
service.awaitTerminated(sleepForRetries * maxRetriesMultiplier, TimeUnit.MILLISECONDS); this.replicationEndpoint.awaitTerminated(sleepForRetries * maxRetriesMultiplier, TimeUnit.MILLISECONDS);
} catch (TimeoutException te) { } catch (TimeoutException te) {
LOG.warn("Got exception while waiting for endpoint to shutdown for replication source :" LOG.warn("Got exception while waiting for endpoint to shutdown for replication source :"
+ this.peerClusterZnode, + this.peerClusterZnode,

View File

@ -46,8 +46,9 @@ import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.Service;
public class VisibilityReplicationEndpoint implements ReplicationEndpoint { public class VisibilityReplicationEndpoint implements ReplicationEndpoint {
private static final Log LOG = LogFactory.getLog(VisibilityReplicationEndpoint.class); private static final Log LOG = LogFactory.getLog(VisibilityReplicationEndpoint.class);
private ReplicationEndpoint delegator;
private VisibilityLabelService visibilityLabelsService; private final ReplicationEndpoint delegator;
private final VisibilityLabelService visibilityLabelsService;
public VisibilityReplicationEndpoint(ReplicationEndpoint endpoint, public VisibilityReplicationEndpoint(ReplicationEndpoint endpoint,
VisibilityLabelService visibilityLabelsService) { VisibilityLabelService visibilityLabelsService) {
@ -62,7 +63,7 @@ public class VisibilityReplicationEndpoint implements ReplicationEndpoint {
@Override @Override
public void peerConfigUpdated(ReplicationPeerConfig rpc){ public void peerConfigUpdated(ReplicationPeerConfig rpc){
delegator.peerConfigUpdated(rpc);
} }
@Override @Override
@ -137,24 +138,17 @@ public class VisibilityReplicationEndpoint implements ReplicationEndpoint {
return delegator.getWALEntryfilter(); return delegator.getWALEntryfilter();
} }
@Override
public Service startAsync() {
return this.delegator.startAsync();
}
@Override @Override
public boolean isRunning() { public boolean isRunning() {
return delegator.isRunning(); return this.delegator.isRunning();
} }
@Override @Override
public State state() { public boolean isStarting() {return this.delegator.isStarting();}
return delegator.state();
}
@Override @Override
public Service stopAsync() { public void start() {
return this.delegator.stopAsync(); this.delegator.start();
} }
@Override @Override
@ -163,8 +157,13 @@ public class VisibilityReplicationEndpoint implements ReplicationEndpoint {
} }
@Override @Override
public void awaitRunning(long l, TimeUnit timeUnit) throws TimeoutException { public void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException {
this.delegator.awaitRunning(l, timeUnit); this.delegator.awaitRunning(timeout, unit);
}
@Override
public void stop() {
this.delegator.stop();
} }
@Override @Override
@ -173,17 +172,12 @@ public class VisibilityReplicationEndpoint implements ReplicationEndpoint {
} }
@Override @Override
public void awaitTerminated(long l, TimeUnit timeUnit) throws TimeoutException { public void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException {
this.delegator.awaitTerminated(l, timeUnit); this.delegator.awaitTerminated(timeout, unit);
} }
@Override @Override
public Throwable failureCause() { public Throwable failureCause() {
return this.delegator.failureCause(); return this.delegator.failureCause();
} }
@Override
public void addListener(Listener listener, Executor executor) {
this.delegator.addListener(listener, executor);
}
} }

View File

@ -249,6 +249,16 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase {
notifyAll(); notifyAll();
} }
@Override
public void start() {
startAsync();
}
@Override
public void stop() {
stopAsync();
}
@Override @Override
protected void doStart() { protected void doStart() {
notifyStarted(); notifyStarted();

View File

@ -395,6 +395,16 @@ public class TestReplicationEndpoint extends TestReplicationBase {
return true; return true;
} }
@Override
public void start() {
startAsync();
}
@Override
public void stop() {
stopAsync();
}
@Override @Override
protected void doStart() { protected void doStart() {
startedCount.incrementAndGet(); startedCount.incrementAndGet();

View File

@ -158,7 +158,7 @@ public class TestReplicationSource {
// completes // completes
} }
}; };
replicationEndpoint.startAsync(); replicationEndpoint.start();
ReplicationPeers mockPeers = Mockito.mock(ReplicationPeers.class); ReplicationPeers mockPeers = Mockito.mock(ReplicationPeers.class);
ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class); ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);