HBASE-24788: Fix the connection leaks on getting hbase admin from unclosed connection (#2162)

Signed-off-by: Bharath Vissapragada <bharathv@apache.org>
Signed-off-by: Viraj Jasani <vjasani@apache.org>
Sandeep Pal 2020-07-29 09:09:07 -07:00 committed by GitHub
parent 5f27a009ab
commit d65fb87a2e
11 changed files with 235 additions and 208 deletions
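
Every file below gets the same treatment: ConnectionFactory.createConnection(conf).getAdmin() leaves the intermediate Connection unreferenced, and closing the Admin does not close the Connection that produced it, so the Connection leaks. A minimal sketch of the leaky call and the try-with-resources replacement used throughout this commit, with an illustrative configuration and table name that are placeholders rather than code from the changed files:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class AdminLeakSketch {
  // Leaky: the intermediate Connection has no reference and is never closed;
  // Admin.close() only closes the Admin, not the Connection behind it.
  static boolean leaky(Configuration conf, TableName table) throws Exception {
    try (Admin admin = ConnectionFactory.createConnection(conf).getAdmin()) {
      return admin.tableExists(table);
    }
  }

  // Fixed: give the Connection its own slot in try-with-resources so both
  // resources are closed, in reverse declaration order, when the block exits.
  static boolean fixed(Configuration conf, TableName table) throws Exception {
    try (Connection connection = ConnectionFactory.createConnection(conf);
        Admin admin = connection.getAdmin()) {
      return admin.tableExists(table);
    }
  }
}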


@@ -19,9 +19,6 @@
 package org.apache.hadoop.hbase.mapreduce;
 import java.io.IOException;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -41,6 +38,9 @@ import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 /**
  * Convert Map/Reduce output and write it to an HBase table. The KEY is ignored
@@ -155,11 +155,10 @@ implements Configurable {
    * @param context  The current task context.
    * @return The newly created writer instance.
    * @throws IOException When creating the writer fails.
-   * @throws InterruptedException When the jobs is cancelled.
    */
   @Override
   public RecordWriter<KEY, Mutation> getRecordWriter(TaskAttemptContext context)
-      throws IOException, InterruptedException {
+      throws IOException {
     return new TableRecordWriter();
   }
@@ -168,18 +167,18 @@ implements Configurable {
    *
    * @param context  The current context.
    * @throws IOException When the check fails.
-   * @throws InterruptedException When the job is aborted.
    * @see OutputFormat#checkOutputSpecs(JobContext)
    */
   @Override
-  public void checkOutputSpecs(JobContext context) throws IOException,
-      InterruptedException {
+  public void checkOutputSpecs(JobContext context)
+      throws IOException {
     Configuration hConf = getConf();
     if (hConf == null) {
       hConf = context.getConfiguration();
     }
-    try (Admin admin = ConnectionFactory.createConnection(hConf).getAdmin()) {
+    try (Connection connection = ConnectionFactory.createConnection(hConf);
+        Admin admin = connection.getAdmin()) {
       TableName tableName = TableName.valueOf(hConf.get(OUTPUT_TABLE));
       if (!admin.tableExists(tableName)) {
         throw new TableNotFoundException("Can't write, table does not exist:" +


@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
 import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
@@ -161,25 +162,26 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil {
     if (!isRegionReplicaReplicationEnabled(conf)) {
       return;
     }
-    Admin admin = ConnectionFactory.createConnection(conf).getAdmin();
-    ReplicationPeerConfig peerConfig = null;
-    try {
-      peerConfig = admin.getReplicationPeerConfig(REGION_REPLICA_REPLICATION_PEER);
-    } catch (ReplicationPeerNotFoundException e) {
-      LOG.warn("Region replica replication peer id=" + REGION_REPLICA_REPLICATION_PEER
-        + " not exist", e);
-    }
-    try {
+    try (Connection connection = ConnectionFactory.createConnection(conf);
+      Admin admin = connection.getAdmin()) {
+      ReplicationPeerConfig peerConfig = null;
+      try {
+        peerConfig = admin.getReplicationPeerConfig(REGION_REPLICA_REPLICATION_PEER);
+      } catch (ReplicationPeerNotFoundException e) {
+        LOG.warn(
+          "Region replica replication peer id=" + REGION_REPLICA_REPLICATION_PEER + " not exist",
+          e);
+      }
       if (peerConfig == null) {
         LOG.info("Region replica replication peer id=" + REGION_REPLICA_REPLICATION_PEER
           + " not exist. Creating...");
         peerConfig = new ReplicationPeerConfig();
         peerConfig.setClusterKey(ZKConfig.getZooKeeperClusterKey(conf));
         peerConfig.setReplicationEndpointImpl(RegionReplicaReplicationEndpoint.class.getName());
         admin.addReplicationPeer(REGION_REPLICA_REPLICATION_PEER, peerConfig);
       }
-    } finally {
-      admin.close();
     }
   }


@@ -23,6 +23,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import java.io.IOException;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
@@ -41,6 +42,7 @@ import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -64,6 +66,7 @@ public class TestAsyncReplicationAdminApiWithClusters extends TestAsyncAdminBase
   private static HBaseTestingUtility TEST_UTIL2;
   private static Configuration conf2;
   private static AsyncAdmin admin2;
+  private static AsyncConnection connection;
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
@@ -78,14 +81,21 @@ public class TestAsyncReplicationAdminApiWithClusters extends TestAsyncAdminBase
     conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
     TEST_UTIL2 = new HBaseTestingUtility(conf2);
     TEST_UTIL2.startMiniCluster();
-    admin2 =
-      ConnectionFactory.createAsyncConnection(TEST_UTIL2.getConfiguration()).get().getAdmin();
+    connection =
+      ConnectionFactory.createAsyncConnection(TEST_UTIL2.getConfiguration()).get();
+    admin2 = connection.getAdmin();
     ReplicationPeerConfig rpc = new ReplicationPeerConfig();
     rpc.setClusterKey(TEST_UTIL2.getClusterKey());
     ASYNC_CONN.getAdmin().addReplicationPeer(ID_SECOND, rpc).join();
   }
+  @AfterClass
+  public static void clearUp() throws IOException {
+    connection.close();
+  }
   @Override
   @After
   public void tearDown() throws Exception {


@@ -396,12 +396,12 @@ public class TestReplicaWithCluster {
     LOG.info("Setup second Zk");
     HTU2.getAdmin().createTable(tableDescriptor, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
-    Admin admin = ConnectionFactory.createConnection(HTU.getConfiguration()).getAdmin();
+    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
+      Admin admin = connection.getAdmin()) {
       ReplicationPeerConfig rpc = new ReplicationPeerConfig();
       rpc.setClusterKey(HTU2.getClusterKey());
       admin.addReplicationPeer("2", rpc);
-    admin.close();
+    }
     Put p = new Put(row);
     p.addColumn(row, row, row);


@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.AsyncAdmin;
+import org.apache.hadoop.hbase.client.AsyncConnection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
@@ -149,27 +150,29 @@ public class TestClearRegionBlockCache {
   @Test
   public void testClearBlockCacheFromAsyncAdmin() throws Exception {
-    AsyncAdmin admin =
-      ConnectionFactory.createAsyncConnection(HTU.getConfiguration()).get().getAdmin();
+    try (AsyncConnection conn = ConnectionFactory.createAsyncConnection(HTU.getConfiguration())
+      .get()) {
+      AsyncAdmin admin = conn.getAdmin();
       BlockCache blockCache1 = rs1.getBlockCache().get();
       BlockCache blockCache2 = rs2.getBlockCache().get();
       long initialBlockCount1 = blockCache1.getBlockCount();
       long initialBlockCount2 = blockCache2.getBlockCount();
       // scan will cause blocks to be added in BlockCache
       scanAllRegionsForRS(rs1);
       assertEquals(blockCache1.getBlockCount() - initialBlockCount1,
         HTU.getNumHFilesForRS(rs1, TABLE_NAME, FAMILY));
       scanAllRegionsForRS(rs2);
       assertEquals(blockCache2.getBlockCount() - initialBlockCount2,
         HTU.getNumHFilesForRS(rs2, TABLE_NAME, FAMILY));
       CacheEvictionStats stats = admin.clearBlockCache(TABLE_NAME).get();
       assertEquals(stats.getEvictedBlocks(), HTU.getNumHFilesForRS(rs1, TABLE_NAME, FAMILY) + HTU
         .getNumHFilesForRS(rs2, TABLE_NAME, FAMILY));
       assertEquals(initialBlockCount1, blockCache1.getBlockCount());
       assertEquals(initialBlockCount2, blockCache2.getBlockCount());
+    }
   }
   private void scanAllRegionsForRS(HRegionServer rs) throws IOException {
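
The asynchronous client has the same problem in a different shape: createAsyncConnection(conf).get().getAdmin() discards the AsyncConnection, and AsyncAdmin has no close() of its own. A hedged sketch of the pattern the async tests above switch to, again with a placeholder configuration and table name rather than code from the changed files:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncAdmin;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class AsyncAdminSketch {
  // The AsyncConnection must be held and closed explicitly, so it stays
  // in the try-with-resources header; the AsyncAdmin is just a view on it.
  static boolean tableExists(Configuration conf, TableName table) throws Exception {
    try (AsyncConnection conn = ConnectionFactory.createAsyncConnection(conf).get()) {
      AsyncAdmin admin = conn.getAdmin();
      return admin.tableExists(table).get();
    }
  }
}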


@@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
@@ -486,8 +487,8 @@ public class TestMasterReplication {
   private void addPeer(String id, int masterClusterNumber,
       int slaveClusterNumber) throws Exception {
-    try (Admin admin = ConnectionFactory.createConnection(configurations[masterClusterNumber])
-      .getAdmin()) {
+    try (Connection conn = ConnectionFactory.createConnection(configurations[masterClusterNumber]);
+      Admin admin = conn.getAdmin()) {
       admin.addReplicationPeer(id,
         new ReplicationPeerConfig().setClusterKey(utilities[slaveClusterNumber].getClusterKey()));
     }
@@ -495,8 +496,8 @@ public class TestMasterReplication {
   private void addPeer(String id, int masterClusterNumber, int slaveClusterNumber, String tableCfs)
       throws Exception {
-    try (Admin admin =
-      ConnectionFactory.createConnection(configurations[masterClusterNumber]).getAdmin()) {
+    try (Connection conn = ConnectionFactory.createConnection(configurations[masterClusterNumber]);
+      Admin admin = conn.getAdmin()) {
       admin.addReplicationPeer(
         id,
         new ReplicationPeerConfig().setClusterKey(utilities[slaveClusterNumber].getClusterKey())
@@ -506,15 +507,15 @@ public class TestMasterReplication {
   }
   private void disablePeer(String id, int masterClusterNumber) throws Exception {
-    try (Admin admin = ConnectionFactory.createConnection(configurations[masterClusterNumber])
-      .getAdmin()) {
+    try (Connection conn = ConnectionFactory.createConnection(configurations[masterClusterNumber]);
+      Admin admin = conn.getAdmin()) {
       admin.disableReplicationPeer(id);
     }
   }
   private void enablePeer(String id, int masterClusterNumber) throws Exception {
-    try (Admin admin = ConnectionFactory.createConnection(configurations[masterClusterNumber])
-      .getAdmin()) {
+    try (Connection conn = ConnectionFactory.createConnection(configurations[masterClusterNumber]);
+      Admin admin = conn.getAdmin()) {
       admin.enableReplicationPeer(id);
     }
   }


@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
@@ -138,73 +139,74 @@ public class TestMultiSlaveReplication {
     MiniHBaseCluster master = utility1.startMiniCluster();
     utility2.startMiniCluster();
     utility3.startMiniCluster();
-    Admin admin1 = ConnectionFactory.createConnection(conf1).getAdmin();
+    try (Connection conn = ConnectionFactory.createConnection(conf1);
+      Admin admin1 = conn.getAdmin()) {
       utility1.getAdmin().createTable(table);
       utility2.getAdmin().createTable(table);
       utility3.getAdmin().createTable(table);
       Table htable1 = utility1.getConnection().getTable(tableName);
       Table htable2 = utility2.getConnection().getTable(tableName);
       Table htable3 = utility3.getConnection().getTable(tableName);
       ReplicationPeerConfig rpc = new ReplicationPeerConfig();
       rpc.setClusterKey(utility2.getClusterKey());
       admin1.addReplicationPeer("1", rpc);
       // put "row" and wait 'til it got around, then delete
       putAndWait(row, famName, htable1, htable2);
       deleteAndWait(row, htable1, htable2);
       // check it wasn't replication to cluster 3
       checkRow(row, 0, htable3);
       putAndWait(row2, famName, htable1, htable2);
       // now roll the region server's logs
       rollWALAndWait(utility1, htable1.getName(), row2);
       // after the log was rolled put a new row
       putAndWait(row3, famName, htable1, htable2);
       rpc = new ReplicationPeerConfig();
       rpc.setClusterKey(utility3.getClusterKey());
       admin1.addReplicationPeer("2", rpc);
       // put a row, check it was replicated to all clusters
       putAndWait(row1, famName, htable1, htable2, htable3);
       // delete and verify
       deleteAndWait(row1, htable1, htable2, htable3);
       // make sure row2 did not get replicated after
       // cluster 3 was added
       checkRow(row2, 0, htable3);
       // row3 will get replicated, because it was in the
       // latest log
       checkRow(row3, 1, htable3);
       Put p = new Put(row);
       p.addColumn(famName, row, row);
       htable1.put(p);
       // now roll the logs again
       rollWALAndWait(utility1, htable1.getName(), row);
       // cleanup "row2", also conveniently use this to wait replication
       // to finish
       deleteAndWait(row2, htable1, htable2, htable3);
       // Even if the log was rolled in the middle of the replication
       // "row" is still replication.
       checkRow(row, 1, htable2);
       // Replication thread of cluster 2 may be sleeping, and since row2 is not there in it,
       // we should wait before checking.
       checkWithWait(row, 1, htable3);
       // cleanup the rest
       deleteAndWait(row, htable1, htable2, htable3);
       deleteAndWait(row3, htable1, htable2, htable3);
       utility3.shutdownMiniCluster();
       utility2.shutdownMiniCluster();
       utility1.shutdownMiniCluster();
+    }
   }
   private void rollWALAndWait(final HBaseTestingUtility utility, final TableName table,


@@ -17,8 +17,13 @@
  */
 package org.apache.hadoop.hbase.replication;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -56,7 +61,6 @@ import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
 @Category({FlakeyTests.class, LargeTests.class})
@@ -375,14 +379,13 @@ public class TestPerTableCFReplication {
   @Test
   public void testPerTableCFReplication() throws Exception {
     LOG.info("testPerTableCFReplication");
-    Admin replicationAdmin = ConnectionFactory.createConnection(conf1).getAdmin();
-    Connection connection1 = ConnectionFactory.createConnection(conf1);
-    Connection connection2 = ConnectionFactory.createConnection(conf2);
-    Connection connection3 = ConnectionFactory.createConnection(conf3);
-    try {
-      Admin admin1 = connection1.getAdmin();
-      Admin admin2 = connection2.getAdmin();
-      Admin admin3 = connection3.getAdmin();
+    try (Connection connection1 = ConnectionFactory.createConnection(conf1);
+        Connection connection2 = ConnectionFactory.createConnection(conf2);
+        Connection connection3 = ConnectionFactory.createConnection(conf3);
+        Admin admin1 = connection1.getAdmin();
+        Admin admin2 = connection2.getAdmin();
+        Admin admin3 = connection3.getAdmin();
+        Admin replicationAdmin = connection1.getAdmin()) {
       admin1.createTable(tabA);
       admin1.createTable(tabB);
@@ -524,10 +527,6 @@ public class TestPerTableCFReplication {
       // cf 'f3' of tableC can replicated to cluster2 and cluster3
       putAndWaitWithFamily(row2, f3Name, htab1C, htab2C, htab3C);
       deleteAndWaitWithFamily(row2, f3Name, htab1C, htab2C, htab3C);
-    } finally {
-      connection1.close();
-      connection2.close();
-      connection3.close();
     }
   }


@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.replication;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -54,7 +53,6 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
 import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
@@ -66,7 +64,8 @@ import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
  */
 public class TestReplicationBase {
   private static final Logger LOG = LoggerFactory.getLogger(TestReplicationBase.class);
+  private static Connection connection1;
+  private static Connection connection2;
   protected static Configuration CONF_WITH_LOCALFS;
   protected static Admin hbaseAdmin;
@@ -244,25 +243,26 @@ public class TestReplicationBase {
     // as a component in deciding maximum number of parallel batches to send to the peer cluster.
     UTIL2.startMiniCluster(NUM_SLAVES2);
-    hbaseAdmin = ConnectionFactory.createConnection(CONF1).getAdmin();
+    connection1 = ConnectionFactory.createConnection(CONF1);
+    connection2 = ConnectionFactory.createConnection(CONF2);
+    hbaseAdmin = connection1.getAdmin();
     TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName)
       .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
         .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
       .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
-    Connection connection1 = ConnectionFactory.createConnection(CONF1);
-    Connection connection2 = ConnectionFactory.createConnection(CONF2);
-    try (Admin admin1 = connection1.getAdmin()) {
+    try (
+      Admin admin1 = connection1.getAdmin();
+      Admin admin2 = connection2.getAdmin()) {
       admin1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
-    }
-    try (Admin admin2 = connection2.getAdmin()) {
       admin2.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
+      UTIL1.waitUntilAllRegionsAssigned(tableName);
+      htable1 = connection1.getTable(tableName);
+      UTIL2.waitUntilAllRegionsAssigned(tableName);
+      htable2 = connection2.getTable(tableName);
     }
-    UTIL1.waitUntilAllRegionsAssigned(tableName);
-    UTIL2.waitUntilAllRegionsAssigned(tableName);
-    htable1 = connection1.getTable(tableName);
-    htable2 = connection2.getTable(tableName);
   }
@@ -366,6 +366,13 @@ public class TestReplicationBase {
     if (hbaseAdmin != null) {
       hbaseAdmin.close();
     }
+    if (connection2 != null) {
+      connection2.close();
+    }
+    if (connection1 != null) {
+      connection1.close();
+    }
     UTIL2.shutdownMiniCluster();
     UTIL1.shutdownMiniCluster();
   }
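
Where a test class shares connections across methods, as TestReplicationBase now does, the connections are closed once in class teardown with null guards so a failed setup does not turn into a NullPointerException. A small illustrative sketch of that shape; the class and field names here are placeholders, not the test's own code:

import java.io.IOException;
import org.apache.hadoop.hbase.client.Connection;
import org.junit.AfterClass;

public class SharedConnectionTeardownSketch {
  private static Connection connection1;
  private static Connection connection2;

  @AfterClass
  public static void tearDownAfterClass() throws IOException {
    // Null checks keep teardown from throwing when setup failed early;
    // close in reverse order of creation, mirroring the patch above.
    if (connection2 != null) {
      connection2.close();
    }
    if (connection1 != null) {
      connection1.close();
    }
  }
}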


@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.HTestConst;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
@@ -105,11 +106,13 @@ public class TestGlobalReplicationThrottler {
     utility1.startMiniCluster();
     utility2.startMiniCluster();
-    Admin admin1 = ConnectionFactory.createConnection(conf1).getAdmin();
-    admin1.addReplicationPeer("peer1", rpc);
-    admin1.addReplicationPeer("peer2", rpc);
-    admin1.addReplicationPeer("peer3", rpc);
-    numOfPeer = admin1.listReplicationPeers().size();
+    try (Connection connection = ConnectionFactory.createConnection(utility1.getConfiguration());
+        Admin admin1 = connection.getAdmin()) {
+      admin1.addReplicationPeer("peer1", rpc);
+      admin1.addReplicationPeer("peer2", rpc);
+      admin1.addReplicationPeer("peer3", rpc);
+      numOfPeer = admin1.listReplicationPeers().size();
+    }
   }
   @AfterClass


@@ -53,7 +53,6 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
-import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.testclassification.FlakeyTests;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -122,100 +121,102 @@ public class TestRegionReplicaReplicationEndpoint {
   }
   @Test
-  public void testRegionReplicaReplicationPeerIsCreated() throws IOException, ReplicationException {
+  public void testRegionReplicaReplicationPeerIsCreated() throws IOException {
     // create a table with region replicas. Check whether the replication peer is created
     // and replication started.
-    Admin admin = ConnectionFactory.createConnection(HTU.getConfiguration()).getAdmin();
-    String peerId = "region_replica_replication";
+    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
+        Admin admin = connection.getAdmin()) {
+      String peerId = "region_replica_replication";
       ReplicationPeerConfig peerConfig = null;
       try {
         peerConfig = admin.getReplicationPeerConfig(peerId);
       } catch (ReplicationPeerNotFoundException e) {
         LOG.warn("Region replica replication peer id=" + peerId + " not exist", e);
       }
       if (peerConfig != null) {
         admin.removeReplicationPeer(peerId);
         peerConfig = null;
       }
-      HTableDescriptor htd = HTU.createTableDescriptor(
-        TableName.valueOf("testReplicationPeerIsCreated_no_region_replicas"),
-        HColumnDescriptor.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
-        HColumnDescriptor.DEFAULT_KEEP_DELETED);
+      HTableDescriptor htd = HTU
+        .createTableDescriptor(TableName.valueOf("testReplicationPeerIsCreated_no_region_replicas"),
+          HColumnDescriptor.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
+          HColumnDescriptor.DEFAULT_KEEP_DELETED);
       HTU.getAdmin().createTable(htd);
       try {
         peerConfig = admin.getReplicationPeerConfig(peerId);
         fail("Should throw ReplicationException, because replication peer id=" + peerId
           + " not exist");
       } catch (ReplicationPeerNotFoundException e) {
       }
       assertNull(peerConfig);
       htd = HTU.createTableDescriptor(TableName.valueOf("testReplicationPeerIsCreated"),
         HColumnDescriptor.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
         HColumnDescriptor.DEFAULT_KEEP_DELETED);
       htd.setRegionReplication(2);
       HTU.getAdmin().createTable(htd);
       // assert peer configuration is correct
       peerConfig = admin.getReplicationPeerConfig(peerId);
       assertNotNull(peerConfig);
-      assertEquals(peerConfig.getClusterKey(), ZKConfig.getZooKeeperClusterKey(
-        HTU.getConfiguration()));
+      assertEquals(peerConfig.getClusterKey(),
+        ZKConfig.getZooKeeperClusterKey(HTU.getConfiguration()));
       assertEquals(RegionReplicaReplicationEndpoint.class.getName(),
         peerConfig.getReplicationEndpointImpl());
-      admin.close();
+    }
   }
   @Test
   public void testRegionReplicaReplicationPeerIsCreatedForModifyTable() throws Exception {
     // modify a table by adding region replicas. Check whether the replication peer is created
     // and replication started.
-    Admin admin = ConnectionFactory.createConnection(HTU.getConfiguration()).getAdmin();
-    String peerId = "region_replica_replication";
+    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
+        Admin admin = connection.getAdmin()) {
+      String peerId = "region_replica_replication";
       ReplicationPeerConfig peerConfig = null;
       try {
         peerConfig = admin.getReplicationPeerConfig(peerId);
       } catch (ReplicationPeerNotFoundException e) {
         LOG.warn("Region replica replication peer id=" + peerId + " not exist", e);
       }
       if (peerConfig != null) {
         admin.removeReplicationPeer(peerId);
         peerConfig = null;
       }
       HTableDescriptor htd = HTU.createTableDescriptor(
         TableName.valueOf("testRegionReplicaReplicationPeerIsCreatedForModifyTable"),
         HColumnDescriptor.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
         HColumnDescriptor.DEFAULT_KEEP_DELETED);
       HTU.getAdmin().createTable(htd);
       // assert that replication peer is not created yet
       try {
         peerConfig = admin.getReplicationPeerConfig(peerId);
         fail("Should throw ReplicationException, because replication peer id=" + peerId
           + " not exist");
       } catch (ReplicationPeerNotFoundException e) {
       }
       assertNull(peerConfig);
       HTU.getAdmin().disableTable(htd.getTableName());
       htd.setRegionReplication(2);
       HTU.getAdmin().modifyTable(htd);
       HTU.getAdmin().enableTable(htd.getTableName());
       // assert peer configuration is correct
       peerConfig = admin.getReplicationPeerConfig(peerId);
       assertNotNull(peerConfig);
-      assertEquals(peerConfig.getClusterKey(), ZKConfig.getZooKeeperClusterKey(
-        HTU.getConfiguration()));
+      assertEquals(peerConfig.getClusterKey(),
+        ZKConfig.getZooKeeperClusterKey(HTU.getConfiguration()));
       assertEquals(RegionReplicaReplicationEndpoint.class.getName(),
         peerConfig.getReplicationEndpointImpl());
-      admin.close();
+    }
   }
   public void testRegionReplicaReplication(int regionReplication) throws Exception {