HBASE-14794 Cleanup TestAtomicOperation
hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaWithReplicas.java Fix a few missing table closes (This suite seems to leave loads of threads when test is done but have not figured the how yet). hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java Fix some missing table closes. We were leaving around client resources. hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java Close up WALs when done.
This commit is contained in:
parent
0207da8cee
commit
e82ccb900e
|
@ -170,12 +170,12 @@ public class TestMetaWithReplicas {
|
|||
util.getHBaseAdmin().disableTable(TABLE);
|
||||
util.getHBaseAdmin().deleteTable(TABLE);
|
||||
}
|
||||
Table htable = util.createTable(TABLE, FAMILIES);
|
||||
|
||||
ServerName master = null;
|
||||
try (Connection c = ConnectionFactory.createConnection(util.getConfiguration());) {
|
||||
try (Table htable = util.createTable(TABLE, FAMILIES);) {
|
||||
util.getHBaseAdmin().flush(TableName.META_TABLE_NAME);
|
||||
Thread.sleep(conf.getInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
|
||||
30000) * 6);
|
||||
Connection c = ConnectionFactory.createConnection(util.getConfiguration());
|
||||
List<HRegionInfo> regions = MetaTableAccessor.getTableRegions(c, TABLE);
|
||||
HRegionLocation hrl = MetaTableAccessor.getRegionLocation(c, regions.get(0));
|
||||
// Ensure that the primary server for test table is not the same one as the primary
|
||||
|
@ -194,7 +194,7 @@ public class TestMetaWithReplicas {
|
|||
Thread.sleep(conf.getInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
|
||||
30000) * 3);
|
||||
}
|
||||
ServerName master = util.getHBaseClusterInterface().getClusterStatus().getMaster();
|
||||
master = util.getHBaseClusterInterface().getClusterStatus().getMaster();
|
||||
// kill the master so that regionserver recovery is not triggered at all
|
||||
// for the meta server
|
||||
util.getHBaseClusterInterface().stopMaster(master);
|
||||
|
@ -204,17 +204,19 @@ public class TestMetaWithReplicas {
|
|||
util.getHBaseClusterInterface().waitForRegionServerToStop(primary, 60000);
|
||||
}
|
||||
((ClusterConnection)c).clearRegionCache();
|
||||
htable.close();
|
||||
htable = c.getTable(TABLE);
|
||||
}
|
||||
Get get = null;
|
||||
Result r = null;
|
||||
byte[] row = "test".getBytes();
|
||||
try (Table htable = c.getTable(TABLE);) {
|
||||
Put put = new Put(row);
|
||||
put.addColumn("foo".getBytes(), row, row);
|
||||
BufferedMutator m = c.getBufferedMutator(TABLE);
|
||||
m.mutate(put);
|
||||
m.flush();
|
||||
// Try to do a get of the row that was just put
|
||||
Get get = new Get(row);
|
||||
Result r = htable.get(get);
|
||||
get = new Get(row);
|
||||
r = htable.get(get);
|
||||
assertTrue(Arrays.equals(r.getRow(), row));
|
||||
// now start back the killed servers and disable use of replicas. That would mean
|
||||
// calls go to the primary
|
||||
|
@ -222,12 +224,14 @@ public class TestMetaWithReplicas {
|
|||
util.getHBaseClusterInterface().startRegionServer(primary.getHostname(), 0);
|
||||
util.getHBaseClusterInterface().waitForActiveAndReadyMaster();
|
||||
((ClusterConnection)c).clearRegionCache();
|
||||
htable.close();
|
||||
}
|
||||
conf.setBoolean(HConstants.USE_META_REPLICAS, false);
|
||||
htable = c.getTable(TABLE);
|
||||
try (Table htable = c.getTable(TABLE);) {
|
||||
r = htable.get(get);
|
||||
assertTrue(Arrays.equals(r.getRow(), row));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMetaLookupThreadPoolCreated() throws Exception {
|
||||
|
@ -237,7 +241,7 @@ public class TestMetaWithReplicas {
|
|||
TEST_UTIL.getHBaseAdmin().disableTable(TABLE);
|
||||
TEST_UTIL.getHBaseAdmin().deleteTable(TABLE);
|
||||
}
|
||||
Table htable = TEST_UTIL.createTable(TABLE, FAMILIES);
|
||||
try (Table htable = TEST_UTIL.createTable(TABLE, FAMILIES);) {
|
||||
byte[] row = "test".getBytes();
|
||||
ConnectionImplementation c = ((ConnectionImplementation) TEST_UTIL.getConnection());
|
||||
// check that metalookup pool would get created
|
||||
|
@ -245,6 +249,7 @@ public class TestMetaWithReplicas {
|
|||
ExecutorService ex = c.getCurrentMetaLookupPool();
|
||||
assert(ex != null);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testChangingReplicaCount() throws Exception {
|
||||
|
@ -416,8 +421,8 @@ public class TestMetaWithReplicas {
|
|||
public void testShutdownOfReplicaHolder() throws Exception {
|
||||
// checks that the when the server holding meta replica is shut down, the meta replica
|
||||
// can be recovered
|
||||
ClusterConnection conn = (ClusterConnection)
|
||||
ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
|
||||
try (ClusterConnection conn = (ClusterConnection)
|
||||
ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());) {
|
||||
RegionLocations rl = conn.
|
||||
locateRegion(TableName.META_TABLE_NAME, Bytes.toBytes(""), false, true);
|
||||
HRegionLocation hrl = rl.getRegionLocation(1);
|
||||
|
@ -432,12 +437,11 @@ public class TestMetaWithReplicas {
|
|||
i++;
|
||||
} while ((hrl == null || hrl.getServerName().equals(oldServer)) && i < 3);
|
||||
assertTrue(i != 3);
|
||||
conn.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHBaseFsckWithExcessMetaReplicas() throws Exception {
|
||||
HBaseFsck hbck = new HBaseFsck(TEST_UTIL.getConfiguration());
|
||||
// Create a meta replica (this will be the 4th one) and assign it
|
||||
HRegionInfo h = RegionReplicaUtil.getRegionInfoForReplica(
|
||||
HRegionInfo.FIRST_META_REGIONINFO, 3);
|
||||
|
@ -447,7 +451,7 @@ public class TestMetaWithReplicas {
|
|||
TEST_UTIL.getMiniHBaseCluster().getMaster().assignRegion(h);
|
||||
HBaseFsckRepair.waitUntilAssigned(TEST_UTIL.getHBaseAdmin(), h);
|
||||
// check that problem exists
|
||||
hbck = doFsck(TEST_UTIL.getConfiguration(), false);
|
||||
HBaseFsck hbck = doFsck(TEST_UTIL.getConfiguration(), false);
|
||||
assertErrors(hbck, new ERROR_CODE[]{ERROR_CODE.UNKNOWN, ERROR_CODE.SHOULD_NOT_BE_DEPLOYED});
|
||||
// fix the problem
|
||||
hbck = doFsck(TEST_UTIL.getConfiguration(), true);
|
||||
|
|
|
@ -169,7 +169,7 @@ public class TestImportExport {
|
|||
@Test
|
||||
public void testSimpleCase() throws Exception {
|
||||
String EXPORT_TABLE = "exportSimpleCase";
|
||||
Table t = UTIL.createTable(TableName.valueOf(EXPORT_TABLE), FAMILYA, 3);
|
||||
try (Table t = UTIL.createTable(TableName.valueOf(EXPORT_TABLE), FAMILYA, 3);) {
|
||||
Put p = new Put(ROW1);
|
||||
p.addColumn(FAMILYA, QUAL, now, QUAL);
|
||||
p.addColumn(FAMILYA, QUAL, now + 1, QUAL);
|
||||
|
@ -180,6 +180,7 @@ public class TestImportExport {
|
|||
p.addColumn(FAMILYA, QUAL, now + 1, QUAL);
|
||||
p.addColumn(FAMILYA, QUAL, now + 2, QUAL);
|
||||
t.put(p);
|
||||
}
|
||||
|
||||
String[] args = new String[] {
|
||||
EXPORT_TABLE,
|
||||
|
@ -189,7 +190,7 @@ public class TestImportExport {
|
|||
assertTrue(runExport(args));
|
||||
|
||||
String IMPORT_TABLE = "importTableSimpleCase";
|
||||
t = UTIL.createTable(TableName.valueOf(IMPORT_TABLE), FAMILYB, 3);
|
||||
try (Table t = UTIL.createTable(TableName.valueOf(IMPORT_TABLE), FAMILYB, 3);) {
|
||||
args = new String[] {
|
||||
"-D" + Import.CF_RENAME_PROP + "="+FAMILYA_STRING+":"+FAMILYB_STRING,
|
||||
IMPORT_TABLE,
|
||||
|
@ -206,6 +207,7 @@ public class TestImportExport {
|
|||
r = t.get(g);
|
||||
assertEquals(3, r.size());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test export hbase:meta table
|
||||
|
@ -238,13 +240,12 @@ public class TestImportExport {
|
|||
FileSystem fs = FileSystem.get(UTIL.getConfiguration());
|
||||
fs.copyFromLocalFile(importPath, new Path(FQ_OUTPUT_DIR + Path.SEPARATOR + name));
|
||||
String IMPORT_TABLE = name;
|
||||
Table t = UTIL.createTable(TableName.valueOf(IMPORT_TABLE), Bytes.toBytes("f1"), 3);
|
||||
try (Table t = UTIL.createTable(TableName.valueOf(IMPORT_TABLE), Bytes.toBytes("f1"), 3);) {
|
||||
String[] args = new String[] {
|
||||
"-Dhbase.import.version=0.94" ,
|
||||
IMPORT_TABLE, FQ_OUTPUT_DIR
|
||||
};
|
||||
assertTrue(runImport(args));
|
||||
|
||||
/* exportedTableIn94Format contains 5 rows
|
||||
ROW COLUMN+CELL
|
||||
r1 column=f1:c1, timestamp=1383766761171, value=val1
|
||||
|
@ -254,7 +255,7 @@ public class TestImportExport {
|
|||
r5 column=f1:c1, timestamp=1383766791506, value=val5
|
||||
*/
|
||||
assertEquals(5, UTIL.countRows(t));
|
||||
t.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -268,7 +269,7 @@ public class TestImportExport {
|
|||
.setMaxVersions(1)
|
||||
);
|
||||
UTIL.getHBaseAdmin().createTable(desc);
|
||||
Table t = UTIL.getConnection().getTable(desc.getTableName());
|
||||
try (Table t = UTIL.getConnection().getTable(desc.getTableName());) {
|
||||
|
||||
Put p = new Put(ROW1);
|
||||
p.addColumn(FAMILYA, QUAL, now, QUAL);
|
||||
|
@ -287,7 +288,7 @@ public class TestImportExport {
|
|||
|
||||
FileSystem fs = FileSystem.get(UTIL.getConfiguration());
|
||||
fs.delete(new Path(FQ_OUTPUT_DIR), true);
|
||||
t.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -299,7 +300,7 @@ public class TestImportExport {
|
|||
.setKeepDeletedCells(KeepDeletedCells.TRUE)
|
||||
);
|
||||
UTIL.getHBaseAdmin().createTable(desc);
|
||||
Table t = UTIL.getConnection().getTable(desc.getTableName());
|
||||
try (Table t = UTIL.getConnection().getTable(desc.getTableName());) {
|
||||
|
||||
Put p = new Put(ROW1);
|
||||
p.addColumn(FAMILYA, QUAL, now, QUAL);
|
||||
|
@ -314,6 +315,7 @@ public class TestImportExport {
|
|||
d = new Delete(ROW1);
|
||||
d.addColumns(FAMILYA, QUAL, now+2);
|
||||
t.delete(d);
|
||||
}
|
||||
|
||||
String[] args = new String[] {
|
||||
"-D" + Export.RAW_SCAN + "=true",
|
||||
|
@ -330,8 +332,7 @@ public class TestImportExport {
|
|||
.setKeepDeletedCells(KeepDeletedCells.TRUE)
|
||||
);
|
||||
UTIL.getHBaseAdmin().createTable(desc);
|
||||
t.close();
|
||||
t = UTIL.getConnection().getTable(desc.getTableName());
|
||||
try (Table t = UTIL.getConnection().getTable(desc.getTableName());) {
|
||||
args = new String[] {
|
||||
IMPORT_TABLE,
|
||||
FQ_OUTPUT_DIR
|
||||
|
@ -351,7 +352,7 @@ public class TestImportExport {
|
|||
assertEquals(now+2, res[4].getTimestamp());
|
||||
assertEquals(now+1, res[5].getTimestamp());
|
||||
assertEquals(now, res[6].getTimestamp());
|
||||
t.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
@ -418,15 +419,11 @@ public class TestImportExport {
|
|||
|
||||
ResultScanner exportedTScanner = exportT.getScanner(s);
|
||||
Result exportedTResult = exportedTScanner.next();
|
||||
try
|
||||
{
|
||||
try {
|
||||
Result.compareResults(exportedTResult, importedTResult);
|
||||
}
|
||||
catch (Exception e) {
|
||||
} catch (Exception e) {
|
||||
fail("Original and imported tables data comparision failed with error:"+e.getMessage());
|
||||
}
|
||||
finally
|
||||
{
|
||||
} finally {
|
||||
exportT.close();
|
||||
importT.close();
|
||||
}
|
||||
|
@ -470,7 +467,8 @@ public class TestImportExport {
|
|||
|
||||
Table importTable = UTIL.getConnection().getTable(desc.getTableName());
|
||||
args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + PrefixFilter.class.getName(),
|
||||
"-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1), IMPORT_TABLE, FQ_OUTPUT_DIR,
|
||||
"-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1), IMPORT_TABLE,
|
||||
FQ_OUTPUT_DIR,
|
||||
"1000" };
|
||||
assertTrue(runImport(args));
|
||||
|
||||
|
@ -634,7 +632,7 @@ public class TestImportExport {
|
|||
public void testDurability() throws Exception {
|
||||
// Create an export table.
|
||||
String exportTableName = "exporttestDurability";
|
||||
Table exportTable = UTIL.createTable(TableName.valueOf(exportTableName), FAMILYA, 3);
|
||||
try (Table exportTable = UTIL.createTable(TableName.valueOf(exportTableName), FAMILYA, 3);) {
|
||||
|
||||
// Insert some data
|
||||
Put put = new Put(ROW1);
|
||||
|
@ -689,6 +687,7 @@ public class TestImportExport {
|
|||
//Ensure that the count is 2 (only one version of key value is obtained)
|
||||
assertTrue(getCount(importTable, null) == 2);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This listens to the {@link #visitLogEntryBeforeWrite(HTableDescriptor, WALKey, WALEdit)} to
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver;
|
|||
import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
|
||||
import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
|
@ -60,9 +61,11 @@ import org.apache.hadoop.hbase.client.Scan;
|
|||
import org.apache.hadoop.hbase.filter.BinaryComparator;
|
||||
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
|
||||
import org.apache.hadoop.hbase.io.HeapSize;
|
||||
import org.apache.hadoop.hbase.io.hfile.BlockCache;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.hbase.wal.WAL;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
@ -101,7 +104,11 @@ public class TestAtomicOperation {
|
|||
@After
|
||||
public void teardown() throws IOException {
|
||||
if (region != null) {
|
||||
BlockCache bc = region.getStores().get(0).getCacheConfig().getBlockCache();
|
||||
((HRegion)region).close();
|
||||
WAL wal = ((HRegion)region).getWAL();
|
||||
if (wal != null) wal.close();
|
||||
if (bc != null) bc.shutdown();
|
||||
region = null;
|
||||
}
|
||||
}
|
||||
|
@ -176,17 +183,15 @@ public class TestAtomicOperation {
|
|||
*/
|
||||
@Test
|
||||
public void testIncrementMultiThreads() throws IOException {
|
||||
|
||||
LOG.info("Starting test testIncrementMultiThreads");
|
||||
// run a with mixed column families (1 and 3 versions)
|
||||
initHRegion(tableName, name.getMethodName(), new int[] {1,3}, fam1, fam2);
|
||||
|
||||
// create 100 threads, each will increment by its own quantity
|
||||
int numThreads = 100;
|
||||
// create 25 threads, each will increment by its own quantity
|
||||
int numThreads = 25;
|
||||
int incrementsPerThread = 1000;
|
||||
Incrementer[] all = new Incrementer[numThreads];
|
||||
int expectedTotal = 0;
|
||||
|
||||
// create all threads
|
||||
for (int i = 0; i < numThreads; i++) {
|
||||
all[i] = new Incrementer(region, i, i, incrementsPerThread);
|
||||
|
@ -203,13 +208,13 @@ public class TestAtomicOperation {
|
|||
try {
|
||||
all[i].join();
|
||||
} catch (InterruptedException e) {
|
||||
LOG.info("Ignored", e);
|
||||
}
|
||||
}
|
||||
assertICV(row, fam1, qual1, expectedTotal);
|
||||
assertICV(row, fam1, qual2, expectedTotal*2);
|
||||
assertICV(row, fam2, qual3, expectedTotal*3);
|
||||
LOG.info("testIncrementMultiThreads successfully verified that total is " +
|
||||
expectedTotal);
|
||||
LOG.info("testIncrementMultiThreads successfully verified that total is " + expectedTotal);
|
||||
}
|
||||
|
||||
|
||||
|
@ -260,6 +265,7 @@ public class TestAtomicOperation {
|
|||
|
||||
public Incrementer(Region region,
|
||||
int threadNumber, int amount, int numIncrements) {
|
||||
super("incrementer." + threadNumber);
|
||||
this.region = region;
|
||||
this.numIncrements = numIncrements;
|
||||
this.amount = amount;
|
||||
|
@ -280,8 +286,15 @@ public class TestAtomicOperation {
|
|||
// verify: Make sure we only see completed increments
|
||||
Get g = new Get(row);
|
||||
Result result = region.get(g);
|
||||
assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*2, Bytes.toLong(result.getValue(fam1, qual2)));
|
||||
assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*3, Bytes.toLong(result.getValue(fam2, qual3)));
|
||||
if (result != null) {
|
||||
assertTrue(result.getValue(fam1, qual1) != null);
|
||||
assertTrue(result.getValue(fam1, qual2) != null);
|
||||
assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*2,
|
||||
Bytes.toLong(result.getValue(fam1, qual2)));
|
||||
assertTrue(result.getValue(fam2, qual3) != null);
|
||||
assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*3,
|
||||
Bytes.toLong(result.getValue(fam2, qual3)));
|
||||
}
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
@ -359,7 +372,7 @@ public class TestAtomicOperation {
|
|||
// create 10 threads, each will alternate between adding and
|
||||
// removing a column
|
||||
int numThreads = 10;
|
||||
int opsPerThread = 500;
|
||||
int opsPerThread = 250;
|
||||
AtomicOperation[] all = new AtomicOperation[numThreads];
|
||||
|
||||
AtomicLong timeStamps = new AtomicLong(0);
|
||||
|
@ -451,7 +464,7 @@ public class TestAtomicOperation {
|
|||
// create 10 threads, each will alternate between adding and
|
||||
// removing a column
|
||||
int numThreads = 10;
|
||||
int opsPerThread = 500;
|
||||
int opsPerThread = 250;
|
||||
AtomicOperation[] all = new AtomicOperation[numThreads];
|
||||
|
||||
AtomicLong timeStamps = new AtomicLong(0);
|
||||
|
@ -571,14 +584,12 @@ public class TestAtomicOperation {
|
|||
*/
|
||||
@Test
|
||||
public void testPutAndCheckAndPutInParallel() throws Exception {
|
||||
|
||||
final String tableName = "testPutAndCheckAndPut";
|
||||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
conf.setClass(HConstants.REGION_IMPL, MockHRegion.class, HeapSize.class);
|
||||
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName))
|
||||
.addFamily(new HColumnDescriptor(family));
|
||||
final Region region = TEST_UTIL.createLocalHRegion(htd, null, null);
|
||||
|
||||
this.region = TEST_UTIL.createLocalHRegion(htd, null, null);
|
||||
Put[] puts = new Put[1];
|
||||
Put put = new Put(Bytes.toBytes("r1"));
|
||||
put.addColumn(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("10"));
|
||||
|
@ -602,7 +613,6 @@ public class TestAtomicOperation {
|
|||
for (Cell keyValue : results) {
|
||||
assertEquals("50",Bytes.toString(CellUtil.cloneValue(keyValue)));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private class PutThread extends TestThread {
|
||||
|
|
Loading…
Reference in New Issue