HBASE-11920 Add CP hooks for ReplicationEndPoint

This commit is contained in:
Ramkrishna 2014-09-25 22:11:28 +05:30
parent a2e05b9f8f
commit 44a27c5cd7
6 changed files with 94 additions and 22 deletions

View File

@ -19,23 +19,23 @@
package org.apache.hadoop.hbase.coprocessor;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas;
import java.io.IOException;
import java.util.List;
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
@InterfaceStability.Evolving
public abstract class BaseMasterAndRegionObserver extends BaseRegionObserver

View File

@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
/**
* An abstract class that implements RegionServerObserver.
@ -76,4 +77,10 @@ public class BaseRegionServerObserver implements RegionServerObserver {
public void postRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
throws IOException { }
@Override
public ReplicationEndpoint postCreateReplicationEndPoint(
ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
return endpoint;
}
}

View File

@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.MetaMutationAnnotation;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
public interface RegionServerObserver extends Coprocessor {
@ -121,4 +122,13 @@ public interface RegionServerObserver extends Coprocessor {
void postRollWALWriterRequest(final ObserverContext<RegionServerCoprocessorEnvironment> ctx)
throws IOException;
/**
* This will be called after the replication endpoint is instantiated.
* @param ctx
* @param endpoint - the base endpoint for replication
* @return the endpoint to use during replication.
*/
ReplicationEndpoint postCreateReplicationEndPoint(
ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint);
}

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
@InterfaceStability.Evolving
@ -156,6 +157,27 @@ public class RegionServerCoprocessorHost extends
});
}
public ReplicationEndpoint postCreateReplicationEndPoint(final ReplicationEndpoint endpoint)
throws IOException {
return execOperationWithResult(endpoint, coprocessors.isEmpty() ? null
: new CoprocessOperationWithResult<ReplicationEndpoint>() {
@Override
public void call(RegionServerObserver oserver,
ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
setResult(oserver.postCreateReplicationEndPoint(ctx, getResult()));
}
});
}
private <T> T execOperationWithResult(final T defaultValue,
final CoprocessOperationWithResult<T> ctx) throws IOException {
if (ctx == null)
return defaultValue;
ctx.setResult(defaultValue);
execOperation(ctx);
return ctx.getResult();
}
private static abstract class CoprocessorOperation
extends ObserverContext<RegionServerCoprocessorEnvironment> {
public CoprocessorOperation() {
@ -168,6 +190,18 @@ public class RegionServerCoprocessorHost extends
}
}
private static abstract class CoprocessOperationWithResult<T> extends CoprocessorOperation {
private T result = null;
public void setResult(final T result) {
this.result = result;
}
public T getResult() {
return this.result;
}
}
private boolean execOperation(final CoprocessorOperation ctx) throws IOException {
if (ctx == null) return false;

View File

@ -39,11 +39,13 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationListener;
@ -84,7 +86,7 @@ public class ReplicationSourceManager implements ReplicationListener {
// UUID for this cluster
private final UUID clusterId;
// All about stopping
private final Stoppable stopper;
private final Server server;
// All logs we are currently tracking
private final Map<String, SortedSet<String>> hlogsById;
// Logs for recovered sources we are currently tracking
@ -111,7 +113,7 @@ public class ReplicationSourceManager implements ReplicationListener {
* @param replicationPeers
* @param replicationTracker
* @param conf the configuration to use
* @param stopper the stopper object for this region server
* @param server the server for this region server
* @param fs the file system to use
* @param logDir the directory that contains all hlog directories of live RSs
* @param oldLogDir the directory where old logs are archived
@ -119,7 +121,7 @@ public class ReplicationSourceManager implements ReplicationListener {
*/
public ReplicationSourceManager(final ReplicationQueues replicationQueues,
final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker,
final Configuration conf, final Stoppable stopper, final FileSystem fs, final Path logDir,
final Configuration conf, final Server server, final FileSystem fs, final Path logDir,
final Path oldLogDir, final UUID clusterId) {
//CopyOnWriteArrayList is thread-safe.
//Generally, reading is more than modifying.
@ -127,7 +129,7 @@ public class ReplicationSourceManager implements ReplicationListener {
this.replicationQueues = replicationQueues;
this.replicationPeers = replicationPeers;
this.replicationTracker = replicationTracker;
this.stopper = stopper;
this.server = server;
this.hlogsById = new HashMap<String, SortedSet<String>>();
this.hlogsByIdRecoveredQueues = new ConcurrentHashMap<String, SortedSet<String>>();
this.oldsources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
@ -243,7 +245,7 @@ public class ReplicationSourceManager implements ReplicationListener {
ReplicationPeer peer = replicationPeers.getPeer(id);
ReplicationSourceInterface src =
getReplicationSource(this.conf, this.fs, this, this.replicationQueues,
this.replicationPeers, stopper, id, this.clusterId, peerConfig, peer);
this.replicationPeers, server, id, this.clusterId, peerConfig, peer);
synchronized (this.hlogsById) {
this.sources.add(src);
this.hlogsById.put(id, new TreeSet<String>());
@ -257,7 +259,7 @@ public class ReplicationSourceManager implements ReplicationListener {
String message =
"Cannot add log to queue when creating a new source, queueId="
+ src.getPeerClusterZnode() + ", filename=" + name;
stopper.stop(message);
server.stop(message);
throw e;
}
src.enqueueLog(this.latestPath);
@ -359,7 +361,7 @@ public class ReplicationSourceManager implements ReplicationListener {
* @param conf the configuration to use
* @param fs the file system to use
* @param manager the manager to use
* @param stopper the stopper object for this region server
* @param server the server object for this region server
* @param peerId the id of the peer cluster
* @return the created source
* @throws IOException
@ -367,9 +369,13 @@ public class ReplicationSourceManager implements ReplicationListener {
protected ReplicationSourceInterface getReplicationSource(final Configuration conf,
final FileSystem fs, final ReplicationSourceManager manager,
final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers,
final Stoppable stopper, final String peerId, final UUID clusterId,
final Server server, final String peerId, final UUID clusterId,
final ReplicationPeerConfig peerConfig, final ReplicationPeer replicationPeer)
throws IOException {
RegionServerCoprocessorHost rsServerHost = null;
if (server instanceof HRegionServer) {
rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
}
ReplicationSourceInterface src;
try {
@SuppressWarnings("rawtypes")
@ -392,6 +398,14 @@ public class ReplicationSourceManager implements ReplicationListener {
@SuppressWarnings("rawtypes")
Class c = Class.forName(replicationEndpointImpl);
replicationEndpoint = (ReplicationEndpoint) c.newInstance();
if(rsServerHost != null) {
ReplicationEndpoint newReplicationEndPoint = rsServerHost
.postCreateReplicationEndPoint(replicationEndpoint);
if(newReplicationEndPoint != null) {
// Override the newly created endpoint from the hook with configured end point
replicationEndpoint = newReplicationEndPoint;
}
}
} catch (Exception e) {
LOG.warn("Passed replication endpoint implementation throws errors", e);
throw new IOException(e);
@ -399,7 +413,7 @@ public class ReplicationSourceManager implements ReplicationListener {
MetricsSource metrics = new MetricsSource(peerId);
// init replication source
src.init(conf, fs, manager, replicationQueues, replicationPeers, stopper, peerId,
src.init(conf, fs, manager, replicationQueues, replicationPeers, server, peerId,
clusterId, replicationEndpoint, metrics);
// init replication endpoint
@ -542,7 +556,7 @@ public class ReplicationSourceManager implements ReplicationListener {
Thread.currentThread().interrupt();
}
// We try to lock that rs' queue directory
if (stopper.isStopped()) {
if (server.isStopped()) {
LOG.info("Not transferring queue since we are shutting down");
return;
}
@ -578,7 +592,7 @@ public class ReplicationSourceManager implements ReplicationListener {
ReplicationSourceInterface src =
getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp,
stopper, peerId, this.clusterId, peerConfig, peer);
server, peerId, this.clusterId, peerConfig, peer);
if (!this.rp.getPeerIds().contains((src.getPeerClusterId()))) {
src.terminate("Recovered queue doesn't belong to any current peer");
break;

View File

@ -30,7 +30,7 @@ import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
@ -45,13 +45,13 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
@ -91,6 +91,7 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.security.AccessDeniedException;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
@ -2252,4 +2253,10 @@ public class AccessController extends BaseMasterAndRegionObserver
final String namespace, final Quotas quotas) throws IOException {
requirePermission("setNamespaceQuota", Action.ADMIN);
}
@Override
public ReplicationEndpoint postCreateReplicationEndPoint(
ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
return endpoint;
}
}