HBASE-18500 Performance issue: Don't use BufferedMutator for HTable's put method

Guanghao Zhang 2017-08-02 13:52:16 +08:00
parent 56a4fedda2
commit 0c16bb591b
25 changed files with 69 additions and 307 deletions
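In short: before this change, HTable.put() routed every mutation through a lazily created internal BufferedMutator and then called flushCommits(), paying buffer bookkeeping on every call; after it, a single put is one retried RPC and a list put is one batch() call. A minimal sketch of the resulting client-visible behavior (connection setup and the "t1" table are illustrative assumptions, not part of this commit):

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class DirectPutExample {
  public static void main(String[] args) throws Exception {
    // Connection picks up hbase-site.xml from the classpath.
    try (Connection conn = ConnectionFactory.createConnection();
         Table table = conn.getTable(TableName.valueOf("t1"))) {
      Put p = new Put(Bytes.toBytes("row1"));
      p.addColumn(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
      // Before HBASE-18500: buffered in an internal BufferedMutator, then flushed.
      // After: issued directly as a single retried RPC; no hidden client-side buffer.
      table.put(p);
    }
  }
}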

View File

@@ -305,25 +305,6 @@ public class BufferedMutatorImpl implements BufferedMutator {
}
};
}
/**
* This is used for legacy purposes in {@link HTable#setWriteBufferSize(long)} only. This ought
* not be called for production uses.
* If the new buffer size is smaller than the stored data, the {@link BufferedMutatorImpl#flush()}
* will be called.
* @param writeBufferSize The max size of internal buffer where data is stored.
* @throws org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException
* if an I/O error occurs and there are too many retries.
* @throws java.io.InterruptedIOException if the I/O task is interrupted.
* @deprecated Going away when we drop public support for {@link HTable}.
*/
@Deprecated
public void setWriteBufferSize(long writeBufferSize) throws RetriesExhaustedWithDetailsException,
InterruptedIOException {
this.writeBufferSize = writeBufferSize;
if (currentWriteBufferSize.get() > writeBufferSize) {
flush();
}
}
/**
* {@inheritDoc}

View File

@@ -107,9 +107,6 @@ public class HTable implements Table {
private final TableName tableName;
private final Configuration configuration;
private final ConnectionConfiguration connConfiguration;
@VisibleForTesting
volatile BufferedMutatorImpl mutator;
private final Object mutatorLock = new Object();
private boolean closed = false;
private final int scannerCaching;
private final long scannerMaxResultSize;
@@ -120,7 +117,6 @@ public class HTable implements Table {
private int writeRpcTimeout; // timeout for each write rpc request
private final boolean cleanupPoolOnClose; // shutdown the pool in close()
private final HRegionLocator locator;
private final long writeBufferSize;
/** The Async process for batch */
@VisibleForTesting
@@ -194,7 +190,6 @@ public class HTable implements Table {
this.rpcTimeout = builder.rpcTimeout;
this.readRpcTimeout = builder.readRpcTimeout;
this.writeRpcTimeout = builder.writeRpcTimeout;
this.writeBufferSize = builder.writeBufferSize;
this.scannerCaching = connConfiguration.getScannerCaching();
this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize();
@@ -203,31 +198,6 @@ public class HTable implements Table {
this.locator = new HRegionLocator(tableName, connection);
}
/**
* For internal testing. Uses Connection provided in {@code params}.
* @throws IOException
*/
@VisibleForTesting
protected HTable(ClusterConnection conn, BufferedMutatorImpl mutator) throws IOException {
connection = conn;
this.tableName = mutator.getName();
this.configuration = connection.getConfiguration();
connConfiguration = connection.getConnectionConfiguration();
cleanupPoolOnClose = false;
this.mutator = mutator;
this.operationTimeout = connConfiguration.getOperationTimeout();
this.rpcTimeout = connConfiguration.getRpcTimeout();
this.readRpcTimeout = connConfiguration.getReadRpcTimeout();
this.writeRpcTimeout = connConfiguration.getWriteRpcTimeout();
this.scannerCaching = connConfiguration.getScannerCaching();
this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize();
this.writeBufferSize = connConfiguration.getWriteBufferSize();
this.rpcControllerFactory = null;
this.rpcCallerFactory = null;
this.pool = mutator.getPool();
this.locator = null;
}
/**
* @return maxKeyValueSize from configuration.
*/
@@ -603,8 +573,21 @@ public class HTable implements Table {
*/
@Override
public void put(final Put put) throws IOException {
getBufferedMutator().mutate(put);
flushCommits();
validatePut(put);
ClientServiceCallable<Void> callable =
new ClientServiceCallable<Void>(this.connection, getName(), put.getRow(),
this.rpcControllerFactory.newController(), put.getPriority()) {
@Override
protected Void rpcCall() throws Exception {
MutateRequest request =
RequestConverter.buildMutateRequest(getLocation().getRegionInfo().getRegionName(),
put);
doMutate(request);
return null;
}
};
rpcCallerFactory.<Void> newCaller(this.writeRpcTimeout).callWithRetries(callable,
this.operationTimeout);
}
/**
@@ -613,8 +596,15 @@ public class HTable implements Table {
*/
@Override
public void put(final List<Put> puts) throws IOException {
getBufferedMutator().mutate(puts);
flushCommits();
for (Put put : puts) {
validatePut(put);
}
Object[] results = new Object[puts.size()];
try {
batch(puts, results, writeRpcTimeout);
} catch (InterruptedException e) {
throw (InterruptedIOException) new InterruptedIOException().initCause(e);
}
}
/**
@@ -947,17 +937,6 @@ public class HTable implements Table {
return results;
}
/**
* @throws IOException
*/
void flushCommits() throws IOException {
if (mutator == null) {
// nothing to flush if there's no mutator; don't bother creating one.
return;
}
getBufferedMutator().flush();
}
/**
* Process a mixed batch of Get, Put and Delete actions. All actions for a
* RegionServer are forwarded in one RPC call. Queries are executed in parallel.
@@ -980,11 +959,6 @@ public class HTable implements Table {
if (this.closed) {
return;
}
flushCommits();
if (mutator != null) {
mutator.close();
mutator = null;
}
if (cleanupPoolOnClose) {
this.pool.shutdown();
try {
@@ -1022,37 +996,6 @@ public class HTable implements Table {
}
}
/**
* Returns the maximum size in bytes of the write buffer for this HTable.
* <p>
* The default value comes from the configuration parameter
* {@code hbase.client.write.buffer}.
* @return The size of the write buffer in bytes.
*/
@Override
public long getWriteBufferSize() {
if (mutator == null) {
return connConfiguration.getWriteBufferSize();
} else {
return mutator.getWriteBufferSize();
}
}
/**
* Sets the size of the buffer in bytes.
* <p>
* If the new size is less than the current amount of data in the
* write buffer, the buffer gets flushed.
* @param writeBufferSize The new write buffer size, in bytes.
* @throws IOException if a remote or network exception occurs.
*/
@Override
@Deprecated
public void setWriteBufferSize(long writeBufferSize) throws IOException {
getBufferedMutator();
mutator.setWriteBufferSize(writeBufferSize);
}
/**
* The pool is used for multi requests for this HTable
* @return the pool used for multi
@@ -1154,9 +1097,6 @@ public class HTable implements Table {
@Deprecated
public void setOperationTimeout(int operationTimeout) {
this.operationTimeout = operationTimeout;
if (mutator != null) {
mutator.setOperationTimeout(operationTimeout);
}
}
@Override
@@ -1186,9 +1126,6 @@ public class HTable implements Table {
@Deprecated
public void setWriteRpcTimeout(int writeRpcTimeout) {
this.writeRpcTimeout = writeRpcTimeout;
if (mutator != null) {
mutator.setRpcTimeout(writeRpcTimeout);
}
}
@Override
@@ -1318,19 +1255,4 @@ public class HTable implements Table {
public RegionLocator getRegionLocator() {
return this.locator;
}
@VisibleForTesting
BufferedMutator getBufferedMutator() throws IOException {
if (mutator == null) {
synchronized (mutatorLock) {
if (mutator == null) {
this.mutator = (BufferedMutatorImpl) connection.getBufferedMutator(
new BufferedMutatorParams(tableName).pool(pool).writeBufferSize(writeBufferSize)
.maxKeyValueSize(connConfiguration.getMaxKeyValueSize())
.opertationTimeout(operationTimeout).rpcTimeout(writeRpcTimeout));
}
}
}
return mutator;
}
}

View File

@@ -209,13 +209,8 @@ public interface Table extends Closeable {
/**
* Puts some data in the table, in batch.
* <p>
* This can be used for group commit, or for submitting user defined
* batches. The writeBuffer will be periodically inspected while the List
* is processed, so depending on the List size the writeBuffer may flush
* not at all, or more than once.
* @param puts The list of mutations to apply. The batch put is done by
* aggregating the iteration of the Puts over the write buffer
* at the client-side for a single RPC call.
* This can be used for group commit, or for submitting user defined batches.
* @param puts The list of mutations to apply.
* @throws IOException if a remote or network exception occurs.
* @since 0.20.0
*/
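Since put(List) now validates each Put and submits the whole list as one batch() call under the write RPC timeout, per-row failures from the batch path typically still surface wrapped in RetriesExhaustedWithDetailsException. A hedged sketch of the caller-side pattern (table and family names are hypothetical):

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class GroupCommitExample {
  public static void main(String[] args) throws Exception {
    try (Connection conn = ConnectionFactory.createConnection();
         Table table = conn.getTable(TableName.valueOf("t1"))) {
      List<Put> puts = new ArrayList<>();
      for (int i = 0; i < 100; i++) {
        Put p = new Put(Bytes.toBytes("row" + i));
        p.addColumn(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v" + i));
        puts.add(p);
      }
      try {
        table.put(puts); // validated, then sent as one batch(); no write buffer involved
      } catch (RetriesExhaustedWithDetailsException e) {
        // Batch failures arrive with one cause per failed action.
        for (int i = 0; i < e.getNumExceptions(); i++) {
          System.err.println("failed row " + Bytes.toString(e.getRow(i).getRow())
              + ": " + e.getCause(i));
        }
      }
    }
  }
}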
@@ -482,30 +477,6 @@ public interface Table extends Closeable {
byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable,
final Batch.Callback<R> callback) throws ServiceException, Throwable;
/**
* Returns the maximum size in bytes of the write buffer for this HTable.
* <p>
* The default value comes from the configuration parameter
* {@code hbase.client.write.buffer}.
* @return The size of the write buffer in bytes.
* @deprecated as of 1.0.1 (should not have been in 1.0.0). Replaced by {@link BufferedMutator#getWriteBufferSize()}
*/
@Deprecated
long getWriteBufferSize();
/**
* Sets the size of the buffer in bytes.
* <p>
* If the new size is less than the current amount of data in the
* write buffer, the buffer gets flushed.
* @param writeBufferSize The new write buffer size, in bytes.
* @throws IOException if a remote or network exception occurs.
* @deprecated as of 1.0.1 (should not have been in 1.0.0). Replaced by {@link BufferedMutator} and
* {@link BufferedMutatorParams#writeBufferSize(long)}
*/
@Deprecated
void setWriteBufferSize(long writeBufferSize) throws IOException;
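The deprecation notes above name the replacement: buffering moves entirely to BufferedMutator, with the size fixed up front through BufferedMutatorParams#writeBufferSize(long) rather than mutated on a live Table. A minimal migration sketch (table name and size are illustrative):

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class WriteBufferMigration {
  public static void main(String[] args) throws Exception {
    try (Connection conn = ConnectionFactory.createConnection()) {
      // Replaces table.setWriteBufferSize(2 * 1024 * 1024); the size is set at creation.
      BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf("t1"))
          .writeBufferSize(2L * 1024 * 1024);
      try (BufferedMutator mutator = conn.getBufferedMutator(params)) {
        Put p = new Put(Bytes.toBytes("row1"));
        p.addColumn(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
        mutator.mutate(p); // buffered client-side
        // Replaces table.getWriteBufferSize():
        System.out.println("buffer size: " + mutator.getWriteBufferSize());
        mutator.flush(); // explicit flush; close() also flushes
      }
    }
  }
}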
/**
* Creates an instance of the given {@link com.google.protobuf.Service} subclass for each table
* region spanning the range from the {@code startKey} row to {@code endKey} row (inclusive), all

View File

@@ -56,12 +56,6 @@ public interface TableBuilder {
*/
TableBuilder setWriteRpcTimeout(int timeout);
/**
* Set the write buffer size which by default is specified by the
* {@code hbase.client.write.buffer} setting.
*/
TableBuilder setWriteBufferSize(long writeBufferSize);
/**
* Create the {@link Table} instance.
*/

View File

@@ -36,8 +36,6 @@ abstract class TableBuilderBase implements TableBuilder {
protected int writeRpcTimeout;
protected long writeBufferSize;
TableBuilderBase(TableName tableName, ConnectionConfiguration connConf) {
if (tableName == null) {
throw new IllegalArgumentException("Given table name is null");
@@ -48,7 +46,6 @@ abstract class TableBuilderBase implements TableBuilder {
this.rpcTimeout = connConf.getRpcTimeout();
this.readRpcTimeout = connConf.getReadRpcTimeout();
this.writeRpcTimeout = connConf.getWriteRpcTimeout();
this.writeBufferSize = connConf.getWriteBufferSize();
}
@Override
@@ -74,10 +71,4 @@ abstract class TableBuilderBase implements TableBuilder {
this.writeRpcTimeout = timeout;
return this;
}
@Override
public TableBuilder setWriteBufferSize(long writeBufferSize) {
this.writeBufferSize = writeBufferSize;
return this;
}
}

View File

@@ -79,7 +79,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@Category({ClientTests.class, MediumTests.class})
public class TestAsyncProcess {
@Rule public final TestRule timeout = CategoryBasedTimeout.builder().withTimeout(this.getClass()).
@@ -258,8 +257,6 @@ public class TestAsyncProcess {
}
static class MyAsyncRequestFutureImpl<Res> extends AsyncRequestFutureImpl<Res> {
private final Map<ServerName, List<Long>> heapSizesByServer = new HashMap<>();
public MyAsyncRequestFutureImpl(AsyncProcessTask task, List<Action> actions,
@@ -650,10 +647,9 @@ public class TestAsyncProcess {
MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true);
BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
try (HTable ht = new HTable(conn, mutator)) {
Assert.assertEquals(0L, ht.mutator.getCurrentWriteBufferSize());
ht.put(puts);
try (BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);) {
mutator.mutate(puts);
mutator.flush();
List<AsyncRequestFuture> reqs = ap.allReqs;
int actualSnReqCount = 0;
@@ -1095,54 +1091,6 @@ public class TestAsyncProcess {
assertFalse(ap.service.isShutdown());
}
private void doHTableFailedPut(boolean bufferOn) throws Exception {
ClusterConnection conn = createHConnection();
MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true);
BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
if (bufferOn) {
bufferParam.writeBufferSize(1024L * 1024L);
} else {
bufferParam.writeBufferSize(0L);
}
BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
HTable ht = new HTable(conn, mutator);
Put put = createPut(1, false);
Assert.assertEquals(0L, ht.mutator.getCurrentWriteBufferSize());
try {
ht.put(put);
if (bufferOn) {
ht.flushCommits();
}
Assert.fail();
} catch (RetriesExhaustedException expected) {
}
Assert.assertEquals(0L, ht.mutator.getCurrentWriteBufferSize());
// The table should have sent one request, maybe after multiple attempts
AsyncRequestFuture ars = null;
for (AsyncRequestFuture someReqs : ap.allReqs) {
if (someReqs.getResults().length == 0) continue;
Assert.assertTrue(ars == null);
ars = someReqs;
}
Assert.assertTrue(ars != null);
verifyResult(ars, false);
// This should not raise any exception, puts have been 'received' before by the catch.
ht.close();
}
@Test
public void testHTableFailedPutWithBuffer() throws Exception {
doHTableFailedPut(true);
}
@Test
public void testHTableFailedPutWithoutBuffer() throws Exception {
doHTableFailedPut(false);
}
@Test
public void testHTableFailedPutAndNewPut() throws Exception {
ClusterConnection conn = createHConnection();
@@ -1193,10 +1141,7 @@ public class TestAsyncProcess {
@Test
public void testBatch() throws IOException, InterruptedException {
ClusterConnection conn = new MyConnectionImpl(CONF);
MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true);
BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
HTable ht = new HTable(conn, mutator);
HTable ht = (HTable) conn.getTable(DUMMY_TABLE);
ht.multiAp = new MyAsyncProcess(conn, CONF, false);
List<Put> puts = new ArrayList<>(7);
@@ -1258,9 +1203,7 @@ public class TestAsyncProcess {
ClusterConnection conn = createHConnection();
Mockito.when(conn.getConfiguration()).thenReturn(copyConf);
MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf, true);
BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
try (HTable ht = new HTable(conn, mutator)) {
try (HTable ht = (HTable) conn.getTable(DUMMY_TABLE)) {
ht.multiAp = ap;
List<Get> gets = new LinkedList<>();
gets.add(new Get(DUMMY_BYTES_1));
@@ -1350,9 +1293,7 @@ public class TestAsyncProcess {
MyConnectionImpl2 con = new MyConnectionImpl2(hrls);
MyAsyncProcess ap = new MyAsyncProcess(con, CONF, con.nbThreads);
BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
BufferedMutatorImpl mutator = new BufferedMutatorImpl(con , bufferParam, ap);
HTable ht = new HTable(con, mutator);
HTable ht = (HTable) con.getTable(DUMMY_TABLE, ap.service);
ht.multiAp = ap;
ht.batch(gets, null);

View File

@@ -821,16 +821,6 @@ public class RemoteHTable implements Table {
throw new IOException("atomicMutation not supported");
}
@Override
public long getWriteBufferSize() {
throw new UnsupportedOperationException("getWriteBufferSize not implemented");
}
@Override
public void setWriteBufferSize(long writeBufferSize) throws IOException {
throw new IOException("setWriteBufferSize not supported");
}
@Override
public <R extends Message> Map<byte[], R> batchCoprocessorService(
Descriptors.MethodDescriptor method, Message request,

View File

@@ -270,16 +270,6 @@ public final class HTableWrapper implements Table {
table.mutateRow(rm);
}
@Override
public long getWriteBufferSize() {
return table.getWriteBufferSize();
}
@Override
public void setWriteBufferSize(long writeBufferSize) throws IOException {
table.setWriteBufferSize(writeBufferSize);
}
@Override
public <R extends Message> Map<byte[], R> batchCoprocessorService(
MethodDescriptor methodDescriptor, Message request, byte[] startKey, byte[] endKey,

View File

@@ -28,6 +28,7 @@ import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -200,7 +201,20 @@ public class AccessControlLists {
);
}
try {
t.put(p);
/**
* TODO: Use Table.put(Put) instead. This Table.put() happens inside the RS: we are already in
* AccessController, which means an RPC has already reached the server (the actual grant call
* from the client side). At the RpcServer a ThreadLocal holds the CallContext, including the
* user info of the current RPC caller. The table this put targets was created via the RegionCP
* env, which uses a special Connection; the normal RPC channel is bypassed, so there is no
* further contact with the RpcServer and the ThreadLocal is never reset. We run the new put as
* a superuser (User.runAsLoginUser, where the login user is the user who started the RS
* process), but per the RPC context it is still the old user. When AsyncProcess was used, the
* execute happened on another thread from the pool, so the old ThreadLocal was not visible; it
* looked as if there were no RPC context, and we fell back to the superuser who started the RS
* process.
*/
t.put(Collections.singletonList(p));
} finally {
t.close();
}

View File

@@ -1231,6 +1231,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
static abstract class BufferedMutatorTest extends Test {
protected BufferedMutator mutator;
protected Table table;
BufferedMutatorTest(Connection con, TestOptions options, Status status) {
super(con, options, status);
@@ -1239,11 +1240,13 @@ public class PerformanceEvaluation extends Configured implements Tool {
@Override
void onStartup() throws IOException {
this.mutator = connection.getBufferedMutator(TableName.valueOf(opts.tableName));
this.table = connection.getTable(TableName.valueOf(opts.tableName));
}
@Override
void onTakedown() throws IOException {
mutator.close();
table.close();
}
}
@@ -1465,9 +1468,10 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
}
put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
mutator.mutate(put);
if (opts.autoFlush) {
mutator.flush();
table.put(put);
} else {
mutator.mutate(put);
}
}
}
@@ -1666,9 +1670,10 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
}
put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
mutator.mutate(put);
if (opts.autoFlush) {
mutator.flush();
table.put(put);
} else {
mutator.mutate(put);
}
}
}

View File

@@ -93,7 +93,7 @@ public class TestClientPushback {
Configuration conf = UTIL.getConfiguration();
ClusterConnection conn = (ClusterConnection) ConnectionFactory.createConnection(conf);
Table table = conn.getTable(tableName);
BufferedMutatorImpl mutator = (BufferedMutatorImpl) conn.getBufferedMutator(tableName);
HRegionServer rs = UTIL.getHBaseCluster().getRegionServer(0);
Region region = rs.getOnlineRegions(tableName).get(0);
@@ -102,7 +102,8 @@ public class TestClientPushback {
// write some data
Put p = new Put(Bytes.toBytes("row"));
p.addColumn(family, qualifier, Bytes.toBytes("value1"));
table.put(p);
mutator.mutate(p);
mutator.flush();
// get the current load on RS. Hopefully memstore isn't flushed since we wrote the data
int load = (int) ((((HRegion) region).addAndGetMemstoreSize(new MemstoreSize(0, 0)) * 100)
@@ -138,7 +139,6 @@ public class TestClientPushback {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicLong endTime = new AtomicLong();
long startTime = EnvironmentEdgeManager.currentTime();
BufferedMutatorImpl mutator = ((HTable) table).mutator;
Batch.Callback<Result> callback = (byte[] r, byte[] row, Result result) -> {
endTime.set(EnvironmentEdgeManager.currentTime());
latch.countDown();

View File

@@ -4068,8 +4068,8 @@ public class TestFromClientSide {
Put p = new Put(ROW);
p.addColumn(BAD_FAM, QUALIFIER, VAL);
table.put(p);
} catch (RetriesExhaustedWithDetailsException e) {
caughtNSCFE = e.getCause(0) instanceof NoSuchColumnFamilyException;
} catch (Exception e) {
caughtNSCFE = e instanceof NoSuchColumnFamilyException;
}
assertTrue("Should throw NoSuchColumnFamilyException", caughtNSCFE);
@@ -4110,7 +4110,6 @@ public class TestFromClientSide {
final int NB_BATCH_ROWS = 10;
Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()),
new byte[][] { CONTENTS_FAMILY, SMALL_FAMILY });
table.setWriteBufferSize(10);
ArrayList<Put> rowsUpdate = new ArrayList<Put>();
for (int i = 0; i < NB_BATCH_ROWS * 10; i++) {
byte[] row = Bytes.toBytes("row" + i);

View File

@@ -85,7 +85,6 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.rules.TestRule;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
/**
@@ -442,7 +441,7 @@ public class TestHCM {
table.setOperationTimeout(30 * 1000);
table.put(new Put(FAM_NAM).addColumn(FAM_NAM, FAM_NAM, FAM_NAM));
Assert.fail("We expect an exception here");
} catch (RetriesExhaustedWithDetailsException e) {
} catch (SocketTimeoutException e) {
// The client has a CallTimeout class, but it's not shared. We're not very clean today,
// in the general case you can expect the call to stop, but the exception may vary.
// In this test however, we're sure that it will be a socket timeout.

View File

@@ -266,7 +266,6 @@ public class TestMultiParallel {
// Load the data
LOG.info("get new table");
Table table = UTIL.getConnection().getTable(TEST_TABLE);
table.setWriteBufferSize(10 * 1024 * 1024);
LOG.info("constructPutRequests");
List<Put> puts = constructPutRequests();

View File

@@ -141,12 +141,10 @@ public class TestServerBusyException {
public void run() {
try {
Put p = new Put(ROW);
p.addColumn(FAM_NAM, new byte[]{0}, new byte[]{0});
p.addColumn(FAM_NAM, new byte[] { 0 }, new byte[] { 0 });
table.put(p);
} catch (RetriesExhaustedWithDetailsException e) {
if (e.exceptions.get(0) instanceof ServerTooBusyException) {
getServerBusyException = 1;
}
} catch (ServerTooBusyException e) {
getServerBusyException = 1;
} catch (IOException ignore) {
}
}

View File

@@ -124,13 +124,8 @@ public class TestConstraint {
try {
table.put(put);
fail("This put should not have suceeded - AllFailConstraint was not run!");
} catch (RetriesExhaustedWithDetailsException e) {
List<Throwable> causes = e.getCauses();
assertEquals(
"More than one failure cause - should only be the failure constraint exception",
1, causes.size());
Throwable t = causes.get(0);
assertEquals(ConstraintException.class, t.getClass());
} catch (ConstraintException e) {
// expected
}
table.close();
}
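The test hunks above (TestFromClientSide, TestHCM, TestServerBusyException, TestConstraint) all make the same adjustment: with single puts no longer funneled through AsyncProcess, a server-side failure reaches the caller as its original exception type instead of arriving wrapped in RetriesExhaustedWithDetailsException. A sketch of the updated catch pattern (table and family names are hypothetical):

import java.io.IOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.util.Bytes;

public class CatchPatternExample {
  public static void main(String[] args) throws Exception {
    try (Connection conn = ConnectionFactory.createConnection();
         Table table = conn.getTable(TableName.valueOf("t1"))) {
      Put p = new Put(Bytes.toBytes("row1"));
      p.addColumn(Bytes.toBytes("no_such_family"), Bytes.toBytes("q"), Bytes.toBytes("v"));
      try {
        table.put(p);
      } catch (NoSuchColumnFamilyException e) {
        // Pre-HBASE-18500 this arrived as RetriesExhaustedWithDetailsException, cause 0.
        System.err.println("bad column family: " + e.getMessage());
      } catch (IOException e) {
        // Other failures (timeouts, exhausted retries) still surface as IOExceptions.
        throw e;
      }
    }
  }
}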

View File

@@ -148,7 +148,6 @@ public class TestHTableWrapper {
private void checkHTableInterfaceMethods() throws Exception {
checkConf();
checkNameAndDescriptor();
checkBufferSize();
checkExists();
checkAppend();
checkPutsAndDeletes();
@@ -175,13 +174,6 @@ public class TestHTableWrapper {
assertEquals(table.getTableDescriptor(), hTableInterface.getTableDescriptor());
}
private void checkBufferSize() throws IOException {
long initialWriteBufferSize = hTableInterface.getWriteBufferSize();
hTableInterface.setWriteBufferSize(12345L);
assertEquals(12345L, hTableInterface.getWriteBufferSize());
hTableInterface.setWriteBufferSize(initialWriteBufferSize);
}
private void checkExists() throws IOException {
boolean ex = hTableInterface.exists(new Get(ROW_A).addColumn(TEST_FAMILY, qualifierCol1));
assertTrue(ex);

View File

@@ -302,16 +302,6 @@ public class RegionAsTable implements Table {
throw new UnsupportedOperationException();
}
@Override
public long getWriteBufferSize() {
throw new UnsupportedOperationException();
}
@Override
public void setWriteBufferSize(long writeBufferSize) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public <R extends Message> Map<byte[], R> batchCoprocessorService(MethodDescriptor
methodDescriptor, Message request,

View File

@@ -558,7 +558,6 @@ public class TestMasterReplication {
Table[] htables = new Table[numClusters];
for (int i = 0; i < numClusters; i++) {
Table htable = ConnectionFactory.createConnection(configurations[i]).getTable(tableName);
htable.setWriteBufferSize(1024);
htables[i] = htable;
}
return htables;

View File

@@ -140,11 +140,8 @@ public class TestMultiSlaveReplication {
utility2.getAdmin().createTable(table);
utility3.getAdmin().createTable(table);
Table htable1 = utility1.getConnection().getTable(tableName);
htable1.setWriteBufferSize(1024);
Table htable2 = utility2.getConnection().getTable(tableName);
htable2.setWriteBufferSize(1024);
Table htable3 = utility3.getConnection().getTable(tableName);
htable3.setWriteBufferSize(1024);
ReplicationPeerConfig rpc = new ReplicationPeerConfig();
rpc.setClusterKey(utility2.getClusterKey());

View File

@@ -160,7 +160,6 @@ public class TestReplicationBase {
utility1.waitUntilAllRegionsAssigned(tableName);
utility2.waitUntilAllRegionsAssigned(tableName);
htable1 = connection1.getTable(tableName);
htable1.setWriteBufferSize(1024);
htable2 = connection2.getTable(tableName);
}

View File

@@ -426,7 +426,6 @@ public class TestReplicationSmallTests extends TestReplicationBase {
put.addColumn(famName, row, row);
puts.add(put);
}
htable1.setWriteBufferSize(1024);
// The puts will be iterated through and flushed only when the buffer
// size is reached.
htable1.put(puts);

View File

@@ -198,15 +198,11 @@ public class TestReplicationSyncUpTool extends TestReplicationBase {
// Get HTable from Master
ht1Source = connection1.getTable(t1_su);
ht1Source.setWriteBufferSize(1024);
ht2Source = connection1.getTable(t2_su);
ht1Source.setWriteBufferSize(1024);
// Get HTable from Peer1
ht1TargetAtPeer1 = connection2.getTable(t1_su);
ht1TargetAtPeer1.setWriteBufferSize(1024);
ht2TargetAtPeer1 = connection2.getTable(t2_su);
ht2TargetAtPeer1.setWriteBufferSize(1024);
/**
* set M-S : Master: utility1 Slave1: utility2

View File

@@ -64,9 +64,9 @@ import org.apache.hadoop.hbase.security.AccessDeniedException;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.access.Permission.Action;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Maps;
import com.google.protobuf.BlockingRpcChannel;
import com.google.protobuf.ServiceException;

View File

@@ -145,6 +145,7 @@ public class TestNamespaceCommands extends SecureTestUtil {
User.createUserForTesting(conf, "user_group_write", new String[] { GROUP_WRITE });
// TODO: other table perms
UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
UTIL.startMiniCluster();
// Wait for the ACL table to become available
UTIL.waitTableAvailable(AccessControlLists.ACL_TABLE_NAME.getName(), 30 * 1000);