HBASE-17367 Make HTable#getBufferedMutator thread safe

This commit is contained in:
Yu Li 2017-01-23 18:23:24 +08:00
parent 07e0a30efa
commit ba4a926b62
2 changed files with 62 additions and 9 deletions

View File

@ -108,7 +108,8 @@ public class HTable implements Table {
private final Configuration configuration;
private final ConnectionConfiguration connConfiguration;
@VisibleForTesting
BufferedMutatorImpl mutator;
volatile BufferedMutatorImpl mutator;
private final Object mutatorLock = new Object();
private boolean closed = false;
private final int scannerCaching;
private final long scannerMaxResultSize;
@ -1333,14 +1334,14 @@ public class HTable implements Table {
@VisibleForTesting
BufferedMutator getBufferedMutator() throws IOException {
if (mutator == null) {
this.mutator = (BufferedMutatorImpl) connection.getBufferedMutator(
new BufferedMutatorParams(tableName)
.pool(pool)
.writeBufferSize(writeBufferSize)
.maxKeyValueSize(connConfiguration.getMaxKeyValueSize())
.opertationTimeout(operationTimeout)
.rpcTimeout(writeRpcTimeout)
);
synchronized (mutatorLock) {
if (mutator == null) {
this.mutator = (BufferedMutatorImpl) connection.getBufferedMutator(
new BufferedMutatorParams(tableName).pool(pool).writeBufferSize(writeBufferSize)
.maxKeyValueSize(connConfiguration.getMaxKeyValueSize())
.opertationTimeout(operationTimeout).rpcTimeout(writeRpcTimeout));
}
}
}
return mutator;
}

View File

@ -28,6 +28,8 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -749,6 +751,56 @@ public class TestFromClientSide3 {
}
@Test
public void testPutThenGetWithMultipleThreads() throws Exception {
TableName TABLE = TableName.valueOf("testParallelPutAndGet");
final int THREAD_NUM = 20;
final int ROUND_NUM = 10;
for (int round = 0; round < ROUND_NUM; round++) {
ArrayList<Thread> threads = new ArrayList<>(THREAD_NUM);
final AtomicInteger successCnt = new AtomicInteger(0);
Table ht = TEST_UTIL.createTable(TABLE, FAMILY);
for (int i = 0; i < THREAD_NUM; i++) {
final int index = i;
Thread t = new Thread(new Runnable() {
@Override
public void run() {
final byte[] row = Bytes.toBytes("row-" + index);
final byte[] value = Bytes.toBytes("v" + index);
try {
Put put = new Put(row);
put.addColumn(FAMILY, QUALIFIER, value);
ht.put(put);
Get get = new Get(row);
Result result = ht.get(get);
byte[] returnedValue = result.getValue(FAMILY, QUALIFIER);
if (Bytes.equals(value, returnedValue)) {
successCnt.getAndIncrement();
} else {
LOG.error("Should be equal but not, original value: " + Bytes.toString(value)
+ ", returned value: "
+ (returnedValue == null ? "null" : Bytes.toString(returnedValue)));
}
} catch (Throwable e) {
// do nothing
}
}
});
threads.add(t);
}
for (Thread t : threads) {
t.start();
}
for (Thread t : threads) {
t.join();
}
assertEquals("Not equal in round " + round, THREAD_NUM, successCnt.get());
ht.close();
TEST_UTIL.deleteTable(TABLE);
}
}
private static void assertNoLocks(final TableName tableName) throws IOException, InterruptedException {
HRegion region = (HRegion) find(tableName);
assertEquals(0, region.getLockedRows().size());