HBASE-14794 Cleanup TestAtomicOperation

    hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaWithReplicas.java
      Fix a few missing table closes (this suite seems to leave loads of threads
      when the test is done, but I have not figured out why yet).

    hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
      Fix some missing table closes. We were leaving around client
      resources.

    hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
      Close up WALs when done.
stack 2015-11-10 18:57:04 -10:00
parent 37815cac9e
commit 7280ec09df
3 changed files with 268 additions and 250 deletions
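
The pattern behind all three cleanups is the same Java 7 idiom: declare each Table (and, where the test creates one, each Connection) in a try-with-resources header so it is closed even when an assertion or exception aborts the test early. A minimal sketch of the idiom against the client API of this era; the table, family, and qualifier names are illustrative, not taken from the patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;

    public class TableCloseSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Both resources are closed automatically, in reverse order of
        // declaration, even if the body throws; no finally/close() bookkeeping.
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("t1"))) {
          Put put = new Put(Bytes.toBytes("row"));
          put.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
          table.put(put);
        }
      }
    }

Closing a Table releases its client-side resources but does not close the shared Connection, which is why the hunks below wrap both where both are created.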

hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaWithReplicas.java

@@ -174,64 +174,68 @@ public class TestMetaWithReplicas {
       util.getHBaseAdmin().disableTable(TABLE);
       util.getHBaseAdmin().deleteTable(TABLE);
     }
-    Table htable = util.createTable(TABLE, FAMILIES, conf);
-    util.getHBaseAdmin().flush(TableName.META_TABLE_NAME);
-    Thread.sleep(conf.getInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
-        30000) * 6);
-    Connection c = ConnectionFactory.createConnection(util.getConfiguration());
-    List<HRegionInfo> regions = MetaTableAccessor.getTableRegions(zkw, c,
-        TableName.valueOf(TABLE));
-    HRegionLocation hrl = MetaTableAccessor.getRegionLocation(c, regions.get(0));
-    // Ensure that the primary server for test table is not the same one as the primary
-    // of the meta region since we will be killing the srv holding the meta's primary...
-    // We want to be able to write to the test table even when the meta is not present ..
-    // If the servers are the same, then move the test table's region out of the server
-    // to another random server
-    if (hrl.getServerName().equals(primary)) {
-      util.getHBaseAdmin().move(hrl.getRegionInfo().getEncodedNameAsBytes(), null);
-      // wait for the move to complete
-      do {
-        Thread.sleep(10);
-        hrl = MetaTableAccessor.getRegionLocation(c, regions.get(0));
-      } while (primary.equals(hrl.getServerName()));
-      util.getHBaseAdmin().flush(TableName.META_TABLE_NAME);
-      Thread.sleep(conf.getInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
-          30000) * 3);
-    }
-    ServerName master = util.getHBaseClusterInterface().getClusterStatus().getMaster();
-    // kill the master so that regionserver recovery is not triggered at all
-    // for the meta server
-    util.getHBaseClusterInterface().stopMaster(master);
-    util.getHBaseClusterInterface().waitForMasterToStop(master, 60000);
-    if (!master.equals(primary)) {
-      util.getHBaseClusterInterface().killRegionServer(primary);
-      util.getHBaseClusterInterface().waitForRegionServerToStop(primary, 60000);
-    }
-    ((ClusterConnection)c).clearRegionCache();
-    htable.close();
-    htable = c.getTable(TableName.valueOf(TABLE));
-    byte[] row = "test".getBytes();
-    Put put = new Put(row);
-    put.add("foo".getBytes(), row, row);
-    BufferedMutator m = c.getBufferedMutator(TableName.valueOf(TABLE));
-    m.mutate(put);
-    m.flush();
-    // Try to do a get of the row that was just put
-    Get get = new Get(row);
-    Result r = htable.get(get);
-    assertTrue(Arrays.equals(r.getRow(), row));
-    // now start back the killed servers and disable use of replicas. That would mean
-    // calls go to the primary
-    util.getHBaseClusterInterface().startMaster(master.getHostname(), 0);
-    util.getHBaseClusterInterface().startRegionServer(primary.getHostname(), 0);
-    util.getHBaseClusterInterface().waitForActiveAndReadyMaster();
-    ((ClusterConnection)c).clearRegionCache();
-    htable.close();
-    conf.setBoolean(HConstants.USE_META_REPLICAS, false);
-    htable = c.getTable(TableName.valueOf(TABLE));
-    r = htable.get(get);
-    assertTrue(Arrays.equals(r.getRow(), row));
+    ServerName master = null;
+    try (Connection c = ConnectionFactory.createConnection(util.getConfiguration());) {
+      try (Table htable = util.createTable(TABLE, FAMILIES, conf);) {
+        util.getHBaseAdmin().flush(TableName.META_TABLE_NAME);
+        Thread.sleep(conf.getInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
+            30000) * 6);
+        List<HRegionInfo> regions = MetaTableAccessor.getTableRegions(zkw, c,
+            TableName.valueOf(TABLE));
+        HRegionLocation hrl = MetaTableAccessor.getRegionLocation(c, regions.get(0));
+        // Ensure that the primary server for test table is not the same one as the primary
+        // of the meta region since we will be killing the srv holding the meta's primary...
+        // We want to be able to write to the test table even when the meta is not present ..
+        // If the servers are the same, then move the test table's region out of the server
+        // to another random server
+        if (hrl.getServerName().equals(primary)) {
+          util.getHBaseAdmin().move(hrl.getRegionInfo().getEncodedNameAsBytes(), null);
+          // wait for the move to complete
+          do {
+            Thread.sleep(10);
+            hrl = MetaTableAccessor.getRegionLocation(c, regions.get(0));
+          } while (primary.equals(hrl.getServerName()));
+          util.getHBaseAdmin().flush(TableName.META_TABLE_NAME);
+          Thread.sleep(conf.getInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
+              30000) * 3);
+        }
+        master = util.getHBaseClusterInterface().getClusterStatus().getMaster();
+        // kill the master so that regionserver recovery is not triggered at all
+        // for the meta server
+        util.getHBaseClusterInterface().stopMaster(master);
+        util.getHBaseClusterInterface().waitForMasterToStop(master, 60000);
+        if (!master.equals(primary)) {
+          util.getHBaseClusterInterface().killRegionServer(primary);
+          util.getHBaseClusterInterface().waitForRegionServerToStop(primary, 60000);
+        }
+        ((ClusterConnection)c).clearRegionCache();
+      }
+      Get get = null;
+      Result r = null;
+      byte[] row = "test".getBytes();
+      try (Table htable = c.getTable(TableName.valueOf(TABLE));) {
+        Put put = new Put(row);
+        put.add("foo".getBytes(), row, row);
+        BufferedMutator m = c.getBufferedMutator(TableName.valueOf(TABLE));
+        m.mutate(put);
+        m.flush();
+        // Try to do a get of the row that was just put
+        get = new Get(row);
+        r = htable.get(get);
+        assertTrue(Arrays.equals(r.getRow(), row));
+        // now start back the killed servers and disable use of replicas. That would mean
+        // calls go to the primary
+        util.getHBaseClusterInterface().startMaster(master.getHostname(), 0);
+        util.getHBaseClusterInterface().startRegionServer(primary.getHostname(), 0);
+        util.getHBaseClusterInterface().waitForActiveAndReadyMaster();
+        ((ClusterConnection)c).clearRegionCache();
+      }
+      conf.setBoolean(HConstants.USE_META_REPLICAS, false);
+      try (Table htable = c.getTable(TableName.valueOf(TABLE));) {
+        r = htable.get(get);
+        assertTrue(Arrays.equals(r.getRow(), row));
+      }
+    }
   }
   @Test
@@ -242,13 +246,15 @@ public class TestMetaWithReplicas {
       TEST_UTIL.getHBaseAdmin().disableTable(TABLE);
       TEST_UTIL.getHBaseAdmin().deleteTable(TABLE);
     }
-    Table htable = TEST_UTIL.createTable(TABLE, FAMILIES, TEST_UTIL.getConfiguration());
-    byte[] row = "test".getBytes();
-    HConnectionImplementation c = ((HConnectionImplementation)((HTable)htable).connection);
-    // check that metalookup pool would get created
-    c.relocateRegion(TABLE, row);
-    ExecutorService ex = c.getCurrentMetaLookupPool();
-    assert(ex != null);
+    try (Table htable =
+        TEST_UTIL.createTable(TABLE, FAMILIES, TEST_UTIL.getConfiguration());) {
+      byte[] row = "test".getBytes();
+      HConnectionImplementation c = ((HConnectionImplementation)((HTable)htable).connection);
+      // check that metalookup pool would get created
+      c.relocateRegion(TABLE, row);
+      ExecutorService ex = c.getCurrentMetaLookupPool();
+      assert(ex != null);
+    }
   }
   @Test
@@ -408,7 +414,6 @@ public class TestMetaWithReplicas {
   @Test
   public void testHBaseFsckWithExcessMetaReplicas() throws Exception {
-    HBaseFsck hbck = new HBaseFsck(TEST_UTIL.getConfiguration());
     // Create a meta replica (this will be the 4th one) and assign it
     HRegionInfo h = RegionReplicaUtil.getRegionInfoForReplica(
         HRegionInfo.FIRST_META_REGIONINFO, 3);
@@ -418,7 +423,7 @@ public class TestMetaWithReplicas {
     TEST_UTIL.getMiniHBaseCluster().getMaster().assignRegion(h);
     HBaseFsckRepair.waitUntilAssigned(TEST_UTIL.getHBaseAdmin(), h);
     // check that problem exists
-    hbck = doFsck(TEST_UTIL.getConfiguration(), false);
+    HBaseFsck hbck = doFsck(TEST_UTIL.getConfiguration(), false);
     assertErrors(hbck, new ERROR_CODE[]{ERROR_CODE.UNKNOWN, ERROR_CODE.SHOULD_NOT_BE_DEPLOYED});
     // fix the problem
     hbck = doFsck(TEST_UTIL.getConfiguration(), true);
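
A side note on the first hunk above: while the meta primary is down, the write goes through a BufferedMutator rather than Table.put(), so nothing reaches the regionservers until flush() is called. A hedged sketch of that write path (names are illustrative); note that a BufferedMutator is itself Closeable, and the try-with-resources form below also covers it, which the hunk above does not:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.BufferedMutator;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    public class BufferedMutatorSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(conf);
             BufferedMutator mutator = conn.getBufferedMutator(TableName.valueOf("t1"))) {
          Put put = new Put(Bytes.toBytes("test"));
          put.add(Bytes.toBytes("foo"), Bytes.toBytes("q"), Bytes.toBytes("v"));
          mutator.mutate(put); // buffered client-side only
          mutator.flush();     // now actually shipped to the regionservers
        }                      // close() flushes any remainder and releases the buffer
      }
    }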

hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java

@@ -169,17 +169,18 @@ public class TestImportExport {
   @Test
   public void testSimpleCase() throws Exception {
     String EXPORT_TABLE = "exportSimpleCase";
-    Table t = UTIL.createTable(TableName.valueOf(EXPORT_TABLE), FAMILYA, 3);
-    Put p = new Put(ROW1);
-    p.add(FAMILYA, QUAL, now, QUAL);
-    p.add(FAMILYA, QUAL, now+1, QUAL);
-    p.add(FAMILYA, QUAL, now+2, QUAL);
-    t.put(p);
-    p = new Put(ROW2);
-    p.add(FAMILYA, QUAL, now, QUAL);
-    p.add(FAMILYA, QUAL, now+1, QUAL);
-    p.add(FAMILYA, QUAL, now+2, QUAL);
-    t.put(p);
+    try (Table t = UTIL.createTable(TableName.valueOf(EXPORT_TABLE), FAMILYA, 3);) {
+      Put p = new Put(ROW1);
+      p.add(FAMILYA, QUAL, now, QUAL);
+      p.add(FAMILYA, QUAL, now+1, QUAL);
+      p.add(FAMILYA, QUAL, now+2, QUAL);
+      t.put(p);
+      p = new Put(ROW2);
+      p.add(FAMILYA, QUAL, now, QUAL);
+      p.add(FAMILYA, QUAL, now+1, QUAL);
+      p.add(FAMILYA, QUAL, now+2, QUAL);
+      t.put(p);
+    }
     String[] args = new String[] {
         EXPORT_TABLE,
@@ -189,22 +190,23 @@ public class TestImportExport {
     assertTrue(runExport(args));
     String IMPORT_TABLE = "importTableSimpleCase";
-    t = UTIL.createTable(TableName.valueOf(IMPORT_TABLE), FAMILYB, 3);
-    args = new String[] {
-        "-D" + Import.CF_RENAME_PROP + "="+FAMILYA_STRING+":"+FAMILYB_STRING,
-        IMPORT_TABLE,
-        FQ_OUTPUT_DIR
-    };
-    assertTrue(runImport(args));
-    Get g = new Get(ROW1);
-    g.setMaxVersions();
-    Result r = t.get(g);
-    assertEquals(3, r.size());
-    g = new Get(ROW2);
-    g.setMaxVersions();
-    r = t.get(g);
-    assertEquals(3, r.size());
+    try (Table t = UTIL.createTable(TableName.valueOf(IMPORT_TABLE), FAMILYB, 3);) {
+      args = new String[] {
+          "-D" + Import.CF_RENAME_PROP + "="+FAMILYA_STRING+":"+FAMILYB_STRING,
+          IMPORT_TABLE,
+          FQ_OUTPUT_DIR
+      };
+      assertTrue(runImport(args));
+      Get g = new Get(ROW1);
+      g.setMaxVersions();
+      Result r = t.get(g);
+      assertEquals(3, r.size());
+      g = new Get(ROW2);
+      g.setMaxVersions();
+      r = t.get(g);
+      assertEquals(3, r.size());
+    }
   }
   /**
@@ -238,23 +240,22 @@ public class TestImportExport {
     FileSystem fs = FileSystem.get(UTIL.getConfiguration());
     fs.copyFromLocalFile(importPath, new Path(FQ_OUTPUT_DIR + Path.SEPARATOR + name));
     String IMPORT_TABLE = name;
-    Table t = UTIL.createTable(TableName.valueOf(IMPORT_TABLE), Bytes.toBytes("f1"), 3);
-    String[] args = new String[] {
-        "-Dhbase.import.version=0.94" ,
-        IMPORT_TABLE, FQ_OUTPUT_DIR
-    };
-    assertTrue(runImport(args));
-    /* exportedTableIn94Format contains 5 rows
-    ROW   COLUMN+CELL
-    r1    column=f1:c1, timestamp=1383766761171, value=val1
-    r2    column=f1:c1, timestamp=1383766771642, value=val2
-    r3    column=f1:c1, timestamp=1383766777615, value=val3
-    r4    column=f1:c1, timestamp=1383766785146, value=val4
-    r5    column=f1:c1, timestamp=1383766791506, value=val5
-    */
-    assertEquals(5, UTIL.countRows(t));
-    t.close();
+    try (Table t = UTIL.createTable(TableName.valueOf(IMPORT_TABLE), Bytes.toBytes("f1"), 3);) {
+      String[] args = new String[] {
+          "-Dhbase.import.version=0.94" ,
+          IMPORT_TABLE, FQ_OUTPUT_DIR
+      };
+      assertTrue(runImport(args));
+      /* exportedTableIn94Format contains 5 rows
+      ROW   COLUMN+CELL
+      r1    column=f1:c1, timestamp=1383766761171, value=val1
+      r2    column=f1:c1, timestamp=1383766771642, value=val2
+      r3    column=f1:c1, timestamp=1383766777615, value=val3
+      r4    column=f1:c1, timestamp=1383766785146, value=val4
+      r5    column=f1:c1, timestamp=1383766791506, value=val5
+      */
+      assertEquals(5, UTIL.countRows(t));
+    }
   }
   /**
@@ -268,30 +269,30 @@ public class TestImportExport {
         .setMaxVersions(1)
     );
     UTIL.getHBaseAdmin().createTable(desc);
-    Table t = new HTable(UTIL.getConfiguration(), desc.getTableName());
-    Put p = new Put(ROW1);
-    p.add(FAMILYA, QUAL, now, QUAL);
-    p.add(FAMILYA, QUAL, now+1, QUAL);
-    p.add(FAMILYA, QUAL, now+2, QUAL);
-    p.add(FAMILYA, QUAL, now+3, QUAL);
-    p.add(FAMILYA, QUAL, now+4, QUAL);
-    t.put(p);
-    String[] args = new String[] {
+    try (Table t = new HTable(UTIL.getConfiguration(), desc.getTableName());) {
+      Put p = new Put(ROW1);
+      p.add(FAMILYA, QUAL, now, QUAL);
+      p.add(FAMILYA, QUAL, now+1, QUAL);
+      p.add(FAMILYA, QUAL, now+2, QUAL);
+      p.add(FAMILYA, QUAL, now+3, QUAL);
+      p.add(FAMILYA, QUAL, now+4, QUAL);
+      t.put(p);
+      String[] args = new String[] {
         "-D" + Export.EXPORT_BATCHING + "=" + EXPORT_BATCH_SIZE,  // added scanner batching arg.
         BATCH_TABLE,
         FQ_OUTPUT_DIR
-    };
-    assertTrue(runExport(args));
-    FileSystem fs = FileSystem.get(UTIL.getConfiguration());
-    fs.delete(new Path(FQ_OUTPUT_DIR), true);
-    t.close();
+      };
+      assertTrue(runExport(args));
+      FileSystem fs = FileSystem.get(UTIL.getConfiguration());
+      fs.delete(new Path(FQ_OUTPUT_DIR), true);
+    }
   }
   @Test
   public void testWithDeletes() throws Exception {
+    String IMPORT_TABLE = "importWithDeletes";
     String EXPORT_TABLE = "exportWithDeletes";
     HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(EXPORT_TABLE));
     desc.addFamily(new HColumnDescriptor(FAMILYA)
@@ -299,62 +300,59 @@ public class TestImportExport {
         .setKeepDeletedCells(true)
     );
     UTIL.getHBaseAdmin().createTable(desc);
-    Table t = new HTable(UTIL.getConfiguration(), desc.getTableName());
-    Put p = new Put(ROW1);
-    p.add(FAMILYA, QUAL, now, QUAL);
-    p.add(FAMILYA, QUAL, now+1, QUAL);
-    p.add(FAMILYA, QUAL, now+2, QUAL);
-    p.add(FAMILYA, QUAL, now+3, QUAL);
-    p.add(FAMILYA, QUAL, now+4, QUAL);
-    t.put(p);
-    Delete d = new Delete(ROW1, now+3);
-    t.delete(d);
-    d = new Delete(ROW1);
-    d.deleteColumns(FAMILYA, QUAL, now+2);
-    t.delete(d);
-    String[] args = new String[] {
+    try (Table t = new HTable(UTIL.getConfiguration(), desc.getTableName());) {
+      Put p = new Put(ROW1);
+      p.add(FAMILYA, QUAL, now, QUAL);
+      p.add(FAMILYA, QUAL, now+1, QUAL);
+      p.add(FAMILYA, QUAL, now+2, QUAL);
+      p.add(FAMILYA, QUAL, now+3, QUAL);
+      p.add(FAMILYA, QUAL, now+4, QUAL);
+      t.put(p);
+      Delete d = new Delete(ROW1, now+3);
+      t.delete(d);
+      d = new Delete(ROW1);
+      d.deleteColumns(FAMILYA, QUAL, now+2);
+      t.delete(d);
+      String[] args = new String[] {
         "-D" + Export.RAW_SCAN + "=true",
         EXPORT_TABLE,
         FQ_OUTPUT_DIR,
         "1000", // max number of key versions per key to export
-    };
-    assertTrue(runExport(args));
-    String IMPORT_TABLE = "importWithDeletes";
-    desc = new HTableDescriptor(TableName.valueOf(IMPORT_TABLE));
-    desc.addFamily(new HColumnDescriptor(FAMILYA)
+      };
+      assertTrue(runExport(args));
+      desc = new HTableDescriptor(TableName.valueOf(IMPORT_TABLE));
+      desc.addFamily(new HColumnDescriptor(FAMILYA)
         .setMaxVersions(5)
         .setKeepDeletedCells(true)
-    );
+      );
+    }
     UTIL.getHBaseAdmin().createTable(desc);
-    t.close();
-    t = new HTable(UTIL.getConfiguration(), desc.getTableName());
-    args = new String[] {
+    try (Table t = new HTable(UTIL.getConfiguration(), desc.getTableName());) {
+      String [] args = new String[] {
         IMPORT_TABLE,
         FQ_OUTPUT_DIR
-    };
-    assertTrue(runImport(args));
-    Scan s = new Scan();
-    s.setMaxVersions();
-    s.setRaw(true);
-    ResultScanner scanner = t.getScanner(s);
-    Result r = scanner.next();
-    Cell[] res = r.rawCells();
-    assertTrue(CellUtil.isDeleteFamily(res[0]));
-    assertEquals(now+4, res[1].getTimestamp());
-    assertEquals(now+3, res[2].getTimestamp());
-    assertTrue(CellUtil.isDelete(res[3]));
-    assertEquals(now+2, res[4].getTimestamp());
-    assertEquals(now+1, res[5].getTimestamp());
-    assertEquals(now, res[6].getTimestamp());
-    t.close();
+      };
+      assertTrue(runImport(args));
+      Scan s = new Scan();
+      s.setMaxVersions();
+      s.setRaw(true);
+      ResultScanner scanner = t.getScanner(s);
+      Result r = scanner.next();
+      Cell[] res = r.rawCells();
+      assertTrue(CellUtil.isDeleteFamily(res[0]));
+      assertEquals(now+4, res[1].getTimestamp());
+      assertEquals(now+3, res[2].getTimestamp());
+      assertTrue(CellUtil.isDelete(res[3]));
+      assertEquals(now+2, res[4].getTimestamp());
+      assertEquals(now+1, res[5].getTimestamp());
+      assertEquals(now, res[6].getTimestamp());
+    }
   }
   @Test
   public void testWithMultipleDeleteFamilyMarkersOfSameRowSameFamily() throws Exception {
     String EXPORT_TABLE = "exportWithMultipleDeleteFamilyMarkersOfSameRowSameFamily";
@@ -365,15 +363,14 @@ public class TestImportExport {
     );
     UTIL.getHBaseAdmin().createTable(desc);
     HTable exportT = new HTable(UTIL.getConfiguration(), EXPORT_TABLE);
-
-    //Add first version of QUAL
-    Put p = new Put(ROW1);
-    p.add(FAMILYA, QUAL, now, QUAL);
-    exportT.put(p);
-
-    //Add Delete family marker
-    Delete d = new Delete(ROW1, now+3);
-    exportT.delete(d);
-
+    //Add first version of QUAL
+    Put p = new Put(ROW1);
+    p.add(FAMILYA, QUAL, now, QUAL);
+    exportT.put(p);
+
+    //Add Delete family marker
+    Delete d = new Delete(ROW1, now+3);
+    exportT.delete(d);
+
     //Add second version of QUAL
     p = new Put(ROW1);
@@ -383,8 +380,8 @@ public class TestImportExport {
     //Add second Delete family marker
     d = new Delete(ROW1, now+7);
     exportT.delete(d);
-
+
     String[] args = new String[] {
         "-D" + Export.RAW_SCAN + "=true",
         EXPORT_TABLE,
@@ -400,7 +397,7 @@ public class TestImportExport {
         .setKeepDeletedCells(true)
     );
     UTIL.getHBaseAdmin().createTable(desc);
     HTable importT = new HTable(UTIL.getConfiguration(), IMPORT_TABLE);
-
+
     args = new String[] {
         IMPORT_TABLE,
@@ -411,21 +408,17 @@ public class TestImportExport {
     Scan s = new Scan();
     s.setMaxVersions();
     s.setRaw(true);
     ResultScanner importedTScanner = importT.getScanner(s);
     Result importedTResult = importedTScanner.next();
     ResultScanner exportedTScanner = exportT.getScanner(s);
     Result exportedTResult = exportedTScanner.next();
-    try
-    {
+    try {
       Result.compareResults(exportedTResult, importedTResult);
-    }
-    catch (Exception e) {
+    } catch (Exception e) {
       fail("Original and imported tables data comparision failed with error:"+e.getMessage());
-    }
-    finally
-    {
+    } finally {
       exportT.close();
       importT.close();
     }
@@ -469,7 +462,8 @@ public class TestImportExport {
     Table importTable = new HTable(UTIL.getConfiguration(), desc.getTableName());
     args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + PrefixFilter.class.getName(),
-        "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1), IMPORT_TABLE, FQ_OUTPUT_DIR,
+        "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1), IMPORT_TABLE,
+        FQ_OUTPUT_DIR,
         "1000" };
     assertTrue(runImport(args));
@@ -512,7 +506,7 @@ public class TestImportExport {
     results.close();
     return count;
   }
-
+
   /**
    * test main method. Import should print help and call System.exit
    */
@@ -624,7 +618,7 @@ public class TestImportExport {
     args.add("param2");
     Import.addFilterAndArguments(configuration, FilterBase.class, args);
     assertEquals("org.apache.hadoop.hbase.filter.FilterBase",
-        configuration.get(Import.FILTER_CLASS_CONF_KEY));
+      configuration.get(Import.FILTER_CLASS_CONF_KEY));
     assertEquals("param1,param2", configuration.get(Import.FILTER_ARGS_CONF_KEY));
   }
@@ -633,20 +627,20 @@ public class TestImportExport {
   public void testDurability() throws IOException, InterruptedException, ClassNotFoundException {
     // Create an export table.
     String exportTableName = "exporttestDurability";
-    Table exportTable = UTIL.createTable(TableName.valueOf(exportTableName), FAMILYA, 3);
-    // Insert some data
-    Put put = new Put(ROW1);
-    put.add(FAMILYA, QUAL, now, QUAL);
-    put.add(FAMILYA, QUAL, now + 1, QUAL);
-    put.add(FAMILYA, QUAL, now + 2, QUAL);
-    exportTable.put(put);
-    put = new Put(ROW2);
-    put.add(FAMILYA, QUAL, now, QUAL);
-    put.add(FAMILYA, QUAL, now + 1, QUAL);
-    put.add(FAMILYA, QUAL, now + 2, QUAL);
-    exportTable.put(put);
+    try (Table exportTable = UTIL.createTable(TableName.valueOf(exportTableName), FAMILYA, 3);) {
+      // Insert some data
+      Put put = new Put(ROW1);
+      put.add(FAMILYA, QUAL, now, QUAL);
+      put.add(FAMILYA, QUAL, now + 1, QUAL);
+      put.add(FAMILYA, QUAL, now + 2, QUAL);
+      exportTable.put(put);
+      put = new Put(ROW2);
+      put.add(FAMILYA, QUAL, now, QUAL);
+      put.add(FAMILYA, QUAL, now + 1, QUAL);
+      put.add(FAMILYA, QUAL, now + 2, QUAL);
+      exportTable.put(put);
+    }
     // Run the export
     String[] args = new String[] { exportTableName, FQ_OUTPUT_DIR, "1000"};
@@ -654,39 +648,46 @@ public class TestImportExport {
     // Create the table for import
     String importTableName = "importTestDurability1";
-    Table importTable = UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3);
-    // Register the wal listener for the import table
-    TableWALActionListener walListener = new TableWALActionListener(importTableName);
-    HRegionInfo region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer()
-        .getOnlineRegions(importTable.getName()).get(0).getRegionInfo();
-    WAL wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region);
-    wal.registerWALActionsListener(walListener);
-    // Run the import with SKIP_WAL
-    args =
-        new String[] { "-D" + Import.WAL_DURABILITY + "=" + Durability.SKIP_WAL.name(),
-            importTableName, FQ_OUTPUT_DIR };
-    assertTrue(runImport(args));
-    //Assert that the wal is not visisted
-    assertTrue(!walListener.isWALVisited());
-    //Ensure that the count is 2 (only one version of key value is obtained)
-    assertTrue(getCount(importTable, null) == 2);
-    // Run the import with the default durability option
+    WAL wal = null;
+    HRegionInfo region = null;
+    TableWALActionListener walListener = null;
+    try (Table importTable =
+        UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3);) {
+      // Register the wal listener for the import table
+      walListener = new TableWALActionListener(importTableName);
+      region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer()
+          .getOnlineRegions(importTable.getName()).get(0).getRegionInfo();
+      wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region);
+      wal.registerWALActionsListener(walListener);
+      // Run the import with SKIP_WAL
+      args =
+          new String[] { "-D" + Import.WAL_DURABILITY + "=" + Durability.SKIP_WAL.name(),
+              importTableName, FQ_OUTPUT_DIR };
+      assertTrue(runImport(args));
+      //Assert that the wal is not visisted
+      assertTrue(!walListener.isWALVisited());
+      //Ensure that the count is 2 (only one version of key value is obtained)
+      assertTrue(getCount(importTable, null) == 2);
+      // Run the import with the default durability option
+    }
     importTableName = "importTestDurability2";
-    importTable = UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3);
-    region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer()
-        .getOnlineRegions(importTable.getName()).get(0).getRegionInfo();
-    wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region);
-    walListener = new TableWALActionListener(importTableName);
-    wal.registerWALActionsListener(walListener);
-    args = new String[] { importTableName, FQ_OUTPUT_DIR };
-    assertTrue(runImport(args));
-    //Assert that the wal is visisted
-    assertTrue(walListener.isWALVisited());
-    //Ensure that the count is 2 (only one version of key value is obtained)
-    assertTrue(getCount(importTable, null) == 2);
+    try (Table importTable =
+        UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3);) {
+      region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer()
+          .getOnlineRegions(importTable.getName()).get(0).getRegionInfo();
+      wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region);
+      walListener = new TableWALActionListener(importTableName);
+      wal.registerWALActionsListener(walListener);
+      args = new String[] { importTableName, FQ_OUTPUT_DIR };
+      assertTrue(runImport(args));
+      //Assert that the wal is visisted
      assertTrue(walListener.isWALVisited());
+      //Ensure that the count is 2 (only one version of key value is obtained)
+      assertTrue(getCount(importTable, null) == 2);
+    }
   }
   /**
@@ -712,5 +713,5 @@ public class TestImportExport {
     public boolean isWALVisited() {
       return isVisited;
     }
-  }
+  }
 }
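
The durability assertions in the last hunks hinge on TableWALActionListener, only the tail of which is shown above. A sketch of what such a listener plausibly looks like, assuming the branch-1 WALActionsListener.Base no-op adapter and its visitLogEntryBeforeWrite hook (the body is illustrative, not copied from the file): it records whether any edit for the watched table ever hit the WAL, which is exactly what the SKIP_WAL and default-durability imports assert against.

    // Hedged sketch: flag any WAL append for the table under test.
    // Assumes the branch-1 WALActionsListener.Base adapter; the real
    // TestImportExport listener may differ in detail.
    private static class TableWALActionListener extends WALActionsListener.Base {
      private final String tableName;
      private boolean isVisited = false;

      TableWALActionListener(String tableName) {
        this.tableName = tableName;
      }

      @Override
      public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit) {
        // Ignore meta edits so WAL bookkeeping does not count as a table write.
        if (tableName.equalsIgnoreCase(htd.getNameAsString()) && !logEdit.isMetaEdit()) {
          isVisited = true;
        }
      }

      public boolean isWALVisited() {
        return isVisited;
      }
    }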

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver;
 import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
 import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
@@ -60,6 +61,8 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.BinaryComparator;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.io.HeapSize;
+import org.apache.hadoop.hbase.io.hfile.BlockCache;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.WAL;
@@ -92,21 +95,25 @@ public class TestAtomicOperation {
   static final byte [] row = Bytes.toBytes("rowA");
   static final byte [] row2 = Bytes.toBytes("rowB");
   @Before
   public void setup() {
     tableName = Bytes.toBytes(name.getMethodName());
   }
   @After
   public void teardown() throws IOException {
     if (region != null) {
+      BlockCache bc = region.getStores().get(0).getCacheConfig().getBlockCache();
       ((HRegion)region).close();
+      WAL wal = ((HRegion)region).getWAL();
+      if (wal != null) wal.close();
+      if (bc != null) bc.shutdown();
       region = null;
     }
   }
   //////////////////////////////////////////////////////////////////////////////
   // New tests that doesn't spin up a mini cluster but rather just test the
   // individual code pieces in the HRegion.
   //////////////////////////////////////////////////////////////////////////////
   /**
@@ -175,17 +182,15 @@ public class TestAtomicOperation {
    */
   @Test
   public void testIncrementMultiThreads() throws IOException {
     LOG.info("Starting test testIncrementMultiThreads");
-
     // run a with mixed column families (1 and 3 versions)
     initHRegion(tableName, name.getMethodName(), new int[] {1,3}, fam1, fam2);
-    // create 100 threads, each will increment by its own quantity
-    int numThreads = 100;
+    // create 25 threads, each will increment by its own quantity
+    int numThreads = 25;
     int incrementsPerThread = 1000;
     Incrementer[] all = new Incrementer[numThreads];
     int expectedTotal = 0;
-
     // create all threads
     for (int i = 0; i < numThreads; i++) {
       all[i] = new Incrementer(region, i, i, incrementsPerThread);
@@ -202,13 +207,13 @@ public class TestAtomicOperation {
       try {
         all[i].join();
       } catch (InterruptedException e) {
+        LOG.info("Ignored", e);
       }
     }
     assertICV(row, fam1, qual1, expectedTotal);
     assertICV(row, fam1, qual2, expectedTotal*2);
     assertICV(row, fam2, qual3, expectedTotal*3);
-    LOG.info("testIncrementMultiThreads successfully verified that total is " +
-        expectedTotal);
+    LOG.info("testIncrementMultiThreads successfully verified that total is " + expectedTotal);
   }
@@ -259,6 +264,7 @@ public class TestAtomicOperation {
     public Incrementer(Region region,
        int threadNumber, int amount, int numIncrements) {
+      super("incrementer." + threadNumber);
      this.region = region;
      this.numIncrements = numIncrements;
      this.amount = amount;
@@ -267,7 +273,7 @@ public class TestAtomicOperation {
     @Override
     public void run() {
-      for (int i=0; i<numIncrements; i++) {
+      for (int i = 0; i < numIncrements; i++) {
         try {
           Increment inc = new Increment(row);
           inc.addColumn(fam1, qual1, amount);
@@ -279,8 +285,15 @@ public class TestAtomicOperation {
           // verify: Make sure we only see completed increments
           Get g = new Get(row);
           Result result = region.get(g);
-          assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*2, Bytes.toLong(result.getValue(fam1, qual2)));
-          assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*3, Bytes.toLong(result.getValue(fam2, qual3)));
+          if (result != null) {
+            assertTrue(result.getValue(fam1, qual1) != null);
+            assertTrue(result.getValue(fam1, qual2) != null);
+            assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*2,
+              Bytes.toLong(result.getValue(fam1, qual2)));
+            assertTrue(result.getValue(fam2, qual3) != null);
+            assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*3,
+              Bytes.toLong(result.getValue(fam2, qual3)));
+          }
         } catch (IOException e) {
           e.printStackTrace();
         }
@@ -316,8 +329,8 @@ public class TestAtomicOperation {
           Get g = new Get(row);
           Result result = region.get(g);
           assertEquals(result.getValue(fam1, qual1).length, result.getValue(fam1, qual2).length);
-          assertEquals(result.getValue(fam1, qual1).length, result.getValue(fam2, qual3).length);
+          assertEquals(result.getValue(fam1, qual1).length, result.getValue(fam2, qual3).length);
         } catch (IOException e) {
           e.printStackTrace();
           failures.incrementAndGet();
@@ -358,7 +371,7 @@ public class TestAtomicOperation {
     // create 10 threads, each will alternate between adding and
     // removing a column
     int numThreads = 10;
-    int opsPerThread = 500;
+    int opsPerThread = 250;
     AtomicOperation[] all = new AtomicOperation[numThreads];
     AtomicLong timeStamps = new AtomicLong(0);
@@ -450,7 +463,7 @@ public class TestAtomicOperation {
     // create 10 threads, each will alternate between adding and
     // removing a column
     int numThreads = 10;
-    int opsPerThread = 500;
+    int opsPerThread = 250;
     AtomicOperation[] all = new AtomicOperation[numThreads];
     AtomicLong timeStamps = new AtomicLong(0);
@@ -549,7 +562,7 @@ public class TestAtomicOperation {
       this.failures = failures;
     }
   }
-
+
   private static CountDownLatch latch = new CountDownLatch(1);
   private enum TestStep {
     INIT, // initial put of 10 to set value of the cell
@@ -561,11 +574,11 @@ public class TestAtomicOperation {
   }
   private static volatile TestStep testStep = TestStep.INIT;
   private final String family = "f1";
   /**
    * Test written as a verifier for HBASE-7051, CheckAndPut should properly read
    * MVCC.
-   *
+   *
    * Moved into TestAtomicOperation from its original location, TestHBase7051
    */
   @Test
@@ -581,7 +594,7 @@ public class TestAtomicOperation {
     Put put = new Put(Bytes.toBytes("r1"));
     put.add(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("10"));
     puts[0] = put;
-    region.batchMutate(puts, HConstants.NO_NONCE, HConstants.NO_NONCE);
+    region.batchMutate(puts, HConstants.NO_NONCE, HConstants.NO_NONCE);
     MultithreadedTestUtil.TestContext ctx =
       new MultithreadedTestUtil.TestContext(conf);
@@ -600,7 +613,6 @@ public class TestAtomicOperation {
       for (Cell keyValue : results) {
         assertEquals("50",Bytes.toString(CellUtil.cloneValue(keyValue)));
       }
-
   }
   private class PutThread extends TestThread {
@@ -656,7 +668,7 @@ public class TestAtomicOperation {
       }
       return new WrappedRowLock(super.getRowLock(row, readLock));
     }
-
+
     public class WrappedRowLock implements RowLock {
       private final RowLock rowLock;
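
The teardown hunk near the top of this file is the "Close up WALs when done" item from the commit message, and its ordering matters: the block cache is fetched before the region is closed (the stores are gone afterwards), and the WAL and cache are shut down last so their background threads do not outlive the test. Restated with descriptive comments; the field name and @After hook match the diff, while the comments are an interpretation rather than part of the patch:

    @After
    public void teardown() throws IOException {
      if (region != null) {
        // Grab the cache first: after close() the region's stores are gone.
        BlockCache bc = region.getStores().get(0).getCacheConfig().getBlockCache();
        ((HRegion) region).close();      // flushes and releases the region
        WAL wal = ((HRegion) region).getWAL();
        if (wal != null) wal.close();    // stops the WAL writer/roller threads
        if (bc != null) bc.shutdown();   // stops cache eviction threads
        region = null;
      }
    }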