HBASE-13798 TestFromClientSide* don't close the Table

This commit is contained in:
Andor Molnár 2019-07-02 11:29:01 +02:00 committed by Peter Somogyi
parent 6205a6c8b3
commit 35521d4a2f
2 changed files with 4396 additions and 4281 deletions

View File

@ -89,6 +89,7 @@ public class TestFromClientSide3 {
private static final Logger LOG = LoggerFactory.getLogger(TestFromClientSide3.class); private static final Logger LOG = LoggerFactory.getLogger(TestFromClientSide3.class);
private final static HBaseTestingUtility TEST_UTIL private final static HBaseTestingUtility TEST_UTIL
= new HBaseTestingUtility(); = new HBaseTestingUtility();
private static final int WAITTABLE_MILLIS = 10000;
private static byte[] FAMILY = Bytes.toBytes("testFamily"); private static byte[] FAMILY = Bytes.toBytes("testFamily");
private static Random random = new Random(); private static Random random = new Random();
private static int SLAVES = 3; private static int SLAVES = 3;
@ -102,6 +103,7 @@ public class TestFromClientSide3 {
@Rule @Rule
public TestName name = new TestName(); public TestName name = new TestName();
private TableName tableName;
/** /**
* @throws java.lang.Exception * @throws java.lang.Exception
@ -124,7 +126,7 @@ public class TestFromClientSide3 {
*/ */
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
// Nothing to do. tableName = TableName.valueOf(name.getMethodName());
} }
/** /**
@ -183,86 +185,78 @@ public class TestFromClientSide3 {
} }
@Test @Test
public void testScanAfterDeletingSpecifiedRow() throws IOException { public void testScanAfterDeletingSpecifiedRow() throws IOException, InterruptedException {
TableName tableName = TableName.valueOf(name.getMethodName()); try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
TableDescriptor desc = TableDescriptorBuilder.newBuilder(tableName) TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
.build();
TEST_UTIL.getAdmin().createTable(desc);
byte[] row = Bytes.toBytes("SpecifiedRow"); byte[] row = Bytes.toBytes("SpecifiedRow");
byte[] value0 = Bytes.toBytes("value_0"); byte[] value0 = Bytes.toBytes("value_0");
byte[] value1 = Bytes.toBytes("value_1"); byte[] value1 = Bytes.toBytes("value_1");
try (Table t = TEST_UTIL.getConnection().getTable(tableName)) {
Put put = new Put(row); Put put = new Put(row);
put.addColumn(FAMILY, QUALIFIER, VALUE); put.addColumn(FAMILY, QUALIFIER, VALUE);
t.put(put); table.put(put);
Delete d = new Delete(row); Delete d = new Delete(row);
t.delete(d); table.delete(d);
put = new Put(row); put = new Put(row);
put.addColumn(FAMILY, null, value0); put.addColumn(FAMILY, null, value0);
t.put(put); table.put(put);
put = new Put(row); put = new Put(row);
put.addColumn(FAMILY, null, value1); put.addColumn(FAMILY, null, value1);
t.put(put); table.put(put);
List<Cell> cells = toList(t.getScanner(new Scan())); List<Cell> cells = toList(table.getScanner(new Scan()));
assertEquals(1, cells.size()); assertEquals(1, cells.size());
assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0)))); assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0))));
cells = toList(t.getScanner(new Scan().addFamily(FAMILY))); cells = toList(table.getScanner(new Scan().addFamily(FAMILY)));
assertEquals(1, cells.size()); assertEquals(1, cells.size());
assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0)))); assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0))));
cells = toList(t.getScanner(new Scan().addColumn(FAMILY, QUALIFIER))); cells = toList(table.getScanner(new Scan().addColumn(FAMILY, QUALIFIER)));
assertEquals(0, cells.size()); assertEquals(0, cells.size());
TEST_UTIL.getAdmin().flush(tableName); TEST_UTIL.getAdmin().flush(tableName);
cells = toList(t.getScanner(new Scan())); cells = toList(table.getScanner(new Scan()));
assertEquals(1, cells.size()); assertEquals(1, cells.size());
assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0)))); assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0))));
cells = toList(t.getScanner(new Scan().addFamily(FAMILY))); cells = toList(table.getScanner(new Scan().addFamily(FAMILY)));
assertEquals(1, cells.size()); assertEquals(1, cells.size());
assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0)))); assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0))));
cells = toList(t.getScanner(new Scan().addColumn(FAMILY, QUALIFIER))); cells = toList(table.getScanner(new Scan().addColumn(FAMILY, QUALIFIER)));
assertEquals(0, cells.size()); assertEquals(0, cells.size());
} }
} }
@Test @Test
public void testScanAfterDeletingSpecifiedRowV2() throws IOException { public void testScanAfterDeletingSpecifiedRowV2() throws IOException, InterruptedException {
TableName tableName = TableName.valueOf(name.getMethodName()); try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
TableDescriptor desc = TableDescriptorBuilder.newBuilder(tableName) TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
.build();
TEST_UTIL.getAdmin().createTable(desc);
byte[] row = Bytes.toBytes("SpecifiedRow"); byte[] row = Bytes.toBytes("SpecifiedRow");
byte[] qual0 = Bytes.toBytes("qual0"); byte[] qual0 = Bytes.toBytes("qual0");
byte[] qual1 = Bytes.toBytes("qual1"); byte[] qual1 = Bytes.toBytes("qual1");
try (Table t = TEST_UTIL.getConnection().getTable(tableName)) {
Delete d = new Delete(row); Delete d = new Delete(row);
t.delete(d); table.delete(d);
Put put = new Put(row); Put put = new Put(row);
put.addColumn(FAMILY, null, VALUE); put.addColumn(FAMILY, null, VALUE);
t.put(put); table.put(put);
put = new Put(row); put = new Put(row);
put.addColumn(FAMILY, qual1, qual1); put.addColumn(FAMILY, qual1, qual1);
t.put(put); table.put(put);
put = new Put(row); put = new Put(row);
put.addColumn(FAMILY, qual0, qual0); put.addColumn(FAMILY, qual0, qual0);
t.put(put); table.put(put);
Result r = t.get(new Get(row)); Result r = table.get(new Get(row));
assertEquals(3, r.size()); assertEquals(3, r.size());
assertEquals("testValue", Bytes.toString(CellUtil.cloneValue(r.rawCells()[0]))); assertEquals("testValue", Bytes.toString(CellUtil.cloneValue(r.rawCells()[0])));
assertEquals("qual0", Bytes.toString(CellUtil.cloneValue(r.rawCells()[1]))); assertEquals("qual0", Bytes.toString(CellUtil.cloneValue(r.rawCells()[1])));
assertEquals("qual1", Bytes.toString(CellUtil.cloneValue(r.rawCells()[2]))); assertEquals("qual1", Bytes.toString(CellUtil.cloneValue(r.rawCells()[2])));
TEST_UTIL.getAdmin().flush(tableName); TEST_UTIL.getAdmin().flush(tableName);
r = t.get(new Get(row)); r = table.get(new Get(row));
assertEquals(3, r.size()); assertEquals(3, r.size());
assertEquals("testValue", Bytes.toString(CellUtil.cloneValue(r.rawCells()[0]))); assertEquals("testValue", Bytes.toString(CellUtil.cloneValue(r.rawCells()[0])));
assertEquals("qual0", Bytes.toString(CellUtil.cloneValue(r.rawCells()[1]))); assertEquals("qual0", Bytes.toString(CellUtil.cloneValue(r.rawCells()[1])));
@ -284,20 +278,21 @@ public class TestFromClientSide3 {
*/ */
TEST_UTIL.getConfiguration().setInt("hbase.hstore.compaction.min", 3); TEST_UTIL.getConfiguration().setInt("hbase.hstore.compaction.min", 3);
final TableName tableName = TableName.valueOf(name.getMethodName()); try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
Table hTable = TEST_UTIL.createTable(tableName, FAMILY, 10); TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
Admin admin = TEST_UTIL.getAdmin(); try (Admin admin = TEST_UTIL.getAdmin()) {
ClusterConnection connection = (ClusterConnection) TEST_UTIL.getConnection(); ClusterConnection connection = (ClusterConnection) TEST_UTIL.getConnection();
// Create 3 store files. // Create 3 store files.
byte[] row = Bytes.toBytes(random.nextInt()); byte[] row = Bytes.toBytes(random.nextInt());
performMultiplePutAndFlush((HBaseAdmin) admin, hTable, row, FAMILY, 3, 100); performMultiplePutAndFlush((HBaseAdmin) admin, table, row, FAMILY, 3, 100);
try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) { try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
// Verify we have multiple store files. // Verify we have multiple store files.
HRegionLocation loc = locator.getRegionLocation(row, true); HRegionLocation loc = locator.getRegionLocation(row, true);
byte[] regionName = loc.getRegionInfo().getRegionName(); byte[] regionName = loc.getRegionInfo().getRegionName();
AdminProtos.AdminService.BlockingInterface server = connection.getAdmin(loc.getServerName()); AdminProtos.AdminService.BlockingInterface server =
connection.getAdmin(loc.getServerName());
assertTrue(ProtobufUtil.getStoreFiles(server, regionName, FAMILY).size() > 1); assertTrue(ProtobufUtil.getStoreFiles(server, regionName, FAMILY).size() > 1);
// Issue a compaction request // Issue a compaction request
@ -321,18 +316,19 @@ public class TestFromClientSide3 {
// change the compaction.min config option for this table to 5 // change the compaction.min config option for this table to 5
LOG.info("hbase.hstore.compaction.min should now be 5"); LOG.info("hbase.hstore.compaction.min should now be 5");
HTableDescriptor htd = new HTableDescriptor(hTable.getTableDescriptor()); HTableDescriptor htd = new HTableDescriptor(table.getTableDescriptor());
htd.setValue("hbase.hstore.compaction.min", String.valueOf(5)); htd.setValue("hbase.hstore.compaction.min", String.valueOf(5));
admin.modifyTable(tableName, htd); admin.modifyTable(tableName, htd);
Pair<Integer, Integer> st; Pair<Integer, Integer> st = admin.getAlterStatus(tableName);
while (null != (st = admin.getAlterStatus(tableName)) && st.getFirst() > 0) { while (null != st && st.getFirst() > 0) {
LOG.debug(st.getFirst() + " regions left to update"); LOG.debug(st.getFirst() + " regions left to update");
Thread.sleep(40); Thread.sleep(40);
st = admin.getAlterStatus(tableName);
} }
LOG.info("alter status finished"); LOG.info("alter status finished");
// Create 3 more store files. // Create 3 more store files.
performMultiplePutAndFlush((HBaseAdmin) admin, hTable, row, FAMILY, 3, 10); performMultiplePutAndFlush((HBaseAdmin) admin, table, row, FAMILY, 3, 10);
// Issue a compaction request // Issue a compaction request
admin.compact(tableName); admin.compact(tableName);
@ -351,9 +347,11 @@ public class TestFromClientSide3 {
hcd.setValue("hbase.hstore.compaction.min", String.valueOf(2)); hcd.setValue("hbase.hstore.compaction.min", String.valueOf(2));
htd.modifyFamily(hcd); htd.modifyFamily(hcd);
admin.modifyTable(tableName, htd); admin.modifyTable(tableName, htd);
while (null != (st = admin.getAlterStatus(tableName)) && st.getFirst() > 0) { st = admin.getAlterStatus(tableName);
while (null != st && st.getFirst() > 0) {
LOG.debug(st.getFirst() + " regions left to update"); LOG.debug(st.getFirst() + " regions left to update");
Thread.sleep(40); Thread.sleep(40);
st = admin.getAlterStatus(tableName);
} }
LOG.info("alter status finished"); LOG.info("alter status finished");
@ -386,21 +384,24 @@ public class TestFromClientSide3 {
hcd.setValue("hbase.hstore.compaction.min", null); hcd.setValue("hbase.hstore.compaction.min", null);
htd.modifyFamily(hcd); htd.modifyFamily(hcd);
admin.modifyTable(tableName, htd); admin.modifyTable(tableName, htd);
while (null != (st = admin.getAlterStatus(tableName)) && st.getFirst() > 0) { st = admin.getAlterStatus(tableName);
while (null != st && st.getFirst() > 0) {
LOG.debug(st.getFirst() + " regions left to update"); LOG.debug(st.getFirst() + " regions left to update");
Thread.sleep(40); Thread.sleep(40);
st = admin.getAlterStatus(tableName);
} }
LOG.info("alter status finished"); LOG.info("alter status finished");
assertNull(hTable.getTableDescriptor().getFamily(FAMILY).getValue( assertNull(table.getTableDescriptor().getFamily(FAMILY).getValue(
"hbase.hstore.compaction.min")); "hbase.hstore.compaction.min"));
} }
} }
}
}
@Test @Test
public void testHTableBatchWithEmptyPut ()throws Exception { public void testHTableBatchWithEmptyPut() throws IOException, InterruptedException {
Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()), try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
new byte[][] { FAMILY }); TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
try {
List actions = (List) new ArrayList(); List actions = (List) new ArrayList();
Object[] results = new Object[2]; Object[] results = new Object[2];
// create an empty Put // create an empty Put
@ -414,23 +415,20 @@ public class TestFromClientSide3 {
table.batch(actions, results); table.batch(actions, results);
fail("Empty Put should have failed the batch call"); fail("Empty Put should have failed the batch call");
} catch (IllegalArgumentException iae) { } catch (IllegalArgumentException iae) {
} finally {
table.close();
} }
} }
// Test Table.batch with large amount of mutations against the same key. // Test Table.batch with large amount of mutations against the same key.
// It used to trigger read lock's "Maximum lock count exceeded" Error. // It used to trigger read lock's "Maximum lock count exceeded" Error.
@Test @Test
public void testHTableWithLargeBatch() throws Exception { public void testHTableWithLargeBatch() throws IOException, InterruptedException {
Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()),
new byte[][] { FAMILY });
int sixtyFourK = 64 * 1024; int sixtyFourK = 64 * 1024;
try {
List actions = new ArrayList(); List actions = new ArrayList();
Object[] results = new Object[(sixtyFourK + 1) * 2]; Object[] results = new Object[(sixtyFourK + 1) * 2];
try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
for (int i = 0; i < sixtyFourK + 1; i++) { for (int i = 0; i < sixtyFourK + 1; i++) {
Put put1 = new Put(ROW); Put put1 = new Put(ROW);
put1.addColumn(FAMILY, QUALIFIER, VALUE); put1.addColumn(FAMILY, QUALIFIER, VALUE);
@ -442,34 +440,33 @@ public class TestFromClientSide3 {
} }
table.batch(actions, results); table.batch(actions, results);
} finally {
table.close();
} }
} }
@Test @Test
public void testBatchWithRowMutation() throws Exception { public void testBatchWithRowMutation() throws Exception {
LOG.info("Starting testBatchWithRowMutation"); LOG.info("Starting testBatchWithRowMutation");
final TableName TABLENAME = TableName.valueOf("testBatchWithRowMutation");
try (Table t = TEST_UTIL.createTable(TABLENAME, FAMILY)) {
byte [][] QUALIFIERS = new byte [][] { byte [][] QUALIFIERS = new byte [][] {
Bytes.toBytes("a"), Bytes.toBytes("b") Bytes.toBytes("a"), Bytes.toBytes("b")
}; };
try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
RowMutations arm = RowMutations.of(Collections.singletonList( RowMutations arm = RowMutations.of(Collections.singletonList(
new Put(ROW).addColumn(FAMILY, QUALIFIERS[0], VALUE))); new Put(ROW).addColumn(FAMILY, QUALIFIERS[0], VALUE)));
Object[] batchResult = new Object[1]; Object[] batchResult = new Object[1];
t.batch(Arrays.asList(arm), batchResult); table.batch(Arrays.asList(arm), batchResult);
Get g = new Get(ROW); Get g = new Get(ROW);
Result r = t.get(g); Result r = table.get(g);
assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[0]))); assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[0])));
arm = RowMutations.of(Arrays.asList( arm = RowMutations.of(Arrays.asList(
new Put(ROW).addColumn(FAMILY, QUALIFIERS[1], VALUE), new Put(ROW).addColumn(FAMILY, QUALIFIERS[1], VALUE),
new Delete(ROW).addColumns(FAMILY, QUALIFIERS[0]))); new Delete(ROW).addColumns(FAMILY, QUALIFIERS[0])));
t.batch(Arrays.asList(arm), batchResult); table.batch(Arrays.asList(arm), batchResult);
r = t.get(g); r = table.get(g);
assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[1]))); assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[1])));
assertNull(r.getValue(FAMILY, QUALIFIERS[0])); assertNull(r.getValue(FAMILY, QUALIFIERS[0]));
@ -477,7 +474,7 @@ public class TestFromClientSide3 {
try { try {
arm = RowMutations.of(Collections.singletonList( arm = RowMutations.of(Collections.singletonList(
new Put(ROW).addColumn(new byte[]{'b', 'o', 'g', 'u', 's'}, QUALIFIERS[0], VALUE))); new Put(ROW).addColumn(new byte[]{'b', 'o', 'g', 'u', 's'}, QUALIFIERS[0], VALUE)));
t.batch(Arrays.asList(arm), batchResult); table.batch(Arrays.asList(arm), batchResult);
fail("Expected RetriesExhaustedWithDetailsException with NoSuchColumnFamilyException"); fail("Expected RetriesExhaustedWithDetailsException with NoSuchColumnFamilyException");
} catch (RetriesExhaustedWithDetailsException e) { } catch (RetriesExhaustedWithDetailsException e) {
String msg = e.getMessage(); String msg = e.getMessage();
@ -487,12 +484,12 @@ public class TestFromClientSide3 {
} }
@Test @Test
public void testHTableExistsMethodSingleRegionSingleGet() throws Exception { public void testHTableExistsMethodSingleRegionSingleGet()
// Test with a single region table. throws IOException, InterruptedException {
Table table = TEST_UTIL.createTable( try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
TableName.valueOf(name.getMethodName()), TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
new byte[][] { FAMILY });
// Test with a single region table.
Put put = new Put(ROW); Put put = new Put(ROW);
put.addColumn(FAMILY, QUALIFIER, VALUE); put.addColumn(FAMILY, QUALIFIER, VALUE);
@ -506,11 +503,13 @@ public class TestFromClientSide3 {
exist = table.exists(get); exist = table.exists(get);
assertTrue(exist); assertTrue(exist);
} }
}
@Test @Test
public void testHTableExistsMethodSingleRegionMultipleGets() throws Exception { public void testHTableExistsMethodSingleRegionMultipleGets()
Table table = TEST_UTIL.createTable(TableName.valueOf( throws IOException, InterruptedException {
name.getMethodName()), new byte[][] { FAMILY }); try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
Put put = new Put(ROW); Put put = new Put(ROW);
put.addColumn(FAMILY, QUALIFIER, VALUE); put.addColumn(FAMILY, QUALIFIER, VALUE);
@ -524,12 +523,13 @@ public class TestFromClientSide3 {
assertTrue(results[0]); assertTrue(results[0]);
assertFalse(results[1]); assertFalse(results[1]);
} }
}
@Test @Test
public void testHTableExistsBeforeGet() throws Exception { public void testHTableExistsBeforeGet() throws IOException, InterruptedException {
Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()), try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
new byte[][] { FAMILY }); TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
try {
Put put = new Put(ROW); Put put = new Put(ROW);
put.addColumn(FAMILY, QUALIFIER, VALUE); put.addColumn(FAMILY, QUALIFIER, VALUE);
table.put(put); table.put(put);
@ -542,17 +542,15 @@ public class TestFromClientSide3 {
Result result = table.get(get); Result result = table.get(get);
assertEquals(false, result.isEmpty()); assertEquals(false, result.isEmpty());
assertTrue(Bytes.equals(VALUE, result.getValue(FAMILY, QUALIFIER))); assertTrue(Bytes.equals(VALUE, result.getValue(FAMILY, QUALIFIER)));
} finally {
table.close();
} }
} }
@Test @Test
public void testHTableExistsAllBeforeGet() throws Exception { public void testHTableExistsAllBeforeGet() throws IOException, InterruptedException {
try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
final byte[] ROW2 = Bytes.add(ROW, Bytes.toBytes("2")); final byte[] ROW2 = Bytes.add(ROW, Bytes.toBytes("2"));
Table table = TEST_UTIL.createTable(
TableName.valueOf(name.getMethodName()), new byte[][] { FAMILY });
try {
Put put = new Put(ROW); Put put = new Put(ROW);
put.addColumn(FAMILY, QUALIFIER, VALUE); put.addColumn(FAMILY, QUALIFIER, VALUE);
table.put(put); table.put(put);
@ -575,16 +573,16 @@ public class TestFromClientSide3 {
assertTrue(Bytes.equals(VALUE, result[0].getValue(FAMILY, QUALIFIER))); assertTrue(Bytes.equals(VALUE, result[0].getValue(FAMILY, QUALIFIER)));
assertEquals(false, result[1].isEmpty()); assertEquals(false, result[1].isEmpty());
assertTrue(Bytes.equals(VALUE, result[1].getValue(FAMILY, QUALIFIER))); assertTrue(Bytes.equals(VALUE, result[1].getValue(FAMILY, QUALIFIER)));
} finally {
table.close();
} }
} }
@Test @Test
public void testHTableExistsMethodMultipleRegionsSingleGet() throws Exception { public void testHTableExistsMethodMultipleRegionsSingleGet() throws Exception {
Table table = TEST_UTIL.createTable( try (Table table = TEST_UTIL.createTable(
TableName.valueOf(name.getMethodName()), new byte[][] { FAMILY }, tableName, new byte[][] { FAMILY },
1, new byte[] { 0x00 }, new byte[] { (byte) 0xff }, 255); 1, new byte[] { 0x00 }, new byte[] { (byte) 0xff }, 255)) {
TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
Put put = new Put(ROW); Put put = new Put(ROW);
put.addColumn(FAMILY, QUALIFIER, VALUE); put.addColumn(FAMILY, QUALIFIER, VALUE);
@ -598,12 +596,15 @@ public class TestFromClientSide3 {
exist = table.exists(get); exist = table.exists(get);
assertTrue(exist); assertTrue(exist);
} }
}
@Test @Test
public void testHTableExistsMethodMultipleRegionsMultipleGets() throws Exception { public void testHTableExistsMethodMultipleRegionsMultipleGets() throws Exception {
Table table = TEST_UTIL.createTable( try (Table table = TEST_UTIL.createTable(
TableName.valueOf(name.getMethodName()), tableName,
new byte[][] { FAMILY }, 1, new byte[] { 0x00 }, new byte[] { (byte) 0xff }, 255); new byte[][] { FAMILY }, 1, new byte[] { 0x00 }, new byte[] { (byte) 0xff }, 255)) {
TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
Put put = new Put(ROW); Put put = new Put(ROW);
put.addColumn(FAMILY, QUALIFIER, VALUE); put.addColumn(FAMILY, QUALIFIER, VALUE);
table.put(put); table.put(put);
@ -647,15 +648,13 @@ public class TestFromClientSide3 {
assertTrue(results[1]); assertTrue(results[1]);
assertFalse(results[2]); assertFalse(results[2]);
} }
}
@Test @Test
public void testGetEmptyRow() throws Exception { public void testGetEmptyRow() throws Exception {
//Create a table and put in 1 row //Create a table and put in 1 row
Admin admin = TEST_UTIL.getAdmin(); try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(Bytes.toBytes(name.getMethodName()))); TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
desc.addFamily(new HColumnDescriptor(FAMILY));
admin.createTable(desc);
Table table = TEST_UTIL.getConnection().getTable(desc.getTableName());
Put put = new Put(ROW_BYTES); Put put = new Put(ROW_BYTES);
put.addColumn(FAMILY, COL_QUAL, VAL_BYTES); put.addColumn(FAMILY, COL_QUAL, VAL_BYTES);
@ -674,7 +673,7 @@ public class TestFromClientSide3 {
assertTrue(res.isEmpty() == true); assertTrue(res.isEmpty() == true);
res = table.get(new Get(ROW_BYTES)); res = table.get(new Get(ROW_BYTES));
assertTrue(Arrays.equals(res.getValue(FAMILY, COL_QUAL), VAL_BYTES)); assertTrue(Arrays.equals(res.getValue(FAMILY, COL_QUAL), VAL_BYTES));
table.close(); }
} }
@Test @Test
@ -685,10 +684,8 @@ public class TestFromClientSide3 {
@Test @Test
public void testPutWithPreBatchMutate() throws Exception { public void testPutWithPreBatchMutate() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName());
testPreBatchMutate(tableName, () -> { testPreBatchMutate(tableName, () -> {
try { try (Table t = TEST_UTIL.getConnection().getTable(tableName)) {
Table t = TEST_UTIL.getConnection().getTable(tableName);
Put put = new Put(ROW); Put put = new Put(ROW);
put.addColumn(FAMILY, QUALIFIER, VALUE); put.addColumn(FAMILY, QUALIFIER, VALUE);
t.put(put); t.put(put);
@ -700,11 +697,9 @@ public class TestFromClientSide3 {
@Test @Test
public void testRowMutationsWithPreBatchMutate() throws Exception { public void testRowMutationsWithPreBatchMutate() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName());
testPreBatchMutate(tableName, () -> { testPreBatchMutate(tableName, () -> {
try { try (Table t = TEST_UTIL.getConnection().getTable(tableName)) {
RowMutations rm = new RowMutations(ROW, 1); RowMutations rm = new RowMutations(ROW, 1);
Table t = TEST_UTIL.getConnection().getTable(tableName);
Put put = new Put(ROW); Put put = new Put(ROW);
put.addColumn(FAMILY, QUALIFIER, VALUE); put.addColumn(FAMILY, QUALIFIER, VALUE);
rm.add(put); rm.add(put);
@ -720,6 +715,8 @@ public class TestFromClientSide3 {
desc.addCoprocessor(WaitingForScanObserver.class.getName()); desc.addCoprocessor(WaitingForScanObserver.class.getName());
desc.addFamily(new HColumnDescriptor(FAMILY)); desc.addFamily(new HColumnDescriptor(FAMILY));
TEST_UTIL.getAdmin().createTable(desc); TEST_UTIL.getAdmin().createTable(desc);
// Don't use waitTableAvailable(), because the scanner will mess up the co-processor
ExecutorService service = Executors.newFixedThreadPool(2); ExecutorService service = Executors.newFixedThreadPool(2);
service.execute(rn); service.execute(rn);
final List<Cell> cells = new ArrayList<>(); final List<Cell> cells = new ArrayList<>();
@ -727,13 +724,14 @@ public class TestFromClientSide3 {
try { try {
// waiting for update. // waiting for update.
TimeUnit.SECONDS.sleep(3); TimeUnit.SECONDS.sleep(3);
Table t = TEST_UTIL.getConnection().getTable(tableName); try (Table t = TEST_UTIL.getConnection().getTable(tableName)) {
Scan scan = new Scan(); Scan scan = new Scan();
try (ResultScanner scanner = t.getScanner(scan)) { try (ResultScanner scanner = t.getScanner(scan)) {
for (Result r : scanner) { for (Result r : scanner) {
cells.addAll(Arrays.asList(r.rawCells())); cells.addAll(Arrays.asList(r.rawCells()));
} }
} }
}
} catch (IOException | InterruptedException ex) { } catch (IOException | InterruptedException ex) {
throw new RuntimeException(ex); throw new RuntimeException(ex);
} }
@ -747,12 +745,13 @@ public class TestFromClientSide3 {
@Test @Test
public void testLockLeakWithDelta() throws Exception, Throwable { public void testLockLeakWithDelta() throws Exception, Throwable {
final TableName tableName = TableName.valueOf(name.getMethodName());
HTableDescriptor desc = new HTableDescriptor(tableName); HTableDescriptor desc = new HTableDescriptor(tableName);
desc.addCoprocessor(WaitingForMultiMutationsObserver.class.getName()); desc.addCoprocessor(WaitingForMultiMutationsObserver.class.getName());
desc.setConfiguration("hbase.rowlock.wait.duration", String.valueOf(5000)); desc.setConfiguration("hbase.rowlock.wait.duration", String.valueOf(5000));
desc.addFamily(new HColumnDescriptor(FAMILY)); desc.addFamily(new HColumnDescriptor(FAMILY));
TEST_UTIL.getAdmin().createTable(desc); TEST_UTIL.getAdmin().createTable(desc);
TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
// new a connection for lower retry number. // new a connection for lower retry number.
Configuration copy = new Configuration(TEST_UTIL.getConfiguration()); Configuration copy = new Configuration(TEST_UTIL.getConfiguration());
copy.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); copy.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
@ -782,7 +781,8 @@ public class TestFromClientSide3 {
}); });
appendService.shutdown(); appendService.shutdown();
appendService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); appendService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
WaitingForMultiMutationsObserver observer = find(tableName, WaitingForMultiMutationsObserver.class); WaitingForMultiMutationsObserver observer =
find(tableName, WaitingForMultiMutationsObserver.class);
observer.latch.countDown(); observer.latch.countDown();
putService.shutdown(); putService.shutdown();
putService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); putService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
@ -800,13 +800,14 @@ public class TestFromClientSide3 {
@Test @Test
public void testMultiRowMutations() throws Exception, Throwable { public void testMultiRowMutations() throws Exception, Throwable {
final TableName tableName = TableName.valueOf(name.getMethodName());
HTableDescriptor desc = new HTableDescriptor(tableName); HTableDescriptor desc = new HTableDescriptor(tableName);
desc.addCoprocessor(MultiRowMutationEndpoint.class.getName()); desc.addCoprocessor(MultiRowMutationEndpoint.class.getName());
desc.addCoprocessor(WaitingForMultiMutationsObserver.class.getName()); desc.addCoprocessor(WaitingForMultiMutationsObserver.class.getName());
desc.setConfiguration("hbase.rowlock.wait.duration", String.valueOf(5000)); desc.setConfiguration("hbase.rowlock.wait.duration", String.valueOf(5000));
desc.addFamily(new HColumnDescriptor(FAMILY)); desc.addFamily(new HColumnDescriptor(FAMILY));
TEST_UTIL.getAdmin().createTable(desc); TEST_UTIL.getAdmin().createTable(desc);
TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
// new a connection for lower retry number. // new a connection for lower retry number.
Configuration copy = new Configuration(TEST_UTIL.getConfiguration()); Configuration copy = new Configuration(TEST_UTIL.getConfiguration());
copy.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); copy.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
@ -839,9 +840,11 @@ public class TestFromClientSide3 {
MultiRowMutationProtos.MutateRowsRequest request MultiRowMutationProtos.MutateRowsRequest request
= MultiRowMutationProtos.MutateRowsRequest.newBuilder() = MultiRowMutationProtos.MutateRowsRequest.newBuilder()
.addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation( .addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.PUT, put1)) org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto
.MutationType.PUT, put1))
.addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation( .addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.PUT, put2)) org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto
.MutationType.PUT, put2))
.build(); .build();
table.coprocessorService(MultiRowMutationProtos.MultiRowMutationService.class, table.coprocessorService(MultiRowMutationProtos.MultiRowMutationService.class,
ROW, ROW, ROW, ROW,
@ -894,14 +897,9 @@ public class TestFromClientSide3 {
* @throws IOException * @throws IOException
*/ */
@Test @Test
public void testMVCCUsingMVCCPreAssign() throws IOException { public void testMVCCUsingMVCCPreAssign() throws IOException, InterruptedException {
final TableName tableName = TableName.valueOf(name.getMethodName()); try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
HTableDescriptor htd = new HTableDescriptor(tableName); TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
HColumnDescriptor fam = new HColumnDescriptor(FAMILY);
htd.addFamily(fam);
Admin admin = TEST_UTIL.getAdmin();
admin.createTable(htd);
Table table = admin.getConnection().getTable(TableName.valueOf(name.getMethodName()));
//put two row first to init the scanner //put two row first to init the scanner
Put put = new Put(Bytes.toBytes("0")); Put put = new Put(Bytes.toBytes("0"));
put.addColumn(FAMILY, Bytes.toBytes(""), Bytes.toBytes("0")); put.addColumn(FAMILY, Bytes.toBytes(""), Bytes.toBytes("0"));
@ -933,19 +931,19 @@ public class TestFromClientSide3 {
} }
// the new scanner should see all rows // the new scanner should see all rows
assertEquals(1001, rowNum); assertEquals(1001, rowNum);
}
} }
@Test @Test
public void testPutThenGetWithMultipleThreads() throws Exception { public void testPutThenGetWithMultipleThreads() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName());
final int THREAD_NUM = 20; final int THREAD_NUM = 20;
final int ROUND_NUM = 10; final int ROUND_NUM = 10;
for (int round = 0; round < ROUND_NUM; round++) { for (int round = 0; round < ROUND_NUM; round++) {
ArrayList<Thread> threads = new ArrayList<>(THREAD_NUM); ArrayList<Thread> threads = new ArrayList<>(THREAD_NUM);
final AtomicInteger successCnt = new AtomicInteger(0); final AtomicInteger successCnt = new AtomicInteger(0);
Table ht = TEST_UTIL.createTable(tableName, FAMILY); try (Table ht = TEST_UTIL.createTable(tableName, FAMILY)) {
TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
for (int i = 0; i < THREAD_NUM; i++) { for (int i = 0; i < THREAD_NUM; i++) {
final int index = i; final int index = i;
Thread t = new Thread(new Runnable() { Thread t = new Thread(new Runnable() {
@ -982,12 +980,13 @@ public class TestFromClientSide3 {
t.join(); t.join();
} }
assertEquals("Not equal in round " + round, THREAD_NUM, successCnt.get()); assertEquals("Not equal in round " + round, THREAD_NUM, successCnt.get());
ht.close(); }
TEST_UTIL.deleteTable(tableName); TEST_UTIL.deleteTable(tableName);
} }
} }
private static void assertNoLocks(final TableName tableName) throws IOException, InterruptedException { private static void assertNoLocks(final TableName tableName)
throws IOException, InterruptedException {
HRegion region = (HRegion) find(tableName); HRegion region = (HRegion) find(tableName);
assertEquals(0, region.getLockedRows().size()); assertEquals(0, region.getLockedRows().size());
} }
@ -1065,13 +1064,13 @@ public class TestFromClientSide3 {
} }
@Test @Test
public void testScanWithBatchSizeReturnIncompleteCells() throws IOException { public void testScanWithBatchSizeReturnIncompleteCells()
TableName tableName = TableName.valueOf(name.getMethodName()); throws IOException, InterruptedException {
TableDescriptor hd = TableDescriptorBuilder.newBuilder(tableName) TableDescriptor hd = TableDescriptorBuilder.newBuilder(tableName)
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setMaxVersions(3).build()) .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setMaxVersions(3).build())
.build(); .build();
try (Table table = TEST_UTIL.createTable(hd, null)) {
Table table = TEST_UTIL.createTable(hd, null); TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
Put put = new Put(ROW); Put put = new Put(ROW);
put.addColumn(FAMILY, Bytes.toBytes(0), generateHugeValue(3 * 1024 * 1024)); put.addColumn(FAMILY, Bytes.toBytes(0), generateHugeValue(3 * 1024 * 1024));
@ -1096,9 +1095,10 @@ public class TestFromClientSide3 {
try (ResultScanner scanner = table.getScanner(scan)) { try (ResultScanner scanner = table.getScanner(scan)) {
List<Result> list = new ArrayList<>(); List<Result> list = new ArrayList<>();
/* /*
* The first scan rpc should return a result with 2 cells, because 3MB + 4MB > 4MB; The second * The first scan rpc should return a result with 2 cells, because 3MB + 4MB > 4MB;
* scan rpc should return a result with 3 cells, because reach the batch limit = 3; The * The second scan rpc should return a result with 3 cells, because reach the batch limit
* mayHaveMoreCellsInRow in last result should be false in the scan rpc. BTW, the * = 3;
* The mayHaveMoreCellsInRow in last result should be false in the scan rpc. BTW, the
* moreResultsInRegion also would be false. Finally, the client should collect all the cells * moreResultsInRegion also would be false. Finally, the client should collect all the cells
* into two result: 2+3 -> 3+2; * into two result: 2+3 -> 3+2;
*/ */
@ -1128,3 +1128,4 @@ public class TestFromClientSide3 {
} }
} }
} }
}