HBASE-4131 Make the Replication Service pluggable via a standard interface definition; BACKED IT OUT -- WAS CAUSING TestReplication failures

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1175048 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2011-09-23 22:09:01 +00:00
parent 2345d9298e
commit 97e337f330
7 changed files with 30 additions and 275 deletions

View File

@ -3,8 +3,6 @@ Release 0.93.0 - Unreleased
IMPROVEMENT IMPROVEMENT
HBASE-4132 Extend the WALActionsListener API to accomodate log archival HBASE-4132 Extend the WALActionsListener API to accomodate log archival
(dhruba borthakur) (dhruba borthakur)
HBASE-4131 Make the Replication Service pluggable via a standard
interface definition (dhruba borthakur)
Release 0.92.0 - Unreleased Release 0.92.0 - Unreleased
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -473,17 +473,8 @@ public final class HConstants {
*/ */
public static int DEFAULT_HBASE_RPC_TIMEOUT = 60000; public static int DEFAULT_HBASE_RPC_TIMEOUT = 60000;
/*
* cluster replication constants.
*/
public static final String public static final String
REPLICATION_ENABLE_KEY = "hbase.replication"; REPLICATION_ENABLE_KEY = "hbase.replication";
public static final String
REPLICATION_SOURCE_SERVICE_CLASSNAME = "hbase.replication.source.service";
public static final String
REPLICATION_SINK_SERVICE_CLASSNAME = "hbase.replication.sink.service";
public static final String REPLICATION_SERVICE_CLASSNAME_DEFAULT =
"org.apache.hadoop.hbase.replication.regionserver.Replication";
/** HBCK special code name used as server name when manipulating ZK nodes */ /** HBCK special code name used as server name when manipulating ZK nodes */
public static final String HBCK_CODE_NAME = "HBCKServerName"; public static final String HBCK_CODE_NAME = "HBCKServerName";

View File

@ -54,7 +54,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Chore; import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.hbase.ClockOutOfSyncException; import org.apache.hadoop.hbase.ClockOutOfSyncException;
import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
@ -285,8 +284,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
private ExecutorService service; private ExecutorService service;
// Replication services. If no replication, this handler will be null. // Replication services. If no replication, this handler will be null.
private ReplicationSourceService replicationSourceHandler; private Replication replicationHandler;
private ReplicationSinkService replicationSinkHandler;
private final RegionServerAccounting regionServerAccounting; private final RegionServerAccounting regionServerAccounting;
@ -1171,7 +1169,12 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
// Instantiate replication manager if replication enabled. Pass it the // Instantiate replication manager if replication enabled. Pass it the
// log directories. // log directories.
createNewReplicationInstance(conf, this, this.fs, logdir, oldLogDir); try {
this.replicationHandler = Replication.isReplication(this.conf)?
new Replication(this, this.fs, logdir, oldLogDir): null;
} catch (KeeperException e) {
throw new IOException("Failed replication handler create", e);
}
return instantiateHLog(logdir, oldLogDir); return instantiateHLog(logdir, oldLogDir);
} }
@ -1198,10 +1201,9 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
// Log roller. // Log roller.
this.hlogRoller = new LogRoller(this, this); this.hlogRoller = new LogRoller(this, this);
listeners.add(this.hlogRoller); listeners.add(this.hlogRoller);
if (this.replicationSourceHandler != null && if (this.replicationHandler != null) {
this.replicationSourceHandler.getWALActionsListener() != null) {
// Replication handler is an implementation of WALActionsListener. // Replication handler is an implementation of WALActionsListener.
listeners.add(this.replicationSourceHandler.getWALActionsListener()); listeners.add(this.replicationHandler);
} }
return listeners; return listeners;
} }
@ -1349,13 +1351,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
// that port is occupied. Adjust serverInfo if this is the case. // that port is occupied. Adjust serverInfo if this is the case.
this.webuiport = putUpWebUI(); this.webuiport = putUpWebUI();
if (this.replicationSourceHandler == this.replicationSinkHandler && if (this.replicationHandler != null) {
this.replicationSourceHandler != null) { this.replicationHandler.startReplicationServices();
this.replicationSourceHandler.startReplicationService();
} else if (this.replicationSourceHandler != null) {
this.replicationSourceHandler.startReplicationService();
} else if (this.replicationSinkHandler != null) {
this.replicationSinkHandler.startReplicationService();
} }
// Start Server. This service is like leases in that it internally runs // Start Server. This service is like leases in that it internally runs
@ -1560,32 +1557,11 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
this.compactSplitThread.join(); this.compactSplitThread.join();
} }
if (this.service != null) this.service.shutdown(); if (this.service != null) this.service.shutdown();
if (this.replicationSourceHandler != null && if (this.replicationHandler != null) {
this.replicationSourceHandler == this.replicationSinkHandler) { this.replicationHandler.join();
this.replicationSourceHandler.stopReplicationService();
} else if (this.replicationSourceHandler != null) {
this.replicationSourceHandler.stopReplicationService();
} else if (this.replicationSinkHandler != null) {
this.replicationSinkHandler.stopReplicationService();
} }
} }
/**
* @return Return the object that implements the replication
* source service.
*/
ReplicationSourceService getReplicationSourceService() {
return replicationSourceHandler;
}
/**
* @return Return the object that implements the replication
* sink service.
*/
ReplicationSinkService getReplicationSinkService() {
return replicationSinkHandler;
}
/** /**
* Get the current master from ZooKeeper and open the RPC connection to it. * Get the current master from ZooKeeper and open the RPC connection to it.
* *
@ -3087,63 +3063,6 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
// Main program and support routines // Main program and support routines
// //
/**
* Load the replication service objects, if any
*/
static private void createNewReplicationInstance(Configuration conf,
HRegionServer server, FileSystem fs, Path logDir, Path oldLogDir) throws IOException{
// If replication is not enabled, then return immediately.
if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false)) {
return;
}
// read in the name of the source replication class from the config file.
String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
// read in the name of the sink replication class from the config file.
String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
// If both the sink and the source class names are the same, then instantiate
// only one object.
if (sourceClassname.equals(sinkClassname)) {
server.replicationSourceHandler = (ReplicationSourceService)
newReplicationInstance(sourceClassname,
conf, server, fs, logDir, oldLogDir);
server.replicationSinkHandler = (ReplicationSinkService)
server.replicationSinkHandler;
}
else {
server.replicationSourceHandler = (ReplicationSourceService)
newReplicationInstance(sourceClassname,
conf, server, fs, logDir, oldLogDir);
server.replicationSinkHandler = (ReplicationSinkService)
newReplicationInstance(sinkClassname,
conf, server, fs, logDir, oldLogDir);
}
}
static private ReplicationService newReplicationInstance(String classname,
Configuration conf, HRegionServer server, FileSystem fs, Path logDir,
Path oldLogDir) throws IOException{
Class<?> clazz = null;
try {
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
clazz = Class.forName(classname, true, classLoader);
} catch (java.lang.ClassNotFoundException nfe) {
throw new IOException("Cound not find class for " + classname);
}
// create an instance of the replication object.
ReplicationService service = (ReplicationService)
ReflectionUtils.newInstance(clazz, conf);
service.initialize(server, fs, logDir, oldLogDir);
return service;
}
/** /**
* @param hrs * @param hrs
* @return Thread the RegionServer is running in correctly named. * @return Thread the RegionServer is running in correctly named.
@ -3196,8 +3115,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
public void replicateLogEntries(final HLog.Entry[] entries) public void replicateLogEntries(final HLog.Entry[] entries)
throws IOException { throws IOException {
checkOpen(); checkOpen();
if (this.replicationSinkHandler == null) return; if (this.replicationHandler == null) return;
this.replicationSinkHandler.replicateLogEntries(entries); this.replicationHandler.replicateLogEntries(entries);
} }
/** /**

View File

@ -1,51 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
/**
* Gateway to Cluster Replication.
* Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}.
* One such application is a cross-datacenter
* replication service that can keep two hbase clusters in sync.
*/
public interface ReplicationService {
/**
* Initializes the replication service object.
* @throws IOException
*/
public void initialize(Server rs, FileSystem fs, Path logdir,
Path oldLogDir) throws IOException;
/**
* Start replication services.
* @throws IOException
*/
public void startReplicationService() throws IOException;
/**
* Stops replication service.
*/
public void stopReplicationService();
}

View File

@ -1,37 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
/**
* A sink for a replication stream has to expose this service.
* This service allows an application to hook into the
* regionserver and behave as a replication sink.
*/
public interface ReplicationSinkService extends ReplicationService {
/**
* Carry on the list of log entries down to the sink
* @param entries list of entries to replicate
* @throws IOException
*/
public void replicateLogEntries(HLog.Entry[] entries) throws IOException;
}

View File

@ -1,36 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
/**
* A source for a replication stream has to expose this service.
* This service allows an application to hook into the
* regionserver and watch for new transactions.
*/
public interface ReplicationSourceService extends ReplicationService {
/**
* Returns a WALObserver for the service. This is needed to
* observe log rolls and log archival events.
*/
public WALActionsListener getWALActionsListener();
}

View File

@ -31,8 +31,6 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@ -49,16 +47,15 @@ import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL;
/** /**
* Gateway to Replication. Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}. * Gateway to Replication. Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}.
*/ */
public class Replication implements WALActionsListener, public class Replication implements WALActionsListener {
ReplicationSourceService, ReplicationSinkService { private final boolean replication;
private boolean replication; private final ReplicationSourceManager replicationManager;
private ReplicationSourceManager replicationManager;
private final AtomicBoolean replicating = new AtomicBoolean(true); private final AtomicBoolean replicating = new AtomicBoolean(true);
private ReplicationZookeeper zkHelper; private final ReplicationZookeeper zkHelper;
private Configuration conf; private final Configuration conf;
private ReplicationSink replicationSink; private ReplicationSink replicationSink;
// Hosting server // Hosting server
private Server server; private final Server server;
/** /**
* Instantiate the replication management (if rep is enabled). * Instantiate the replication management (if rep is enabled).
@ -67,29 +64,16 @@ public class Replication implements WALActionsListener,
* @param logDir * @param logDir
* @param oldLogDir directory where logs are archived * @param oldLogDir directory where logs are archived
* @throws IOException * @throws IOException
* @throws KeeperException
*/ */
public Replication(final Server server, final FileSystem fs, public Replication(final Server server, final FileSystem fs,
final Path logDir, final Path oldLogDir) throws IOException{ final Path logDir, final Path oldLogDir)
initialize(server, fs, logDir, oldLogDir); throws IOException, KeeperException {
}
/**
* Empty constructor
*/
public Replication() {
}
public void initialize(final Server server, final FileSystem fs,
final Path logDir, final Path oldLogDir) throws IOException {
this.server = server; this.server = server;
this.conf = this.server.getConfiguration(); this.conf = this.server.getConfiguration();
this.replication = isReplication(this.conf); this.replication = isReplication(this.conf);
if (replication) { if (replication) {
try { this.zkHelper = new ReplicationZookeeper(server, this.replicating);
this.zkHelper = new ReplicationZookeeper(server, this.replicating);
} catch (KeeperException ke) {
throw new IOException("Failed replication handler create", ke);
}
this.replicationManager = new ReplicationSourceManager(zkHelper, conf, this.replicationManager = new ReplicationSourceManager(zkHelper, conf,
this.server, fs, this.replicating, logDir, oldLogDir) ; this.server, fs, this.replicating, logDir, oldLogDir) ;
} else { } else {
@ -98,27 +82,14 @@ public class Replication implements WALActionsListener,
} }
} }
/** /**
* @param c Configuration to look at * @param c Configuration to look at
* @return True if replication is enabled. * @return True if replication is enabled.
*/ */
public static boolean isReplication(final Configuration c) { public static boolean isReplication(final Configuration c) {
return c.getBoolean(REPLICATION_ENABLE_KEY, false); return c.getBoolean(REPLICATION_ENABLE_KEY, false);
} }
/*
* Returns an object to listen to new hlog changes
**/
public WALActionsListener getWALActionsListener() {
return this;
}
/**
* Stops replication service.
*/
public void stopReplicationService() {
join();
}
/** /**
* Join with the replication threads * Join with the replication threads
*/ */
@ -144,7 +115,7 @@ public class Replication implements WALActionsListener,
* it starts * it starts
* @throws IOException * @throws IOException
*/ */
public void startReplicationService() throws IOException { public void startReplicationServices() throws IOException {
if (this.replication) { if (this.replication) {
this.replicationManager.init(); this.replicationManager.init();
this.replicationSink = new ReplicationSink(this.conf, this.server); this.replicationSink = new ReplicationSink(this.conf, this.server);