HBASE-17442 Move most of the replication related classes from hbase-client to new hbase-replication package. (Guanghao Zhang).

Change-Id: Ie0e24cc617ab4bf56de8b1747062d1b78a5d4669
Committed by Apekshit Sharma on 2017-08-17 20:59:35 -07:00
parent 205016ca79
commit e2ce252b59
27 changed files with 301 additions and 113 deletions


@@ -26,37 +26,22 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.replication.ReplicationException;
-import org.apache.hadoop.hbase.replication.ReplicationFactory;
-import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
-import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
-import org.apache.hadoop.hbase.replication.ReplicationPeers;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
@@ -101,16 +86,6 @@ public class ReplicationAdmin implements Closeable {
       Integer.toString(HConstants.REPLICATION_SCOPE_SERIAL);
   private final Connection connection;
-  // TODO: replication should be managed by master. All the classes except ReplicationAdmin should
-  // be moved to hbase-server. Resolve it in HBASE-11392.
-  private final ReplicationQueuesClient replicationQueuesClient;
-  private final ReplicationPeers replicationPeers;
-  /**
-   * A watcher used by replicationPeers and replicationQueuesClient. Keep reference so can dispose
-   * on {@link #close()}.
-   */
-  private final ZooKeeperWatcher zkw;
   private Admin admin;
 
   /**
@@ -122,49 +97,6 @@ public class ReplicationAdmin implements Closeable {
   public ReplicationAdmin(Configuration conf) throws IOException {
     this.connection = ConnectionFactory.createConnection(conf);
     admin = connection.getAdmin();
-    try {
-      zkw = createZooKeeperWatcher();
-      try {
-        this.replicationQueuesClient =
-            ReplicationFactory.getReplicationQueuesClient(new ReplicationQueuesClientArguments(conf,
-              this.connection, zkw));
-        this.replicationQueuesClient.init();
-        this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf,
-          this.replicationQueuesClient, this.connection);
-        this.replicationPeers.init();
-      } catch (Exception exception) {
-        if (zkw != null) {
-          zkw.close();
-        }
-        throw exception;
-      }
-    } catch (Exception exception) {
-      connection.close();
-      if (exception instanceof IOException) {
-        throw (IOException) exception;
-      } else if (exception instanceof RuntimeException) {
-        throw (RuntimeException) exception;
-      } else {
-        throw new IOException("Error initializing the replication admin client.", exception);
-      }
-    }
-  }
-
-  private ZooKeeperWatcher createZooKeeperWatcher() throws IOException {
-    // This Abortable doesn't 'abort'... it just logs.
-    return new ZooKeeperWatcher(connection.getConfiguration(), "ReplicationAdmin", new Abortable() {
-      @Override
-      public void abort(String why, Throwable e) {
-        LOG.error(why, e);
-        // We used to call system.exit here but this script can be embedded by other programs that
-        // want to do replication stuff... so inappropriate calling System.exit. Just log for now.
-      }
-
-      @Override
-      public boolean isAborted() {
-        return false;
-      }
-    });
   }
 
   /**
@@ -452,9 +384,6 @@
   @Override
   public void close() throws IOException {
-    if (this.zkw != null) {
-      this.zkw.close();
-    }
     if (this.connection != null) {
       this.connection.close();
     }
@@ -518,40 +447,13 @@
     admin.disableTableReplication(tableName);
   }
 
-  @VisibleForTesting
-  @Deprecated
-  public void peerAdded(String id) throws ReplicationException {
-    this.replicationPeers.peerConnected(id);
-  }
-
   /**
    * @deprecated use {@link org.apache.hadoop.hbase.client.Admin#listReplicationPeers()} instead
    */
   @VisibleForTesting
   @Deprecated
-  List<ReplicationPeer> listReplicationPeers() throws IOException {
-    Map<String, ReplicationPeerConfig> peers = listPeerConfigs();
-    if (peers == null || peers.size() <= 0) {
-      return null;
-    }
-    List<ReplicationPeer> listOfPeers = new ArrayList<>(peers.size());
-    for (Entry<String, ReplicationPeerConfig> peerEntry : peers.entrySet()) {
-      String peerId = peerEntry.getKey();
-      try {
-        Pair<ReplicationPeerConfig, Configuration> pair = this.replicationPeers.getPeerConf(peerId);
-        Configuration peerConf = pair.getSecond();
-        ReplicationPeer peer = new ReplicationPeerZKImpl(zkw, pair.getSecond(),
-            peerId, pair.getFirst(), this.connection);
-        listOfPeers.add(peer);
-      } catch (ReplicationException e) {
-        LOG.warn("Failed to get valid replication peers. "
-            + "Error connecting to peer cluster with peerId=" + peerId + ". Error message="
-            + e.getMessage());
-        LOG.debug("Failure details to get valid replication peers.", e);
-        continue;
-      }
-    }
-    return listOfPeers;
+  List<ReplicationPeerDescription> listReplicationPeers() throws IOException {
+    return admin.listReplicationPeers();
   }
 
   /**
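With peerAdded and the ZooKeeper-backed listing gone, the deprecated method above is now a thin shim over Admin#listReplicationPeers(). A minimal sketch (not part of this patch; assumes an HBase 2.0 client with the cluster configuration on the classpath) of calling the replacement API directly:

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;

public class ListPeersSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    // No ZooKeeperWatcher or ReplicationPeers wiring needed any more; the
    // listing goes through the regular Admin API.
    try (Connection connection = ConnectionFactory.createConnection(conf);
         Admin admin = connection.getAdmin()) {
      List<ReplicationPeerDescription> peers = admin.listReplicationPeers();
      for (ReplicationPeerDescription peer : peers) {
        System.out.println(peer.getPeerId() + " -> " + peer.getPeerConfig());
      }
    }
  }
}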


@@ -44,7 +44,6 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
 import org.apache.hadoop.hbase.security.Superusers;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
@@ -1772,23 +1771,18 @@ public class ZKUtil {
    */
   private static void getReplicationZnodesDump(ZooKeeperWatcher zkw, StringBuilder sb)
       throws KeeperException {
-    String replicationZNodeName = zkw.getConfiguration().get("zookeeper.znode.replication",
-      "replication");
-    String replicationZnode = joinZNode(zkw.znodePaths.baseZNode, replicationZNodeName);
+    String replicationZnode = zkw.znodePaths.replicationZNode;
     if (ZKUtil.checkExists(zkw, replicationZnode) == -1) return;
     // do a ls -r on this znode
     sb.append("\n").append(replicationZnode).append(": ");
     List<String> children = ZKUtil.listChildrenNoWatch(zkw, replicationZnode);
     for (String child : children) {
       String znode = joinZNode(replicationZnode, child);
-      if (child.equals(zkw.getConfiguration().get("zookeeper.znode.replication.peers", "peers"))) {
+      if (znode.equals(zkw.znodePaths.peersZNode)) {
         appendPeersZnodes(zkw, znode, sb);
-      } else if (child.equals(zkw.getConfiguration().
-          get("zookeeper.znode.replication.rs", "rs"))) {
+      } else if (znode.equals(zkw.znodePaths.queuesZNode)) {
         appendRSZnodes(zkw, znode, sb);
-      } else if (child.equals(zkw.getConfiguration().get(
-          ReplicationStateZKBase.ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY,
-          ReplicationStateZKBase.ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT))) {
+      } else if (znode.equals(zkw.znodePaths.hfileRefsZNode)) {
         appendHFileRefsZnodes(zkw, znode, sb);
       }
     }
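The rewritten dump logic now compares each child's full znode path against the paths precomputed in ZNodePaths, instead of re-reading the zookeeper.znode.* configuration keys on every call. A hedged, self-contained sketch of that dispatch pattern (the walk helper and print strings are illustrative, not from the patch):

import java.util.List;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;

public class ReplicationZnodeWalkSketch {
  // Mirrors the private getReplicationZnodesDump dispatch: classify each child of
  // the replication znode by comparing its full path to the precomputed fields.
  static void walk(ZooKeeperWatcher zkw) throws KeeperException {
    String replicationZnode = zkw.znodePaths.replicationZNode; // e.g. "/hbase/replication"
    if (ZKUtil.checkExists(zkw, replicationZnode) == -1) return;
    List<String> children = ZKUtil.listChildrenNoWatch(zkw, replicationZnode);
    if (children == null) return;
    for (String child : children) {
      String znode = ZKUtil.joinZNode(replicationZnode, child);
      if (znode.equals(zkw.znodePaths.peersZNode)) {
        System.out.println("peer definitions under " + znode);
      } else if (znode.equals(zkw.znodePaths.queuesZNode)) {
        System.out.println("per-regionserver queues under " + znode);
      } else if (znode.equals(zkw.znodePaths.hfileRefsZNode)) {
        System.out.println("hfile reference queues under " + znode);
      }
    }
  }
}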


@@ -79,6 +79,15 @@ public class ZNodePaths {
   // znode of indicating master maintenance mode
   public final String masterMaintZNode;
 
+  // znode containing all replication state.
+  public final String replicationZNode;
+  // znode containing a list of all remote slave (i.e. peer) clusters.
+  public final String peersZNode;
+  // znode containing all replication queues
+  public final String queuesZNode;
+  // znode containing queues of hfile references to be replicated
+  public final String hfileRefsZNode;
+
   public ZNodePaths(Configuration conf) {
     baseZNode = conf.get(ZOOKEEPER_ZNODE_PARENT, DEFAULT_ZOOKEEPER_ZNODE_PARENT);
     ImmutableMap.Builder<Integer, String> builder = ImmutableMap.builder();
@@ -113,6 +122,15 @@
         conf.get("zookeeper.znode.namespace", "namespace"));
     masterMaintZNode = ZKUtil.joinZNode(baseZNode,
         conf.get("zookeeper.znode.masterMaintenance", "master-maintenance"));
+    replicationZNode =
+        ZKUtil.joinZNode(baseZNode, conf.get("zookeeper.znode.replication", "replication"));
+    peersZNode =
+        ZKUtil.joinZNode(replicationZNode, conf.get("zookeeper.znode.replication.peers", "peers"));
+    queuesZNode =
+        ZKUtil.joinZNode(replicationZNode, conf.get("zookeeper.znode.replication.rs", "rs"));
+    hfileRefsZNode =
+        ZKUtil.joinZNode(replicationZNode,
+            conf.get("zookeeper.znode.replication.hfile.refs", "hfile-refs"));
   }
 
   @Override
@@ -125,7 +143,9 @@
         + ", balancerZNode=" + balancerZNode + ", regionNormalizerZNode=" + regionNormalizerZNode
         + ", switchZNode=" + switchZNode + ", tableLockZNode=" + tableLockZNode
         + ", recoveringRegionsZNode=" + recoveringRegionsZNode + ", namespaceZNode="
-        + namespaceZNode + ", masterMaintZNode=" + masterMaintZNode + "]";
+        + namespaceZNode + ", masterMaintZNode=" + masterMaintZNode + ", replicationZNode="
+        + replicationZNode + ", peersZNode=" + peersZNode + ", queuesZNode=" + queuesZNode
+        + ", hfileRefsZNode=" + hfileRefsZNode + "]";
   }
 
   /**
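A short sketch (illustrative, assuming the default zookeeper.znode.* values and the default base znode /hbase) of the paths the new fields resolve to once ZNodePaths is constructed:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;

public class ReplicationZnodePathsSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    ZNodePaths paths = new ZNodePaths(conf);
    // With the defaults these print:
    //   /hbase/replication
    //   /hbase/replication/peers
    //   /hbase/replication/rs
    //   /hbase/replication/hfile-refs
    System.out.println(paths.replicationZNode);
    System.out.println(paths.peersZNode);
    System.out.println(paths.queuesZNode);
    System.out.println(paths.hfileRefsZNode);
  }
}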

hbase-replication/pom.xml (new file, 264 lines)

@@ -0,0 +1,264 @@
<?xml version="1.0"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!--
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-->
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>hbase</artifactId>
<groupId>org.apache.hbase</groupId>
<version>2.0.0-alpha3-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
<artifactId>hbase-replication</artifactId>
<name>Apache HBase - Replication</name>
<description>HBase Replication Support</description>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-site-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
<plugin>
<!--Make it so assembly:single does nothing in here-->
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<skipAssembly>true</skipAssembly>
</configuration>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<!-- Always skip the second part executions, since we only run
simple unit tests in this module -->
<executions>
<execution>
<id>secondPartTestsExecution</id>
<phase>test</phase>
<goals>
<goal>test</goal>
</goals>
<configuration>
<skip>true</skip>
</configuration>
</execution>
</executions>
</plugin>
<!-- Make a jar and put the sources in the jar -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
</plugin>
</plugins>
<pluginManagement>
<plugins>
<!--This plugin's configuration is used to store Eclipse m2e settings
only. It has no influence on the Maven build itself.-->
<plugin>
<groupId>org.eclipse.m2e</groupId>
<artifactId>lifecycle-mapping</artifactId>
<version>1.0.0</version>
<configuration>
<lifecycleMappingMetadata>
<pluginExecutions>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<versionRange>[3.2,)</versionRange>
<goals>
<goal>compile</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore></ignore>
</action>
</pluginExecution>
</pluginExecutions>
</lifecycleMappingMetadata>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
<dependencies>
<!-- Intra-project dependencies -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-annotations</artifactId>
<exclusions>
<exclusion>
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-annotations</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<type>test-jar</type>
</dependency>
<!-- General dependencies -->
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<profiles>
<!-- profile against Hadoop 2.x: This is the default. -->
<profile>
<id>hadoop-2.0</id>
<activation>
<property>
<!--Below formatting for dev-support/generate-hadoopX-poms.sh-->
<!--h2--><name>!hadoop.profile</name>
</property>
</activation>
<dependencies>
<dependency>
<groupId>com.github.stephenc.findbugs</groupId>
<artifactId>findbugs-annotations</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<exclusions>
<exclusion>
<groupId>net.java.dev.jets3t</groupId>
<artifactId>jets3t</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet.jsp</groupId>
<artifactId>jsp-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-server</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-json</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
<exclusion>
<groupId>tomcat</groupId>
<artifactId>jasper-compiler</artifactId>
</exclusion>
<exclusion>
<groupId>tomcat</groupId>
<artifactId>jasper-runtime</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</profile>
<!--
profile for building against Hadoop 3.0.x. Activate using:
mvn -Dhadoop.profile=3.0
-->
<profile>
<id>hadoop-3.0</id>
<activation>
<property>
<name>hadoop.profile</name>
<value>3.0</value>
</property>
</activation>
<properties>
<hadoop.version>3.0-SNAPSHOT</hadoop.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</dependency>
</dependencies>
</profile>
</profiles>
</project>


@@ -376,6 +376,10 @@
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-client</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-replication</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-prefix-tree</artifactId>


@@ -27,8 +27,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
 import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.AbstractService;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
 
 /**
  * A Base implementation for {@link ReplicationEndpoint}s. Users should consider extending this
  * class rather than implementing {@link ReplicationEndpoint} directly for better backwards


@@ -61,6 +61,7 @@
   </licenses>
 
   <modules>
+    <module>hbase-replication</module>
     <module>hbase-resource-bundle</module>
     <module>hbase-server</module>
     <module>hbase-thrift</module>
@@ -1563,6 +1564,11 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <artifactId>hbase-replication</artifactId>
+      <groupId>org.apache.hbase</groupId>
+      <version>${project.version}</version>
+    </dependency>
     <dependency>
       <artifactId>hbase-server</artifactId>
       <groupId>org.apache.hbase</groupId>