HBASE-15982 Interface ReplicationEndpoint extends Guava's Service
Breaking change to our ReplicationEndpoint and BaseReplicationEndpoint. ReplicationEndpoint implemented Guava 0.12 Service. An abstract subclass, BaseReplicationEndpoint, provided default implementations and facility, among other things, by extending Guava AbstractService class. Both of these HBase classes were marked LimitedPrivate for REPLICATION so these classes were semi-public and made it so Guava 0.12 was part of our API. Having Guava in our API was a mistake. It anchors us and the implementation of the Interface to Guava 0.12. This is untenable given Guava changes and that the Service Interface in particular has had extensive revamp and improvement done. We can't hold to the Guava Interface. It changed. We can't stay on Guava 0.12; implementors and others on our CLASSPATH won't abide being stuck on an old Guava. So this class makes breaking changes. The unhitching of our Interface from Guava could only be done in a breaking manner. It undoes the LimitedPrivate on BaseReplicationEndpoint while keeping it for the RE Interface. It means consumers will have to copy/paste the AbstractService-based BRE into their own codebase also supplying their own Guava; HBase no longer 'supplies' this (our Guava usage has been internalized, relocated). This patch then adds into RE the basic methods RE needs of the old Guava Service rather than return a Service to start/stop only to go back to the RE instance to do actual work. A few method names had to be changed so could make implementations with Guava Service internally and not have RE method names and types clash). Semantics remained the same otherwise. For example startAsync and stopAsync in Guava are start and stop in RE.
This commit is contained in:
parent
d12eb7a4aa
commit
6e7baa07f0
|
@ -24,15 +24,16 @@ import java.util.ArrayList;
|
|||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.AbstractService;
|
||||
|
||||
/**
|
||||
* A Base implementation for {@link ReplicationEndpoint}s. Users should consider extending this
|
||||
* class rather than implementing {@link ReplicationEndpoint} directly for better backwards
|
||||
* compatibility.
|
||||
* A Base implementation for {@link ReplicationEndpoint}s. For internal use. Uses our internal
|
||||
* Guava.
|
||||
*/
|
||||
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
|
||||
// This class has been made InterfaceAudience.Private in 2.0.0. It used to be
|
||||
// LimitedPrivate. See HBASE-15982.
|
||||
@InterfaceAudience.Private
|
||||
public abstract class BaseReplicationEndpoint extends AbstractService
|
||||
implements ReplicationEndpoint {
|
||||
|
||||
|
@ -109,4 +110,9 @@ public abstract class BaseReplicationEndpoint extends AbstractService
|
|||
public boolean canReplicateToSameCluster() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isStarting() {
|
||||
return state() == State.STARTING;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -78,6 +78,16 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
startAsync();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
stopAsync();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStart() {
|
||||
try {
|
||||
|
|
|
@ -21,6 +21,8 @@ package org.apache.hadoop.hbase.replication;
|
|||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.apache.hadoop.hbase.Abortable;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
|
@ -31,8 +33,6 @@ import org.apache.hadoop.hbase.TableDescriptors;
|
|||
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
||||
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.Service;
|
||||
|
||||
/**
|
||||
* ReplicationEndpoint is a plugin which implements replication
|
||||
* to other HBase clusters, or other systems. ReplicationEndpoint implementation
|
||||
|
@ -47,7 +47,9 @@ import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.Service;
|
|||
* and persisting of the WAL entries in the other cluster.
|
||||
*/
|
||||
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
|
||||
public interface ReplicationEndpoint extends Service, ReplicationPeerConfigListener {
|
||||
public interface ReplicationEndpoint extends ReplicationPeerConfigListener {
|
||||
// TODO: This class needs doc. Has a Context and a ReplicationContext. Then has #start, #stop.
|
||||
// How they relate? Do we #start before #init(Context)? We fail fast if you don't?
|
||||
|
||||
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
|
||||
class Context {
|
||||
|
@ -176,4 +178,82 @@ public interface ReplicationEndpoint extends Service, ReplicationPeerConfigListe
|
|||
* parameters can be obtained.
|
||||
*/
|
||||
boolean replicate(ReplicateContext replicateContext);
|
||||
}
|
||||
|
||||
|
||||
// The below methods are inspired by Guava Service. See
|
||||
// https://github.com/google/guava/wiki/ServiceExplained for overview of Guava Service.
|
||||
// Below we implement a subset only with different names on some methods so we can implement
|
||||
// the below internally using Guava (without exposing our implementation to
|
||||
// ReplicationEndpoint implementors.
|
||||
|
||||
/**
|
||||
* Returns {@code true} if this service is RUNNING.
|
||||
*/
|
||||
boolean isRunning();
|
||||
|
||||
/**
|
||||
* @return Return {@code true} is this service is STARTING (but not yet RUNNING).
|
||||
*/
|
||||
boolean isStarting();
|
||||
|
||||
/**
|
||||
* Initiates service startup and returns immediately. A stopped service may not be restarted.
|
||||
* Equivalent of startAsync call in Guava Service.
|
||||
* @throws IllegalStateException if the service is not new, if it has been run already.
|
||||
*/
|
||||
void start();
|
||||
|
||||
/**
|
||||
* Waits for the {@link ReplicationEndpoint} to be up and running.
|
||||
*
|
||||
* @throws IllegalStateException if the service reaches a state from which it is not possible to
|
||||
* enter the (internal) running state. e.g. if the state is terminated when this method is
|
||||
* called then this will throw an IllegalStateException.
|
||||
*/
|
||||
void awaitRunning();
|
||||
|
||||
/**
|
||||
* Waits for the {@link ReplicationEndpoint} to to be up and running for no more
|
||||
* than the given time.
|
||||
*
|
||||
* @param timeout the maximum time to wait
|
||||
* @param unit the time unit of the timeout argument
|
||||
* @throws TimeoutException if the service has not reached the given state within the deadline
|
||||
* @throws IllegalStateException if the service reaches a state from which it is not possible to
|
||||
* enter the (internal) running state. e.g. if the state is terminated when this method is
|
||||
* called then this will throw an IllegalStateException.
|
||||
*/
|
||||
void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException;
|
||||
|
||||
/**
|
||||
* If the service is starting or running, this initiates service shutdown and returns immediately.
|
||||
* If the service has already been stopped, this method returns immediately without taking action.
|
||||
* Equivalent of stopAsync call in Guava Service.
|
||||
*/
|
||||
void stop();
|
||||
|
||||
/**
|
||||
* Waits for the {@link ReplicationEndpoint} to reach the terminated (internal) state.
|
||||
*
|
||||
* @throws IllegalStateException if the service FAILED.
|
||||
*/
|
||||
void awaitTerminated();
|
||||
|
||||
/**
|
||||
* Waits for the {@link ReplicationEndpoint} to reach a terminal state for no
|
||||
* more than the given time.
|
||||
*
|
||||
* @param timeout the maximum time to wait
|
||||
* @param unit the time unit of the timeout argument
|
||||
* @throws TimeoutException if the service has not reached the given state within the deadline
|
||||
* @throws IllegalStateException if the service FAILED.
|
||||
*/
|
||||
void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException;
|
||||
|
||||
/**
|
||||
* Returns the {@link Throwable} that caused this service to fail.
|
||||
*
|
||||
* @throws IllegalStateException if this service's state isn't FAILED.
|
||||
*/
|
||||
Throwable failureCause();
|
||||
}
|
|
@ -19,8 +19,6 @@
|
|||
package org.apache.hadoop.hbase.replication.regionserver;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ListenableFuture;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.Service;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
|
@ -130,6 +128,11 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
|
|||
|
||||
private AtomicLong totalBufferUsed;
|
||||
|
||||
public static final String WAIT_ON_ENDPOINT_SECONDS =
|
||||
"hbase.replication.wait.on.endpoint.seconds";
|
||||
public static final int DEFAULT_WAIT_ON_ENDPOINT_SECONDS = 30;
|
||||
private int waitOnEndpointSeconds = -1;
|
||||
|
||||
/**
|
||||
* Instantiation method used by region servers
|
||||
*
|
||||
|
@ -152,6 +155,8 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
|
|||
throws IOException {
|
||||
this.stopper = stopper;
|
||||
this.conf = HBaseConfiguration.create(conf);
|
||||
this.waitOnEndpointSeconds =
|
||||
this.conf.getInt(WAIT_ON_ENDPOINT_SECONDS, DEFAULT_WAIT_ON_ENDPOINT_SECONDS);
|
||||
decorateConf();
|
||||
this.sleepForRetries =
|
||||
this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second
|
||||
|
@ -245,17 +250,11 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
|
|||
this.sourceRunning = true;
|
||||
try {
|
||||
// start the endpoint, connect to the cluster
|
||||
Service service = replicationEndpoint.startAsync();
|
||||
final int waitTime = 10;
|
||||
service.awaitRunning(waitTime, TimeUnit.SECONDS);
|
||||
if (!service.isRunning()) {
|
||||
LOG.warn("ReplicationEndpoint was not started after waiting " + waitTime +
|
||||
" + seconds. Exiting");
|
||||
uninitialize();
|
||||
return;
|
||||
}
|
||||
this.replicationEndpoint.start();
|
||||
this.replicationEndpoint.awaitRunning(this.waitOnEndpointSeconds, TimeUnit.SECONDS);
|
||||
} catch (Exception ex) {
|
||||
LOG.warn("Error starting ReplicationEndpoint, exiting", ex);
|
||||
uninitialize();
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
|
||||
|
@ -383,14 +382,12 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
|
|||
private void uninitialize() {
|
||||
LOG.debug("Source exiting " + this.peerId);
|
||||
metrics.clear();
|
||||
if (replicationEndpoint.state() == Service.State.STARTING
|
||||
|| replicationEndpoint.state() == Service.State.RUNNING) {
|
||||
replicationEndpoint.stopAsync();
|
||||
final int waitTime = 10;
|
||||
if (this.replicationEndpoint.isRunning() || this.replicationEndpoint.isStarting()) {
|
||||
this.replicationEndpoint.stop();
|
||||
try {
|
||||
replicationEndpoint.awaitTerminated(waitTime, TimeUnit.SECONDS);
|
||||
this.replicationEndpoint.awaitTerminated(this.waitOnEndpointSeconds, TimeUnit.SECONDS);
|
||||
} catch (TimeoutException e) {
|
||||
LOG.warn("Failed termination after " + waitTime + " seconds.");
|
||||
LOG.warn("Failed termination after " + this.waitOnEndpointSeconds + " seconds.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -463,18 +460,17 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
|
|||
worker.entryReader.interrupt();
|
||||
worker.interrupt();
|
||||
}
|
||||
Service service = null;
|
||||
if (this.replicationEndpoint != null) {
|
||||
service = this.replicationEndpoint.stopAsync();
|
||||
this.replicationEndpoint.stop();
|
||||
}
|
||||
if (join) {
|
||||
for (ReplicationSourceShipper worker : workers) {
|
||||
Threads.shutdown(worker, this.sleepForRetries);
|
||||
LOG.info("ReplicationSourceWorker " + worker.getName() + " terminated");
|
||||
}
|
||||
if (service != null) {
|
||||
if (this.replicationEndpoint != null) {
|
||||
try {
|
||||
service.awaitTerminated(sleepForRetries * maxRetriesMultiplier, TimeUnit.MILLISECONDS);
|
||||
this.replicationEndpoint.awaitTerminated(sleepForRetries * maxRetriesMultiplier, TimeUnit.MILLISECONDS);
|
||||
} catch (TimeoutException te) {
|
||||
LOG.warn("Got exception while waiting for endpoint to shutdown for replication source :"
|
||||
+ this.peerClusterZnode,
|
||||
|
|
|
@ -46,8 +46,9 @@ import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.Service;
|
|||
public class VisibilityReplicationEndpoint implements ReplicationEndpoint {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(VisibilityReplicationEndpoint.class);
|
||||
private ReplicationEndpoint delegator;
|
||||
private VisibilityLabelService visibilityLabelsService;
|
||||
|
||||
private final ReplicationEndpoint delegator;
|
||||
private final VisibilityLabelService visibilityLabelsService;
|
||||
|
||||
public VisibilityReplicationEndpoint(ReplicationEndpoint endpoint,
|
||||
VisibilityLabelService visibilityLabelsService) {
|
||||
|
@ -62,7 +63,7 @@ public class VisibilityReplicationEndpoint implements ReplicationEndpoint {
|
|||
|
||||
@Override
|
||||
public void peerConfigUpdated(ReplicationPeerConfig rpc){
|
||||
|
||||
delegator.peerConfigUpdated(rpc);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -137,24 +138,17 @@ public class VisibilityReplicationEndpoint implements ReplicationEndpoint {
|
|||
return delegator.getWALEntryfilter();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Service startAsync() {
|
||||
return this.delegator.startAsync();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRunning() {
|
||||
return delegator.isRunning();
|
||||
return this.delegator.isRunning();
|
||||
}
|
||||
|
||||
@Override
|
||||
public State state() {
|
||||
return delegator.state();
|
||||
}
|
||||
public boolean isStarting() {return this.delegator.isStarting();}
|
||||
|
||||
@Override
|
||||
public Service stopAsync() {
|
||||
return this.delegator.stopAsync();
|
||||
public void start() {
|
||||
this.delegator.start();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -163,8 +157,13 @@ public class VisibilityReplicationEndpoint implements ReplicationEndpoint {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void awaitRunning(long l, TimeUnit timeUnit) throws TimeoutException {
|
||||
this.delegator.awaitRunning(l, timeUnit);
|
||||
public void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException {
|
||||
this.delegator.awaitRunning(timeout, unit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
this.delegator.stop();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -173,17 +172,12 @@ public class VisibilityReplicationEndpoint implements ReplicationEndpoint {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void awaitTerminated(long l, TimeUnit timeUnit) throws TimeoutException {
|
||||
this.delegator.awaitTerminated(l, timeUnit);
|
||||
public void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException {
|
||||
this.delegator.awaitTerminated(timeout, unit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Throwable failureCause() {
|
||||
return this.delegator.failureCause();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addListener(Listener listener, Executor executor) {
|
||||
this.delegator.addListener(listener, executor);
|
||||
}
|
||||
}
|
|
@ -249,6 +249,16 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase {
|
|||
notifyAll();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
startAsync();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
stopAsync();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStart() {
|
||||
notifyStarted();
|
||||
|
|
|
@ -395,6 +395,16 @@ public class TestReplicationEndpoint extends TestReplicationBase {
|
|||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
startAsync();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
stopAsync();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStart() {
|
||||
startedCount.incrementAndGet();
|
||||
|
|
|
@ -158,7 +158,7 @@ public class TestReplicationSource {
|
|||
// completes
|
||||
}
|
||||
};
|
||||
replicationEndpoint.startAsync();
|
||||
replicationEndpoint.start();
|
||||
ReplicationPeers mockPeers = Mockito.mock(ReplicationPeers.class);
|
||||
ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
|
||||
Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
|
||||
|
|
Loading…
Reference in New Issue