diff --git a/CHANGES.txt b/CHANGES.txt index ce41ed233ad..1aa56c41fa8 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -3,6 +3,8 @@ Release 0.93.0 - Unreleased IMPROVEMENT HBASE-4132 Extend the WALActionsListener API to accomodate log archival (dhruba borthakur) + HBASE-4131 Make the Replication Service pluggable via a standard + interface definition (dhruba borthakur) Release 0.92.0 - Unreleased INCOMPATIBLE CHANGES diff --git a/src/main/java/org/apache/hadoop/hbase/HConstants.java b/src/main/java/org/apache/hadoop/hbase/HConstants.java index 7ea64b7e778..3af1bf03038 100644 --- a/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -473,8 +473,17 @@ public final class HConstants { */ public static int DEFAULT_HBASE_RPC_TIMEOUT = 60000; + /* + * cluster replication constants. + */ public static final String REPLICATION_ENABLE_KEY = "hbase.replication"; + public static final String + REPLICATION_SOURCE_SERVICE_CLASSNAME = "hbase.replication.source.service"; + public static final String + REPLICATION_SINK_SERVICE_CLASSNAME = "hbase.replication.sink.service"; + public static final String REPLICATION_SERVICE_CLASSNAME_DEFAULT = + "org.apache.hadoop.hbase.replication.regionserver.Replication"; /** HBCK special code name used as server name when manipulating ZK nodes */ public static final String HBCK_CODE_NAME = "HBCKServerName"; diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 5869c18412d..9f1fd8fe2a6 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -54,6 +54,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Chore; +import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.hbase.ClockOutOfSyncException; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseConfiguration; @@ -284,7 +285,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, private ExecutorService service; // Replication services. If no replication, this handler will be null. - private Replication replicationHandler; + private ReplicationSourceService replicationSourceHandler; + private ReplicationSinkService replicationSinkHandler; private final RegionServerAccounting regionServerAccounting; @@ -1169,12 +1171,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, // Instantiate replication manager if replication enabled. Pass it the // log directories. - try { - this.replicationHandler = Replication.isReplication(this.conf)? - new Replication(this, this.fs, logdir, oldLogDir): null; - } catch (KeeperException e) { - throw new IOException("Failed replication handler create", e); - } + createNewReplicationInstance(conf, this, this.fs, logdir, oldLogDir); return instantiateHLog(logdir, oldLogDir); } @@ -1201,9 +1198,10 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, // Log roller. this.hlogRoller = new LogRoller(this, this); listeners.add(this.hlogRoller); - if (this.replicationHandler != null) { + if (this.replicationSourceHandler != null && + this.replicationSourceHandler.getWALActionsListener() != null) { // Replication handler is an implementation of WALActionsListener. - listeners.add(this.replicationHandler); + listeners.add(this.replicationSourceHandler.getWALActionsListener()); } return listeners; } @@ -1351,8 +1349,13 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, // that port is occupied. Adjust serverInfo if this is the case. this.webuiport = putUpWebUI(); - if (this.replicationHandler != null) { - this.replicationHandler.startReplicationServices(); + if (this.replicationSourceHandler == this.replicationSinkHandler && + this.replicationSourceHandler != null) { + this.replicationSourceHandler.startReplicationService(); + } else if (this.replicationSourceHandler != null) { + this.replicationSourceHandler.startReplicationService(); + } else if (this.replicationSinkHandler != null) { + this.replicationSinkHandler.startReplicationService(); } // Start Server. This service is like leases in that it internally runs @@ -1557,11 +1560,32 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, this.compactSplitThread.join(); } if (this.service != null) this.service.shutdown(); - if (this.replicationHandler != null) { - this.replicationHandler.join(); + if (this.replicationSourceHandler != null && + this.replicationSourceHandler == this.replicationSinkHandler) { + this.replicationSourceHandler.stopReplicationService(); + } else if (this.replicationSourceHandler != null) { + this.replicationSourceHandler.stopReplicationService(); + } else if (this.replicationSinkHandler != null) { + this.replicationSinkHandler.stopReplicationService(); } } + /** + * @return Return the object that implements the replication + * source service. + */ + ReplicationSourceService getReplicationSourceService() { + return replicationSourceHandler; + } + + /** + * @return Return the object that implements the replication + * sink service. + */ + ReplicationSinkService getReplicationSinkService() { + return replicationSinkHandler; + } + /** * Get the current master from ZooKeeper and open the RPC connection to it. * @@ -3063,6 +3087,63 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, // Main program and support routines // + /** + * Load the replication service objects, if any + */ + static private void createNewReplicationInstance(Configuration conf, + HRegionServer server, FileSystem fs, Path logDir, Path oldLogDir) throws IOException{ + + // If replication is not enabled, then return immediately. + if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false)) { + return; + } + + // read in the name of the source replication class from the config file. + String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME, + HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT); + + // read in the name of the sink replication class from the config file. + String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME, + HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT); + + // If both the sink and the source class names are the same, then instantiate + // only one object. + if (sourceClassname.equals(sinkClassname)) { + server.replicationSourceHandler = (ReplicationSourceService) + newReplicationInstance(sourceClassname, + conf, server, fs, logDir, oldLogDir); + server.replicationSinkHandler = (ReplicationSinkService) + server.replicationSinkHandler; + } + else { + server.replicationSourceHandler = (ReplicationSourceService) + newReplicationInstance(sourceClassname, + conf, server, fs, logDir, oldLogDir); + server.replicationSinkHandler = (ReplicationSinkService) + newReplicationInstance(sinkClassname, + conf, server, fs, logDir, oldLogDir); + } + } + + static private ReplicationService newReplicationInstance(String classname, + Configuration conf, HRegionServer server, FileSystem fs, Path logDir, + Path oldLogDir) throws IOException{ + + Class clazz = null; + try { + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + clazz = Class.forName(classname, true, classLoader); + } catch (java.lang.ClassNotFoundException nfe) { + throw new IOException("Cound not find class for " + classname); + } + + // create an instance of the replication object. + ReplicationService service = (ReplicationService) + ReflectionUtils.newInstance(clazz, conf); + service.initialize(server, fs, logDir, oldLogDir); + return service; + } + /** * @param hrs * @return Thread the RegionServer is running in correctly named. @@ -3115,8 +3196,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, public void replicateLogEntries(final HLog.Entry[] entries) throws IOException { checkOpen(); - if (this.replicationHandler == null) return; - this.replicationHandler.replicateLogEntries(entries); + if (this.replicationSinkHandler == null) return; + this.replicationSinkHandler.replicateLogEntries(entries); } /** diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java b/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java new file mode 100644 index 00000000000..489e1410dfb --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; + +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +/** + * Gateway to Cluster Replication. + * Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}. + * One such application is a cross-datacenter + * replication service that can keep two hbase clusters in sync. + */ +public interface ReplicationService { + + /** + * Initializes the replication service object. + * @throws IOException + */ + public void initialize(Server rs, FileSystem fs, Path logdir, + Path oldLogDir) throws IOException; + + /** + * Start replication services. + * @throws IOException + */ + public void startReplicationService() throws IOException; + + /** + * Stops replication service. + */ + public void stopReplicationService(); +} diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java b/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java new file mode 100644 index 00000000000..a5706731e80 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; + +import org.apache.hadoop.hbase.regionserver.wal.HLog; + +/** + * A sink for a replication stream has to expose this service. + * This service allows an application to hook into the + * regionserver and behave as a replication sink. + */ +public interface ReplicationSinkService extends ReplicationService { + + /** + * Carry on the list of log entries down to the sink + * @param entries list of entries to replicate + * @throws IOException + */ + public void replicateLogEntries(HLog.Entry[] entries) throws IOException; +} diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java b/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java new file mode 100644 index 00000000000..af0a7d887e2 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; + +import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; + +/** + * A source for a replication stream has to expose this service. + * This service allows an application to hook into the + * regionserver and watch for new transactions. + */ +public interface ReplicationSourceService extends ReplicationService { + + /** + * Returns a WALObserver for the service. This is needed to + * observe log rolls and log archival events. + */ + public WALActionsListener getWALActionsListener(); +} diff --git a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index 0ce77640066..9d6b9ccefb7 100644 --- a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -31,6 +31,8 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.regionserver.ReplicationSourceService; +import org.apache.hadoop.hbase.regionserver.ReplicationSinkService; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; @@ -47,15 +49,16 @@ import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL; /** * Gateway to Replication. Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}. */ -public class Replication implements WALActionsListener { - private final boolean replication; - private final ReplicationSourceManager replicationManager; +public class Replication implements WALActionsListener, + ReplicationSourceService, ReplicationSinkService { + private boolean replication; + private ReplicationSourceManager replicationManager; private final AtomicBoolean replicating = new AtomicBoolean(true); - private final ReplicationZookeeper zkHelper; - private final Configuration conf; + private ReplicationZookeeper zkHelper; + private Configuration conf; private ReplicationSink replicationSink; // Hosting server - private final Server server; + private Server server; /** * Instantiate the replication management (if rep is enabled). @@ -64,16 +67,29 @@ public class Replication implements WALActionsListener { * @param logDir * @param oldLogDir directory where logs are archived * @throws IOException - * @throws KeeperException */ public Replication(final Server server, final FileSystem fs, - final Path logDir, final Path oldLogDir) - throws IOException, KeeperException { + final Path logDir, final Path oldLogDir) throws IOException{ + initialize(server, fs, logDir, oldLogDir); + } + + /** + * Empty constructor + */ + public Replication() { + } + + public void initialize(final Server server, final FileSystem fs, + final Path logDir, final Path oldLogDir) throws IOException { this.server = server; this.conf = this.server.getConfiguration(); this.replication = isReplication(this.conf); if (replication) { - this.zkHelper = new ReplicationZookeeper(server, this.replicating); + try { + this.zkHelper = new ReplicationZookeeper(server, this.replicating); + } catch (KeeperException ke) { + throw new IOException("Failed replication handler create", ke); + } this.replicationManager = new ReplicationSourceManager(zkHelper, conf, this.server, fs, this.replicating, logDir, oldLogDir) ; } else { @@ -82,14 +98,27 @@ public class Replication implements WALActionsListener { } } - /** - * @param c Configuration to look at - * @return True if replication is enabled. - */ + /** + * @param c Configuration to look at + * @return True if replication is enabled. + */ public static boolean isReplication(final Configuration c) { return c.getBoolean(REPLICATION_ENABLE_KEY, false); } + /* + * Returns an object to listen to new hlog changes + **/ + public WALActionsListener getWALActionsListener() { + return this; + } + /** + * Stops replication service. + */ + public void stopReplicationService() { + join(); + } + /** * Join with the replication threads */ @@ -115,7 +144,7 @@ public class Replication implements WALActionsListener { * it starts * @throws IOException */ - public void startReplicationServices() throws IOException { + public void startReplicationService() throws IOException { if (this.replication) { this.replicationManager.init(); this.replicationSink = new ReplicationSink(this.conf, this.server);