HBASE-15393 Enable table replication command will fail when parent znode is not default in peer cluster (Ashish Singhi)
This commit is contained in:
parent
bbb10c4c18
commit
c1524485c5
|
@ -18,6 +18,9 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.client.replication;
|
package org.apache.hadoop.hbase.client.replication;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
@ -32,8 +35,6 @@ import java.util.Set;
|
||||||
import org.apache.commons.lang.StringUtils;
|
import org.apache.commons.lang.StringUtils;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.Abortable;
|
import org.apache.hadoop.hbase.Abortable;
|
||||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||||
|
@ -44,7 +45,6 @@ import org.apache.hadoop.hbase.TableNotFoundException;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.hbase.client.Admin;
|
import org.apache.hadoop.hbase.client.Admin;
|
||||||
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
|
||||||
import org.apache.hadoop.hbase.client.Connection;
|
import org.apache.hadoop.hbase.client.Connection;
|
||||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||||
import org.apache.hadoop.hbase.client.RegionLocator;
|
import org.apache.hadoop.hbase.client.RegionLocator;
|
||||||
|
@ -57,11 +57,6 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
|
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
|
||||||
import org.apache.hadoop.hbase.util.Pair;
|
import org.apache.hadoop.hbase.util.Pair;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||||
import org.apache.zookeeper.KeeperException;
|
|
||||||
import org.apache.zookeeper.data.Stat;
|
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
|
||||||
import com.google.common.collect.Lists;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <p>
|
* <p>
|
||||||
|
@ -596,7 +591,7 @@ public class ReplicationAdmin implements Closeable {
|
||||||
*/
|
*/
|
||||||
private void checkAndSyncTableDescToPeers(final TableName tableName, final byte[][] splits)
|
private void checkAndSyncTableDescToPeers(final TableName tableName, final byte[][] splits)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
List<ReplicationPeer> repPeers = listValidReplicationPeers();
|
List<ReplicationPeer> repPeers = listReplicationPeers();
|
||||||
if (repPeers == null || repPeers.size() <= 0) {
|
if (repPeers == null || repPeers.size() <= 0) {
|
||||||
throw new IllegalArgumentException("Found no peer cluster for replication.");
|
throw new IllegalArgumentException("Found no peer cluster for replication.");
|
||||||
}
|
}
|
||||||
|
@ -637,45 +632,29 @@ public class ReplicationAdmin implements Closeable {
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
List<ReplicationPeer> listValidReplicationPeers() {
|
List<ReplicationPeer> listReplicationPeers() {
|
||||||
Map<String, ReplicationPeerConfig> peers = listPeerConfigs();
|
Map<String, ReplicationPeerConfig> peers = listPeerConfigs();
|
||||||
if (peers == null || peers.size() <= 0) {
|
if (peers == null || peers.size() <= 0) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
List<ReplicationPeer> validPeers = new ArrayList<ReplicationPeer>(peers.size());
|
List<ReplicationPeer> listOfPeers = new ArrayList<ReplicationPeer>(peers.size());
|
||||||
for (Entry<String, ReplicationPeerConfig> peerEntry : peers.entrySet()) {
|
for (Entry<String, ReplicationPeerConfig> peerEntry : peers.entrySet()) {
|
||||||
String peerId = peerEntry.getKey();
|
String peerId = peerEntry.getKey();
|
||||||
Stat s = null;
|
|
||||||
try {
|
try {
|
||||||
Pair<ReplicationPeerConfig, Configuration> pair = this.replicationPeers.getPeerConf(peerId);
|
Pair<ReplicationPeerConfig, Configuration> pair = this.replicationPeers.getPeerConf(peerId);
|
||||||
Configuration peerConf = pair.getSecond();
|
Configuration peerConf = pair.getSecond();
|
||||||
ReplicationPeer peer = new ReplicationPeerZKImpl(peerConf, peerId, pair.getFirst(),
|
ReplicationPeer peer = new ReplicationPeerZKImpl(peerConf, peerId, pair.getFirst(),
|
||||||
parseTableCFsFromConfig(this.getPeerTableCFs(peerId)));
|
parseTableCFsFromConfig(this.getPeerTableCFs(peerId)));
|
||||||
s =
|
listOfPeers.add(peer);
|
||||||
zkw.getRecoverableZooKeeper().exists(peerConf.get(HConstants.ZOOKEEPER_ZNODE_PARENT),
|
|
||||||
null);
|
|
||||||
if (null == s) {
|
|
||||||
LOG.info(peerId + ' ' + pair.getFirst().getClusterKey() + " is invalid now.");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
validPeers.add(peer);
|
|
||||||
} catch (ReplicationException e) {
|
} catch (ReplicationException e) {
|
||||||
LOG.warn("Failed to get valid replication peers. "
|
LOG.warn("Failed to get valid replication peers. "
|
||||||
+ "Error connecting to peer cluster with peerId=" + peerId);
|
+ "Error connecting to peer cluster with peerId=" + peerId + ". Error message="
|
||||||
LOG.debug("Failure details to get valid replication peers.", e);
|
+ e.getMessage());
|
||||||
continue;
|
|
||||||
} catch (KeeperException e) {
|
|
||||||
LOG.warn("Failed to get valid replication peers. KeeperException code="
|
|
||||||
+ e.code().intValue());
|
|
||||||
LOG.debug("Failure details to get valid replication peers.", e);
|
|
||||||
continue;
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
LOG.warn("Failed to get valid replication peers due to InterruptedException.");
|
|
||||||
LOG.debug("Failure details to get valid replication peers.", e);
|
LOG.debug("Failure details to get valid replication peers.", e);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return validPeers;
|
return listOfPeers;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -166,7 +166,7 @@ public class TestReplicationAdmin {
|
||||||
config.getConfiguration().put("key2", "value2");
|
config.getConfiguration().put("key2", "value2");
|
||||||
admin.addPeer(ID_ONE, config, null);
|
admin.addPeer(ID_ONE, config, null);
|
||||||
|
|
||||||
List<ReplicationPeer> peers = admin.listValidReplicationPeers();
|
List<ReplicationPeer> peers = admin.listReplicationPeers();
|
||||||
assertEquals(1, peers.size());
|
assertEquals(1, peers.size());
|
||||||
ReplicationPeer peerOne = peers.get(0);
|
ReplicationPeer peerOne = peers.get(0);
|
||||||
assertNotNull(peerOne);
|
assertNotNull(peerOne);
|
||||||
|
|
|
@ -0,0 +1,97 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
|
||||||
|
* agreements. See the NOTICE file distributed with this work for additional information regarding
|
||||||
|
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License. You may obtain a
|
||||||
|
* copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
|
||||||
|
* law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
|
||||||
|
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
|
||||||
|
* for the specific language governing permissions and limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hbase.client.replication;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
@Category({ MediumTests.class, ClientTests.class })
|
||||||
|
public class TestReplicationAdminWithTwoDifferentZKClusters {
|
||||||
|
|
||||||
|
private static Configuration conf1 = HBaseConfiguration.create();
|
||||||
|
private static Configuration conf2;
|
||||||
|
|
||||||
|
private static HBaseTestingUtility utility1;
|
||||||
|
private static HBaseTestingUtility utility2;
|
||||||
|
private static ReplicationAdmin admin;
|
||||||
|
|
||||||
|
private static final TableName tableName = TableName.valueOf("test");
|
||||||
|
private static final byte[] famName = Bytes.toBytes("f");
|
||||||
|
private static final String peerId = "peer1";
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUpBeforeClass() throws Exception {
|
||||||
|
conf1.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
|
||||||
|
utility1 = new HBaseTestingUtility(conf1);
|
||||||
|
utility1.startMiniCluster();
|
||||||
|
admin = new ReplicationAdmin(conf1);
|
||||||
|
|
||||||
|
conf2 = HBaseConfiguration.create(conf1);
|
||||||
|
conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
|
||||||
|
conf2.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, 2182);
|
||||||
|
|
||||||
|
utility2 = new HBaseTestingUtility(conf2);
|
||||||
|
utility2.startMiniCluster();
|
||||||
|
|
||||||
|
ReplicationPeerConfig config = new ReplicationPeerConfig();
|
||||||
|
config.setClusterKey(utility2.getClusterKey());
|
||||||
|
admin.addPeer(peerId, config, null);
|
||||||
|
|
||||||
|
HTableDescriptor table = new HTableDescriptor(tableName);
|
||||||
|
HColumnDescriptor fam = new HColumnDescriptor(famName);
|
||||||
|
fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
|
||||||
|
table.addFamily(fam);
|
||||||
|
|
||||||
|
utility1.getHBaseAdmin().createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
|
||||||
|
utility1.waitUntilAllRegionsAssigned(tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDownAfterClass() throws Exception {
|
||||||
|
admin.removePeer(peerId);
|
||||||
|
admin.close();
|
||||||
|
utility1.deleteTable(tableName);
|
||||||
|
utility2.deleteTable(tableName);
|
||||||
|
utility2.shutdownMiniCluster();
|
||||||
|
utility1.shutdownMiniCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Test for HBASE-15393
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testEnableTableReplication() throws Exception {
|
||||||
|
admin.enableTableRep(tableName);
|
||||||
|
assertTrue(utility2.getHBaseAdmin().tableExists(tableName));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDisableTableReplication() throws Exception {
|
||||||
|
admin.disableTableRep(tableName);
|
||||||
|
assertTrue(utility2.getHBaseAdmin().tableExists(tableName));
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue