HBASE-4131 Make the Replication Service pluggable via a standard interface definition

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1174963 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2011-09-23 19:18:41 +00:00
parent abed8ff130
commit a170f9ee4e
7 changed files with 275 additions and 30 deletions

View File

@ -3,6 +3,8 @@ Release 0.93.0 - Unreleased
IMPROVEMENT
HBASE-4132 Extend the WALActionsListener API to accomodate log archival
(dhruba borthakur)
HBASE-4131 Make the Replication Service pluggable via a standard
interface definition (dhruba borthakur)
Release 0.92.0 - Unreleased
INCOMPATIBLE CHANGES

View File

@ -473,8 +473,17 @@ public final class HConstants {
*/
public static int DEFAULT_HBASE_RPC_TIMEOUT = 60000;
/*
* cluster replication constants.
*/
public static final String
REPLICATION_ENABLE_KEY = "hbase.replication";
public static final String
REPLICATION_SOURCE_SERVICE_CLASSNAME = "hbase.replication.source.service";
public static final String
REPLICATION_SINK_SERVICE_CLASSNAME = "hbase.replication.sink.service";
public static final String REPLICATION_SERVICE_CLASSNAME_DEFAULT =
"org.apache.hadoop.hbase.replication.regionserver.Replication";
/** HBCK special code name used as server name when manipulating ZK nodes */
public static final String HBCK_CODE_NAME = "HBCKServerName";

View File

@ -54,6 +54,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.hbase.ClockOutOfSyncException;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
@ -284,7 +285,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
private ExecutorService service;
// Replication services. If no replication, this handler will be null.
private Replication replicationHandler;
private ReplicationSourceService replicationSourceHandler;
private ReplicationSinkService replicationSinkHandler;
private final RegionServerAccounting regionServerAccounting;
@ -1169,12 +1171,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
// Instantiate replication manager if replication enabled. Pass it the
// log directories.
try {
this.replicationHandler = Replication.isReplication(this.conf)?
new Replication(this, this.fs, logdir, oldLogDir): null;
} catch (KeeperException e) {
throw new IOException("Failed replication handler create", e);
}
createNewReplicationInstance(conf, this, this.fs, logdir, oldLogDir);
return instantiateHLog(logdir, oldLogDir);
}
@ -1201,9 +1198,10 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
// Log roller.
this.hlogRoller = new LogRoller(this, this);
listeners.add(this.hlogRoller);
if (this.replicationHandler != null) {
if (this.replicationSourceHandler != null &&
this.replicationSourceHandler.getWALActionsListener() != null) {
// Replication handler is an implementation of WALActionsListener.
listeners.add(this.replicationHandler);
listeners.add(this.replicationSourceHandler.getWALActionsListener());
}
return listeners;
}
@ -1351,8 +1349,13 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
// that port is occupied. Adjust serverInfo if this is the case.
this.webuiport = putUpWebUI();
if (this.replicationHandler != null) {
this.replicationHandler.startReplicationServices();
if (this.replicationSourceHandler == this.replicationSinkHandler &&
this.replicationSourceHandler != null) {
this.replicationSourceHandler.startReplicationService();
} else if (this.replicationSourceHandler != null) {
this.replicationSourceHandler.startReplicationService();
} else if (this.replicationSinkHandler != null) {
this.replicationSinkHandler.startReplicationService();
}
// Start Server. This service is like leases in that it internally runs
@ -1557,11 +1560,32 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
this.compactSplitThread.join();
}
if (this.service != null) this.service.shutdown();
if (this.replicationHandler != null) {
this.replicationHandler.join();
if (this.replicationSourceHandler != null &&
this.replicationSourceHandler == this.replicationSinkHandler) {
this.replicationSourceHandler.stopReplicationService();
} else if (this.replicationSourceHandler != null) {
this.replicationSourceHandler.stopReplicationService();
} else if (this.replicationSinkHandler != null) {
this.replicationSinkHandler.stopReplicationService();
}
}
/**
* @return Return the object that implements the replication
* source service.
*/
ReplicationSourceService getReplicationSourceService() {
return replicationSourceHandler;
}
/**
* @return Return the object that implements the replication
* sink service.
*/
ReplicationSinkService getReplicationSinkService() {
return replicationSinkHandler;
}
/**
* Get the current master from ZooKeeper and open the RPC connection to it.
*
@ -3063,6 +3087,63 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
// Main program and support routines
//
/**
* Load the replication service objects, if any
*/
static private void createNewReplicationInstance(Configuration conf,
HRegionServer server, FileSystem fs, Path logDir, Path oldLogDir) throws IOException{
// If replication is not enabled, then return immediately.
if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false)) {
return;
}
// read in the name of the source replication class from the config file.
String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
// read in the name of the sink replication class from the config file.
String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
// If both the sink and the source class names are the same, then instantiate
// only one object.
if (sourceClassname.equals(sinkClassname)) {
server.replicationSourceHandler = (ReplicationSourceService)
newReplicationInstance(sourceClassname,
conf, server, fs, logDir, oldLogDir);
server.replicationSinkHandler = (ReplicationSinkService)
server.replicationSinkHandler;
}
else {
server.replicationSourceHandler = (ReplicationSourceService)
newReplicationInstance(sourceClassname,
conf, server, fs, logDir, oldLogDir);
server.replicationSinkHandler = (ReplicationSinkService)
newReplicationInstance(sinkClassname,
conf, server, fs, logDir, oldLogDir);
}
}
static private ReplicationService newReplicationInstance(String classname,
Configuration conf, HRegionServer server, FileSystem fs, Path logDir,
Path oldLogDir) throws IOException{
Class<?> clazz = null;
try {
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
clazz = Class.forName(classname, true, classLoader);
} catch (java.lang.ClassNotFoundException nfe) {
throw new IOException("Cound not find class for " + classname);
}
// create an instance of the replication object.
ReplicationService service = (ReplicationService)
ReflectionUtils.newInstance(clazz, conf);
service.initialize(server, fs, logDir, oldLogDir);
return service;
}
/**
* @param hrs
* @return Thread the RegionServer is running in correctly named.
@ -3115,8 +3196,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
public void replicateLogEntries(final HLog.Entry[] entries)
throws IOException {
checkOpen();
if (this.replicationHandler == null) return;
this.replicationHandler.replicateLogEntries(entries);
if (this.replicationSinkHandler == null) return;
this.replicationSinkHandler.replicateLogEntries(entries);
}
/**

View File

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
/**
* Gateway to Cluster Replication.
* Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}.
* One such application is a cross-datacenter
* replication service that can keep two hbase clusters in sync.
*/
public interface ReplicationService {
/**
* Initializes the replication service object.
* @throws IOException
*/
public void initialize(Server rs, FileSystem fs, Path logdir,
Path oldLogDir) throws IOException;
/**
* Start replication services.
* @throws IOException
*/
public void startReplicationService() throws IOException;
/**
* Stops replication service.
*/
public void stopReplicationService();
}

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
/**
* A sink for a replication stream has to expose this service.
* This service allows an application to hook into the
* regionserver and behave as a replication sink.
*/
public interface ReplicationSinkService extends ReplicationService {
/**
* Carry on the list of log entries down to the sink
* @param entries list of entries to replicate
* @throws IOException
*/
public void replicateLogEntries(HLog.Entry[] entries) throws IOException;
}

View File

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
/**
* A source for a replication stream has to expose this service.
* This service allows an application to hook into the
* regionserver and watch for new transactions.
*/
public interface ReplicationSourceService extends ReplicationService {
/**
* Returns a WALObserver for the service. This is needed to
* observe log rolls and log archival events.
*/
public WALActionsListener getWALActionsListener();
}

View File

@ -31,6 +31,8 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@ -47,15 +49,16 @@ import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL;
/**
* Gateway to Replication. Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}.
*/
public class Replication implements WALActionsListener {
private final boolean replication;
private final ReplicationSourceManager replicationManager;
public class Replication implements WALActionsListener,
ReplicationSourceService, ReplicationSinkService {
private boolean replication;
private ReplicationSourceManager replicationManager;
private final AtomicBoolean replicating = new AtomicBoolean(true);
private final ReplicationZookeeper zkHelper;
private final Configuration conf;
private ReplicationZookeeper zkHelper;
private Configuration conf;
private ReplicationSink replicationSink;
// Hosting server
private final Server server;
private Server server;
/**
* Instantiate the replication management (if rep is enabled).
@ -64,16 +67,29 @@ public class Replication implements WALActionsListener {
* @param logDir
* @param oldLogDir directory where logs are archived
* @throws IOException
* @throws KeeperException
*/
public Replication(final Server server, final FileSystem fs,
final Path logDir, final Path oldLogDir)
throws IOException, KeeperException {
final Path logDir, final Path oldLogDir) throws IOException{
initialize(server, fs, logDir, oldLogDir);
}
/**
* Empty constructor
*/
public Replication() {
}
public void initialize(final Server server, final FileSystem fs,
final Path logDir, final Path oldLogDir) throws IOException {
this.server = server;
this.conf = this.server.getConfiguration();
this.replication = isReplication(this.conf);
if (replication) {
try {
this.zkHelper = new ReplicationZookeeper(server, this.replicating);
} catch (KeeperException ke) {
throw new IOException("Failed replication handler create", ke);
}
this.replicationManager = new ReplicationSourceManager(zkHelper, conf,
this.server, fs, this.replicating, logDir, oldLogDir) ;
} else {
@ -90,6 +106,19 @@ public class Replication implements WALActionsListener {
return c.getBoolean(REPLICATION_ENABLE_KEY, false);
}
/*
* Returns an object to listen to new hlog changes
**/
public WALActionsListener getWALActionsListener() {
return this;
}
/**
* Stops replication service.
*/
public void stopReplicationService() {
join();
}
/**
* Join with the replication threads
*/
@ -115,7 +144,7 @@ public class Replication implements WALActionsListener {
* it starts
* @throws IOException
*/
public void startReplicationServices() throws IOException {
public void startReplicationService() throws IOException {
if (this.replication) {
this.replicationManager.init();
this.replicationSink = new ReplicationSink(this.conf, this.server);