From 97e337f3309dd4d41753a2419ab97a667765ddda Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Fri, 23 Sep 2011 22:09:01 +0000 Subject: [PATCH] HBASE-4131 Make the Replication Service pluggable via a standard interface definition; BACKED IT OUT -- WAS CAUSING TestReplication failures git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1175048 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 2 - .../org/apache/hadoop/hbase/HConstants.java | 9 -- .../hbase/regionserver/HRegionServer.java | 111 +++--------------- .../regionserver/ReplicationService.java | 51 -------- .../regionserver/ReplicationSinkService.java | 37 ------ .../ReplicationSourceService.java | 36 ------ .../replication/regionserver/Replication.java | 59 +++------- 7 files changed, 30 insertions(+), 275 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index b3317d0767c..00c40a3084b 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -3,8 +3,6 @@ Release 0.93.0 - Unreleased IMPROVEMENT HBASE-4132 Extend the WALActionsListener API to accomodate log archival (dhruba borthakur) - HBASE-4131 Make the Replication Service pluggable via a standard - interface definition (dhruba borthakur) Release 0.92.0 - Unreleased INCOMPATIBLE CHANGES diff --git a/src/main/java/org/apache/hadoop/hbase/HConstants.java b/src/main/java/org/apache/hadoop/hbase/HConstants.java index 3af1bf03038..7ea64b7e778 100644 --- a/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -473,17 +473,8 @@ public final class HConstants { */ public static int DEFAULT_HBASE_RPC_TIMEOUT = 60000; - /* - * cluster replication constants. - */ public static final String REPLICATION_ENABLE_KEY = "hbase.replication"; - public static final String - REPLICATION_SOURCE_SERVICE_CLASSNAME = "hbase.replication.source.service"; - public static final String - REPLICATION_SINK_SERVICE_CLASSNAME = "hbase.replication.sink.service"; - public static final String REPLICATION_SERVICE_CLASSNAME_DEFAULT = - "org.apache.hadoop.hbase.replication.regionserver.Replication"; /** HBCK special code name used as server name when manipulating ZK nodes */ public static final String HBCK_CODE_NAME = "HBCKServerName"; diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 9f1fd8fe2a6..5869c18412d 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -54,7 +54,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Chore; -import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.hbase.ClockOutOfSyncException; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseConfiguration; @@ -285,8 +284,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, private ExecutorService service; // Replication services. If no replication, this handler will be null. - private ReplicationSourceService replicationSourceHandler; - private ReplicationSinkService replicationSinkHandler; + private Replication replicationHandler; private final RegionServerAccounting regionServerAccounting; @@ -1171,7 +1169,12 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, // Instantiate replication manager if replication enabled. Pass it the // log directories. - createNewReplicationInstance(conf, this, this.fs, logdir, oldLogDir); + try { + this.replicationHandler = Replication.isReplication(this.conf)? + new Replication(this, this.fs, logdir, oldLogDir): null; + } catch (KeeperException e) { + throw new IOException("Failed replication handler create", e); + } return instantiateHLog(logdir, oldLogDir); } @@ -1198,10 +1201,9 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, // Log roller. this.hlogRoller = new LogRoller(this, this); listeners.add(this.hlogRoller); - if (this.replicationSourceHandler != null && - this.replicationSourceHandler.getWALActionsListener() != null) { + if (this.replicationHandler != null) { // Replication handler is an implementation of WALActionsListener. - listeners.add(this.replicationSourceHandler.getWALActionsListener()); + listeners.add(this.replicationHandler); } return listeners; } @@ -1349,13 +1351,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, // that port is occupied. Adjust serverInfo if this is the case. this.webuiport = putUpWebUI(); - if (this.replicationSourceHandler == this.replicationSinkHandler && - this.replicationSourceHandler != null) { - this.replicationSourceHandler.startReplicationService(); - } else if (this.replicationSourceHandler != null) { - this.replicationSourceHandler.startReplicationService(); - } else if (this.replicationSinkHandler != null) { - this.replicationSinkHandler.startReplicationService(); + if (this.replicationHandler != null) { + this.replicationHandler.startReplicationServices(); } // Start Server. This service is like leases in that it internally runs @@ -1560,32 +1557,11 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, this.compactSplitThread.join(); } if (this.service != null) this.service.shutdown(); - if (this.replicationSourceHandler != null && - this.replicationSourceHandler == this.replicationSinkHandler) { - this.replicationSourceHandler.stopReplicationService(); - } else if (this.replicationSourceHandler != null) { - this.replicationSourceHandler.stopReplicationService(); - } else if (this.replicationSinkHandler != null) { - this.replicationSinkHandler.stopReplicationService(); + if (this.replicationHandler != null) { + this.replicationHandler.join(); } } - /** - * @return Return the object that implements the replication - * source service. - */ - ReplicationSourceService getReplicationSourceService() { - return replicationSourceHandler; - } - - /** - * @return Return the object that implements the replication - * sink service. - */ - ReplicationSinkService getReplicationSinkService() { - return replicationSinkHandler; - } - /** * Get the current master from ZooKeeper and open the RPC connection to it. * @@ -3087,63 +3063,6 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, // Main program and support routines // - /** - * Load the replication service objects, if any - */ - static private void createNewReplicationInstance(Configuration conf, - HRegionServer server, FileSystem fs, Path logDir, Path oldLogDir) throws IOException{ - - // If replication is not enabled, then return immediately. - if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false)) { - return; - } - - // read in the name of the source replication class from the config file. - String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME, - HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT); - - // read in the name of the sink replication class from the config file. - String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME, - HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT); - - // If both the sink and the source class names are the same, then instantiate - // only one object. - if (sourceClassname.equals(sinkClassname)) { - server.replicationSourceHandler = (ReplicationSourceService) - newReplicationInstance(sourceClassname, - conf, server, fs, logDir, oldLogDir); - server.replicationSinkHandler = (ReplicationSinkService) - server.replicationSinkHandler; - } - else { - server.replicationSourceHandler = (ReplicationSourceService) - newReplicationInstance(sourceClassname, - conf, server, fs, logDir, oldLogDir); - server.replicationSinkHandler = (ReplicationSinkService) - newReplicationInstance(sinkClassname, - conf, server, fs, logDir, oldLogDir); - } - } - - static private ReplicationService newReplicationInstance(String classname, - Configuration conf, HRegionServer server, FileSystem fs, Path logDir, - Path oldLogDir) throws IOException{ - - Class clazz = null; - try { - ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); - clazz = Class.forName(classname, true, classLoader); - } catch (java.lang.ClassNotFoundException nfe) { - throw new IOException("Cound not find class for " + classname); - } - - // create an instance of the replication object. - ReplicationService service = (ReplicationService) - ReflectionUtils.newInstance(clazz, conf); - service.initialize(server, fs, logDir, oldLogDir); - return service; - } - /** * @param hrs * @return Thread the RegionServer is running in correctly named. @@ -3196,8 +3115,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, public void replicateLogEntries(final HLog.Entry[] entries) throws IOException { checkOpen(); - if (this.replicationSinkHandler == null) return; - this.replicationSinkHandler.replicateLogEntries(entries); + if (this.replicationHandler == null) return; + this.replicationHandler.replicateLogEntries(entries); } /** diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java b/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java index 489e1410dfb..e69de29bb2d 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.regionserver; - -import java.io.IOException; - -import org.apache.hadoop.hbase.Server; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; - -/** - * Gateway to Cluster Replication. - * Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}. - * One such application is a cross-datacenter - * replication service that can keep two hbase clusters in sync. - */ -public interface ReplicationService { - - /** - * Initializes the replication service object. - * @throws IOException - */ - public void initialize(Server rs, FileSystem fs, Path logdir, - Path oldLogDir) throws IOException; - - /** - * Start replication services. - * @throws IOException - */ - public void startReplicationService() throws IOException; - - /** - * Stops replication service. - */ - public void stopReplicationService(); -} diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java b/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java index a5706731e80..e69de29bb2d 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.regionserver; - -import java.io.IOException; - -import org.apache.hadoop.hbase.regionserver.wal.HLog; - -/** - * A sink for a replication stream has to expose this service. - * This service allows an application to hook into the - * regionserver and behave as a replication sink. - */ -public interface ReplicationSinkService extends ReplicationService { - - /** - * Carry on the list of log entries down to the sink - * @param entries list of entries to replicate - * @throws IOException - */ - public void replicateLogEntries(HLog.Entry[] entries) throws IOException; -} diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java b/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java index af0a7d887e2..e69de29bb2d 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.regionserver; - -import java.io.IOException; - -import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; - -/** - * A source for a replication stream has to expose this service. - * This service allows an application to hook into the - * regionserver and watch for new transactions. - */ -public interface ReplicationSourceService extends ReplicationService { - - /** - * Returns a WALObserver for the service. This is needed to - * observe log rolls and log archival events. - */ - public WALActionsListener getWALActionsListener(); -} diff --git a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index 9d6b9ccefb7..0ce77640066 100644 --- a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -31,8 +31,6 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.Server; -import org.apache.hadoop.hbase.regionserver.ReplicationSourceService; -import org.apache.hadoop.hbase.regionserver.ReplicationSinkService; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; @@ -49,16 +47,15 @@ import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL; /** * Gateway to Replication. Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}. */ -public class Replication implements WALActionsListener, - ReplicationSourceService, ReplicationSinkService { - private boolean replication; - private ReplicationSourceManager replicationManager; +public class Replication implements WALActionsListener { + private final boolean replication; + private final ReplicationSourceManager replicationManager; private final AtomicBoolean replicating = new AtomicBoolean(true); - private ReplicationZookeeper zkHelper; - private Configuration conf; + private final ReplicationZookeeper zkHelper; + private final Configuration conf; private ReplicationSink replicationSink; // Hosting server - private Server server; + private final Server server; /** * Instantiate the replication management (if rep is enabled). @@ -67,29 +64,16 @@ public class Replication implements WALActionsListener, * @param logDir * @param oldLogDir directory where logs are archived * @throws IOException + * @throws KeeperException */ public Replication(final Server server, final FileSystem fs, - final Path logDir, final Path oldLogDir) throws IOException{ - initialize(server, fs, logDir, oldLogDir); - } - - /** - * Empty constructor - */ - public Replication() { - } - - public void initialize(final Server server, final FileSystem fs, - final Path logDir, final Path oldLogDir) throws IOException { + final Path logDir, final Path oldLogDir) + throws IOException, KeeperException { this.server = server; this.conf = this.server.getConfiguration(); this.replication = isReplication(this.conf); if (replication) { - try { - this.zkHelper = new ReplicationZookeeper(server, this.replicating); - } catch (KeeperException ke) { - throw new IOException("Failed replication handler create", ke); - } + this.zkHelper = new ReplicationZookeeper(server, this.replicating); this.replicationManager = new ReplicationSourceManager(zkHelper, conf, this.server, fs, this.replicating, logDir, oldLogDir) ; } else { @@ -98,27 +82,14 @@ public class Replication implements WALActionsListener, } } - /** - * @param c Configuration to look at - * @return True if replication is enabled. - */ + /** + * @param c Configuration to look at + * @return True if replication is enabled. + */ public static boolean isReplication(final Configuration c) { return c.getBoolean(REPLICATION_ENABLE_KEY, false); } - /* - * Returns an object to listen to new hlog changes - **/ - public WALActionsListener getWALActionsListener() { - return this; - } - /** - * Stops replication service. - */ - public void stopReplicationService() { - join(); - } - /** * Join with the replication threads */ @@ -144,7 +115,7 @@ public class Replication implements WALActionsListener, * it starts * @throws IOException */ - public void startReplicationService() throws IOException { + public void startReplicationServices() throws IOException { if (this.replication) { this.replicationManager.init(); this.replicationSink = new ReplicationSink(this.conf, this.server);