HBASE-18500 Performance issue: Don't use BufferedMutator for HTable's put method
This commit is contained in:
parent
679f34e881
commit
cabdbf181a
|
@ -305,25 +305,6 @@ public class BufferedMutatorImpl implements BufferedMutator {
|
|||
}
|
||||
};
|
||||
}
|
||||
/**
|
||||
* This is used for legacy purposes in {@link HTable#setWriteBufferSize(long)} only. This ought
|
||||
* not be called for production uses.
|
||||
* If the new buffer size is smaller than the stored data, the {@link BufferedMutatorImpl#flush()}
|
||||
* will be called.
|
||||
* @param writeBufferSize The max size of internal buffer where data is stored.
|
||||
* @throws org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException
|
||||
* if an I/O error occurs and there are too many retries.
|
||||
* @throws java.io.InterruptedIOException if the I/O task is interrupted.
|
||||
* @deprecated Going away when we drop public support for {@link HTable}.
|
||||
*/
|
||||
@Deprecated
|
||||
public void setWriteBufferSize(long writeBufferSize) throws RetriesExhaustedWithDetailsException,
|
||||
InterruptedIOException {
|
||||
this.writeBufferSize = writeBufferSize;
|
||||
if (currentWriteBufferSize.get() > writeBufferSize) {
|
||||
flush();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
|
|
|
@ -107,9 +107,6 @@ public class HTable implements Table {
|
|||
private final TableName tableName;
|
||||
private final Configuration configuration;
|
||||
private final ConnectionConfiguration connConfiguration;
|
||||
@VisibleForTesting
|
||||
volatile BufferedMutatorImpl mutator;
|
||||
private final Object mutatorLock = new Object();
|
||||
private boolean closed = false;
|
||||
private final int scannerCaching;
|
||||
private final long scannerMaxResultSize;
|
||||
|
@ -120,7 +117,6 @@ public class HTable implements Table {
|
|||
private int writeRpcTimeout; // timeout for each write rpc request
|
||||
private final boolean cleanupPoolOnClose; // shutdown the pool in close()
|
||||
private final HRegionLocator locator;
|
||||
private final long writeBufferSize;
|
||||
|
||||
/** The Async process for batch */
|
||||
@VisibleForTesting
|
||||
|
@ -194,7 +190,6 @@ public class HTable implements Table {
|
|||
this.rpcTimeout = builder.rpcTimeout;
|
||||
this.readRpcTimeout = builder.readRpcTimeout;
|
||||
this.writeRpcTimeout = builder.writeRpcTimeout;
|
||||
this.writeBufferSize = builder.writeBufferSize;
|
||||
this.scannerCaching = connConfiguration.getScannerCaching();
|
||||
this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize();
|
||||
|
||||
|
@ -203,31 +198,6 @@ public class HTable implements Table {
|
|||
this.locator = new HRegionLocator(tableName, connection);
|
||||
}
|
||||
|
||||
/**
|
||||
* For internal testing. Uses Connection provided in {@code params}.
|
||||
* @throws IOException
|
||||
*/
|
||||
@VisibleForTesting
|
||||
protected HTable(ClusterConnection conn, BufferedMutatorImpl mutator) throws IOException {
|
||||
connection = conn;
|
||||
this.tableName = mutator.getName();
|
||||
this.configuration = connection.getConfiguration();
|
||||
connConfiguration = connection.getConnectionConfiguration();
|
||||
cleanupPoolOnClose = false;
|
||||
this.mutator = mutator;
|
||||
this.operationTimeout = connConfiguration.getOperationTimeout();
|
||||
this.rpcTimeout = connConfiguration.getRpcTimeout();
|
||||
this.readRpcTimeout = connConfiguration.getReadRpcTimeout();
|
||||
this.writeRpcTimeout = connConfiguration.getWriteRpcTimeout();
|
||||
this.scannerCaching = connConfiguration.getScannerCaching();
|
||||
this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize();
|
||||
this.writeBufferSize = connConfiguration.getWriteBufferSize();
|
||||
this.rpcControllerFactory = null;
|
||||
this.rpcCallerFactory = null;
|
||||
this.pool = mutator.getPool();
|
||||
this.locator = null;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return maxKeyValueSize from configuration.
|
||||
*/
|
||||
|
@ -603,8 +573,21 @@ public class HTable implements Table {
|
|||
*/
|
||||
@Override
|
||||
public void put(final Put put) throws IOException {
|
||||
getBufferedMutator().mutate(put);
|
||||
flushCommits();
|
||||
validatePut(put);
|
||||
ClientServiceCallable<Void> callable =
|
||||
new ClientServiceCallable<Void>(this.connection, getName(), put.getRow(),
|
||||
this.rpcControllerFactory.newController(), put.getPriority()) {
|
||||
@Override
|
||||
protected Void rpcCall() throws Exception {
|
||||
MutateRequest request =
|
||||
RequestConverter.buildMutateRequest(getLocation().getRegionInfo().getRegionName(),
|
||||
put);
|
||||
doMutate(request);
|
||||
return null;
|
||||
}
|
||||
};
|
||||
rpcCallerFactory.<Void> newCaller(this.writeRpcTimeout).callWithRetries(callable,
|
||||
this.operationTimeout);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -613,8 +596,15 @@ public class HTable implements Table {
|
|||
*/
|
||||
@Override
|
||||
public void put(final List<Put> puts) throws IOException {
|
||||
getBufferedMutator().mutate(puts);
|
||||
flushCommits();
|
||||
for (Put put : puts) {
|
||||
validatePut(put);
|
||||
}
|
||||
Object[] results = new Object[puts.size()];
|
||||
try {
|
||||
batch(puts, results, writeRpcTimeout);
|
||||
} catch (InterruptedException e) {
|
||||
throw (InterruptedIOException) new InterruptedIOException().initCause(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -947,17 +937,6 @@ public class HTable implements Table {
|
|||
return results;
|
||||
}
|
||||
|
||||
/**
|
||||
* @throws IOException
|
||||
*/
|
||||
void flushCommits() throws IOException {
|
||||
if (mutator == null) {
|
||||
// nothing to flush if there's no mutator; don't bother creating one.
|
||||
return;
|
||||
}
|
||||
getBufferedMutator().flush();
|
||||
}
|
||||
|
||||
/**
|
||||
* Process a mixed batch of Get, Put and Delete actions. All actions for a
|
||||
* RegionServer are forwarded in one RPC call. Queries are executed in parallel.
|
||||
|
@ -980,11 +959,6 @@ public class HTable implements Table {
|
|||
if (this.closed) {
|
||||
return;
|
||||
}
|
||||
flushCommits();
|
||||
if (mutator != null) {
|
||||
mutator.close();
|
||||
mutator = null;
|
||||
}
|
||||
if (cleanupPoolOnClose) {
|
||||
this.pool.shutdown();
|
||||
try {
|
||||
|
@ -1022,37 +996,6 @@ public class HTable implements Table {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the maximum size in bytes of the write buffer for this HTable.
|
||||
* <p>
|
||||
* The default value comes from the configuration parameter
|
||||
* {@code hbase.client.write.buffer}.
|
||||
* @return The size of the write buffer in bytes.
|
||||
*/
|
||||
@Override
|
||||
public long getWriteBufferSize() {
|
||||
if (mutator == null) {
|
||||
return connConfiguration.getWriteBufferSize();
|
||||
} else {
|
||||
return mutator.getWriteBufferSize();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the size of the buffer in bytes.
|
||||
* <p>
|
||||
* If the new size is less than the current amount of data in the
|
||||
* write buffer, the buffer gets flushed.
|
||||
* @param writeBufferSize The new write buffer size, in bytes.
|
||||
* @throws IOException if a remote or network exception occurs.
|
||||
*/
|
||||
@Override
|
||||
@Deprecated
|
||||
public void setWriteBufferSize(long writeBufferSize) throws IOException {
|
||||
getBufferedMutator();
|
||||
mutator.setWriteBufferSize(writeBufferSize);
|
||||
}
|
||||
|
||||
/**
|
||||
* The pool is used for mutli requests for this HTable
|
||||
* @return the pool used for mutli
|
||||
|
@ -1154,9 +1097,6 @@ public class HTable implements Table {
|
|||
@Deprecated
|
||||
public void setOperationTimeout(int operationTimeout) {
|
||||
this.operationTimeout = operationTimeout;
|
||||
if (mutator != null) {
|
||||
mutator.setOperationTimeout(operationTimeout);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1186,9 +1126,6 @@ public class HTable implements Table {
|
|||
@Deprecated
|
||||
public void setWriteRpcTimeout(int writeRpcTimeout) {
|
||||
this.writeRpcTimeout = writeRpcTimeout;
|
||||
if (mutator != null) {
|
||||
mutator.setRpcTimeout(writeRpcTimeout);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1318,19 +1255,4 @@ public class HTable implements Table {
|
|||
public RegionLocator getRegionLocator() {
|
||||
return this.locator;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
BufferedMutator getBufferedMutator() throws IOException {
|
||||
if (mutator == null) {
|
||||
synchronized (mutatorLock) {
|
||||
if (mutator == null) {
|
||||
this.mutator = (BufferedMutatorImpl) connection.getBufferedMutator(
|
||||
new BufferedMutatorParams(tableName).pool(pool).writeBufferSize(writeBufferSize)
|
||||
.maxKeyValueSize(connConfiguration.getMaxKeyValueSize())
|
||||
.opertationTimeout(operationTimeout).rpcTimeout(writeRpcTimeout));
|
||||
}
|
||||
}
|
||||
}
|
||||
return mutator;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -209,13 +209,8 @@ public interface Table extends Closeable {
|
|||
/**
|
||||
* Puts some data in the table, in batch.
|
||||
* <p>
|
||||
* This can be used for group commit, or for submitting user defined
|
||||
* batches. The writeBuffer will be periodically inspected while the List
|
||||
* is processed, so depending on the List size the writeBuffer may flush
|
||||
* not at all, or more than once.
|
||||
* @param puts The list of mutations to apply. The batch put is done by
|
||||
* aggregating the iteration of the Puts over the write buffer
|
||||
* at the client-side for a single RPC call.
|
||||
* This can be used for group commit, or for submitting user defined batches.
|
||||
* @param puts The list of mutations to apply.
|
||||
* @throws IOException if a remote or network exception occurs.
|
||||
* @since 0.20.0
|
||||
*/
|
||||
|
@ -482,30 +477,6 @@ public interface Table extends Closeable {
|
|||
byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable,
|
||||
final Batch.Callback<R> callback) throws ServiceException, Throwable;
|
||||
|
||||
/**
|
||||
* Returns the maximum size in bytes of the write buffer for this HTable.
|
||||
* <p>
|
||||
* The default value comes from the configuration parameter
|
||||
* {@code hbase.client.write.buffer}.
|
||||
* @return The size of the write buffer in bytes.
|
||||
* @deprecated as of 1.0.1 (should not have been in 1.0.0). Replaced by {@link BufferedMutator#getWriteBufferSize()}
|
||||
*/
|
||||
@Deprecated
|
||||
long getWriteBufferSize();
|
||||
|
||||
/**
|
||||
* Sets the size of the buffer in bytes.
|
||||
* <p>
|
||||
* If the new size is less than the current amount of data in the
|
||||
* write buffer, the buffer gets flushed.
|
||||
* @param writeBufferSize The new write buffer size, in bytes.
|
||||
* @throws IOException if a remote or network exception occurs.
|
||||
* @deprecated as of 1.0.1 (should not have been in 1.0.0). Replaced by {@link BufferedMutator} and
|
||||
* {@link BufferedMutatorParams#writeBufferSize(long)}
|
||||
*/
|
||||
@Deprecated
|
||||
void setWriteBufferSize(long writeBufferSize) throws IOException;
|
||||
|
||||
/**
|
||||
* Creates an instance of the given {@link com.google.protobuf.Service} subclass for each table
|
||||
* region spanning the range from the {@code startKey} row to {@code endKey} row (inclusive), all
|
||||
|
|
|
@ -56,12 +56,6 @@ public interface TableBuilder {
|
|||
*/
|
||||
TableBuilder setWriteRpcTimeout(int timeout);
|
||||
|
||||
/**
|
||||
* Set the write buffer size which by default is specified by the
|
||||
* {@code hbase.client.write.buffer} setting.
|
||||
*/
|
||||
TableBuilder setWriteBufferSize(long writeBufferSize);
|
||||
|
||||
/**
|
||||
* Create the {@link Table} instance.
|
||||
*/
|
||||
|
|
|
@ -36,8 +36,6 @@ abstract class TableBuilderBase implements TableBuilder {
|
|||
|
||||
protected int writeRpcTimeout;
|
||||
|
||||
protected long writeBufferSize;
|
||||
|
||||
TableBuilderBase(TableName tableName, ConnectionConfiguration connConf) {
|
||||
if (tableName == null) {
|
||||
throw new IllegalArgumentException("Given table name is null");
|
||||
|
@ -48,7 +46,6 @@ abstract class TableBuilderBase implements TableBuilder {
|
|||
this.rpcTimeout = connConf.getRpcTimeout();
|
||||
this.readRpcTimeout = connConf.getReadRpcTimeout();
|
||||
this.writeRpcTimeout = connConf.getWriteRpcTimeout();
|
||||
this.writeBufferSize = connConf.getWriteBufferSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -74,10 +71,4 @@ abstract class TableBuilderBase implements TableBuilder {
|
|||
this.writeRpcTimeout = timeout;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TableBuilder setWriteBufferSize(long writeBufferSize) {
|
||||
this.writeBufferSize = writeBufferSize;
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -79,7 +79,6 @@ import static org.junit.Assert.assertEquals;
|
|||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
|
||||
@Category({ClientTests.class, MediumTests.class})
|
||||
public class TestAsyncProcess {
|
||||
@Rule public final TestRule timeout = CategoryBasedTimeout.builder().withTimeout(this.getClass()).
|
||||
|
@ -258,8 +257,6 @@ public class TestAsyncProcess {
|
|||
|
||||
}
|
||||
|
||||
|
||||
|
||||
static class MyAsyncRequestFutureImpl<Res> extends AsyncRequestFutureImpl<Res> {
|
||||
private final Map<ServerName, List<Long>> heapSizesByServer = new HashMap<>();
|
||||
public MyAsyncRequestFutureImpl(AsyncProcessTask task, List<Action> actions,
|
||||
|
@ -650,10 +647,9 @@ public class TestAsyncProcess {
|
|||
|
||||
MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true);
|
||||
BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
|
||||
BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
|
||||
try (HTable ht = new HTable(conn, mutator)) {
|
||||
Assert.assertEquals(0L, ht.mutator.getCurrentWriteBufferSize());
|
||||
ht.put(puts);
|
||||
try (BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);) {
|
||||
mutator.mutate(puts);
|
||||
mutator.flush();
|
||||
List<AsyncRequestFuture> reqs = ap.allReqs;
|
||||
|
||||
int actualSnReqCount = 0;
|
||||
|
@ -1095,54 +1091,6 @@ public class TestAsyncProcess {
|
|||
assertFalse(ap.service.isShutdown());
|
||||
}
|
||||
|
||||
private void doHTableFailedPut(boolean bufferOn) throws Exception {
|
||||
ClusterConnection conn = createHConnection();
|
||||
MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true);
|
||||
BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
|
||||
if (bufferOn) {
|
||||
bufferParam.writeBufferSize(1024L * 1024L);
|
||||
} else {
|
||||
bufferParam.writeBufferSize(0L);
|
||||
}
|
||||
BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
|
||||
HTable ht = new HTable(conn, mutator);
|
||||
|
||||
Put put = createPut(1, false);
|
||||
|
||||
Assert.assertEquals(0L, ht.mutator.getCurrentWriteBufferSize());
|
||||
try {
|
||||
ht.put(put);
|
||||
if (bufferOn) {
|
||||
ht.flushCommits();
|
||||
}
|
||||
Assert.fail();
|
||||
} catch (RetriesExhaustedException expected) {
|
||||
}
|
||||
Assert.assertEquals(0L, ht.mutator.getCurrentWriteBufferSize());
|
||||
// The table should have sent one request, maybe after multiple attempts
|
||||
AsyncRequestFuture ars = null;
|
||||
for (AsyncRequestFuture someReqs : ap.allReqs) {
|
||||
if (someReqs.getResults().length == 0) continue;
|
||||
Assert.assertTrue(ars == null);
|
||||
ars = someReqs;
|
||||
}
|
||||
Assert.assertTrue(ars != null);
|
||||
verifyResult(ars, false);
|
||||
|
||||
// This should not raise any exception, puts have been 'received' before by the catch.
|
||||
ht.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHTableFailedPutWithBuffer() throws Exception {
|
||||
doHTableFailedPut(true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHTableFailedPutWithoutBuffer() throws Exception {
|
||||
doHTableFailedPut(false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHTableFailedPutAndNewPut() throws Exception {
|
||||
ClusterConnection conn = createHConnection();
|
||||
|
@ -1193,10 +1141,7 @@ public class TestAsyncProcess {
|
|||
@Test
|
||||
public void testBatch() throws IOException, InterruptedException {
|
||||
ClusterConnection conn = new MyConnectionImpl(CONF);
|
||||
MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true);
|
||||
BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
|
||||
BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
|
||||
HTable ht = new HTable(conn, mutator);
|
||||
HTable ht = (HTable) conn.getTable(DUMMY_TABLE);
|
||||
ht.multiAp = new MyAsyncProcess(conn, CONF, false);
|
||||
|
||||
List<Put> puts = new ArrayList<>(7);
|
||||
|
@ -1258,9 +1203,7 @@ public class TestAsyncProcess {
|
|||
ClusterConnection conn = createHConnection();
|
||||
Mockito.when(conn.getConfiguration()).thenReturn(copyConf);
|
||||
MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf, true);
|
||||
BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
|
||||
BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
|
||||
try (HTable ht = new HTable(conn, mutator)) {
|
||||
try (HTable ht = (HTable) conn.getTable(DUMMY_TABLE)) {
|
||||
ht.multiAp = ap;
|
||||
List<Get> gets = new LinkedList<>();
|
||||
gets.add(new Get(DUMMY_BYTES_1));
|
||||
|
@ -1350,9 +1293,7 @@ public class TestAsyncProcess {
|
|||
|
||||
MyConnectionImpl2 con = new MyConnectionImpl2(hrls);
|
||||
MyAsyncProcess ap = new MyAsyncProcess(con, CONF, con.nbThreads);
|
||||
BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
|
||||
BufferedMutatorImpl mutator = new BufferedMutatorImpl(con , bufferParam, ap);
|
||||
HTable ht = new HTable(con, mutator);
|
||||
HTable ht = (HTable) con.getTable(DUMMY_TABLE, ap.service);
|
||||
ht.multiAp = ap;
|
||||
ht.batch(gets, null);
|
||||
|
||||
|
|
|
@ -821,16 +821,6 @@ public class RemoteHTable implements Table {
|
|||
throw new IOException("atomicMutation not supported");
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getWriteBufferSize() {
|
||||
throw new UnsupportedOperationException("getWriteBufferSize not implemented");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setWriteBufferSize(long writeBufferSize) throws IOException {
|
||||
throw new IOException("setWriteBufferSize not supported");
|
||||
}
|
||||
|
||||
@Override
|
||||
public <R extends Message> Map<byte[], R> batchCoprocessorService(
|
||||
Descriptors.MethodDescriptor method, Message request,
|
||||
|
|
|
@ -270,16 +270,6 @@ public final class HTableWrapper implements Table {
|
|||
table.mutateRow(rm);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getWriteBufferSize() {
|
||||
return table.getWriteBufferSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setWriteBufferSize(long writeBufferSize) throws IOException {
|
||||
table.setWriteBufferSize(writeBufferSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <R extends Message> Map<byte[], R> batchCoprocessorService(
|
||||
MethodDescriptor methodDescriptor, Message request, byte[] startKey, byte[] endKey,
|
||||
|
|
|
@ -28,6 +28,7 @@ import java.io.DataInputStream;
|
|||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -200,7 +201,20 @@ public class AccessControlLists {
|
|||
);
|
||||
}
|
||||
try {
|
||||
t.put(p);
|
||||
/**
|
||||
* TODO: Use Table.put(Put) instead. This Table.put() happens within the RS. We are already in
|
||||
* AccessController. Means already there was an RPC happened to server (Actual grant call from
|
||||
* client side). At RpcServer we have a ThreadLocal where we keep the CallContext and inside
|
||||
* that the current RPC called user info is set. The table on which put was called is created
|
||||
* via the RegionCP env and that uses a special Connection. The normal RPC channel will be by
|
||||
* passed here means there would have no further contact on to the RpcServer. So the
|
||||
* ThreadLocal is never getting reset. We ran the new put as a super user (User.runAsLoginUser
|
||||
* where the login user is the user who started RS process) but still as per the RPC context
|
||||
* it is the old user. When AsyncProcess was used, the execute happen via another thread from
|
||||
* pool and so old ThreadLocal variable is not accessible and so it looks as if no Rpc context
|
||||
* and we were relying on the super user who starts the RS process.
|
||||
*/
|
||||
t.put(Collections.singletonList(p));
|
||||
} finally {
|
||||
t.close();
|
||||
}
|
||||
|
|
|
@ -1231,6 +1231,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||
|
||||
static abstract class BufferedMutatorTest extends Test {
|
||||
protected BufferedMutator mutator;
|
||||
protected Table table;
|
||||
|
||||
BufferedMutatorTest(Connection con, TestOptions options, Status status) {
|
||||
super(con, options, status);
|
||||
|
@ -1239,11 +1240,13 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||
@Override
|
||||
void onStartup() throws IOException {
|
||||
this.mutator = connection.getBufferedMutator(TableName.valueOf(opts.tableName));
|
||||
this.table = connection.getTable(TableName.valueOf(opts.tableName));
|
||||
}
|
||||
|
||||
@Override
|
||||
void onTakedown() throws IOException {
|
||||
mutator.close();
|
||||
table.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1465,9 +1468,10 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||
}
|
||||
}
|
||||
put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
|
||||
mutator.mutate(put);
|
||||
if (opts.autoFlush) {
|
||||
mutator.flush();
|
||||
table.put(put);
|
||||
} else {
|
||||
mutator.mutate(put);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1666,9 +1670,10 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||
}
|
||||
}
|
||||
put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
|
||||
mutator.mutate(put);
|
||||
if (opts.autoFlush) {
|
||||
mutator.flush();
|
||||
table.put(put);
|
||||
} else {
|
||||
mutator.mutate(put);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -93,7 +93,7 @@ public class TestClientPushback {
|
|||
Configuration conf = UTIL.getConfiguration();
|
||||
|
||||
ClusterConnection conn = (ClusterConnection) ConnectionFactory.createConnection(conf);
|
||||
Table table = conn.getTable(tableName);
|
||||
BufferedMutatorImpl mutator = (BufferedMutatorImpl) conn.getBufferedMutator(tableName);
|
||||
|
||||
HRegionServer rs = UTIL.getHBaseCluster().getRegionServer(0);
|
||||
Region region = rs.getOnlineRegions(tableName).get(0);
|
||||
|
@ -102,7 +102,8 @@ public class TestClientPushback {
|
|||
// write some data
|
||||
Put p = new Put(Bytes.toBytes("row"));
|
||||
p.addColumn(family, qualifier, Bytes.toBytes("value1"));
|
||||
table.put(p);
|
||||
mutator.mutate(p);
|
||||
mutator.flush();
|
||||
|
||||
// get the current load on RS. Hopefully memstore isn't flushed since we wrote the the data
|
||||
int load = (int) ((((HRegion) region).addAndGetMemstoreSize(new MemstoreSize(0, 0)) * 100)
|
||||
|
@ -138,7 +139,6 @@ public class TestClientPushback {
|
|||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
final AtomicLong endTime = new AtomicLong();
|
||||
long startTime = EnvironmentEdgeManager.currentTime();
|
||||
BufferedMutatorImpl mutator = ((HTable) table).mutator;
|
||||
Batch.Callback<Result> callback = (byte[] r, byte[] row, Result result) -> {
|
||||
endTime.set(EnvironmentEdgeManager.currentTime());
|
||||
latch.countDown();
|
||||
|
|
|
@ -4068,8 +4068,8 @@ public class TestFromClientSide {
|
|||
Put p = new Put(ROW);
|
||||
p.addColumn(BAD_FAM, QUALIFIER, VAL);
|
||||
table.put(p);
|
||||
} catch (RetriesExhaustedWithDetailsException e) {
|
||||
caughtNSCFE = e.getCause(0) instanceof NoSuchColumnFamilyException;
|
||||
} catch (Exception e) {
|
||||
caughtNSCFE = e instanceof NoSuchColumnFamilyException;
|
||||
}
|
||||
assertTrue("Should throw NoSuchColumnFamilyException", caughtNSCFE);
|
||||
|
||||
|
@ -4110,7 +4110,6 @@ public class TestFromClientSide {
|
|||
final int NB_BATCH_ROWS = 10;
|
||||
Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()),
|
||||
new byte[][] { CONTENTS_FAMILY, SMALL_FAMILY });
|
||||
table.setWriteBufferSize(10);
|
||||
ArrayList<Put> rowsUpdate = new ArrayList<Put>();
|
||||
for (int i = 0; i < NB_BATCH_ROWS * 10; i++) {
|
||||
byte[] row = Bytes.toBytes("row" + i);
|
||||
|
|
|
@ -85,7 +85,6 @@ import org.junit.Test;
|
|||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.rules.TestName;
|
||||
import org.junit.rules.TestRule;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
|
||||
|
||||
/**
|
||||
|
@ -442,7 +441,7 @@ public class TestHCM {
|
|||
table.setOperationTimeout(30 * 1000);
|
||||
table.put(new Put(FAM_NAM).addColumn(FAM_NAM, FAM_NAM, FAM_NAM));
|
||||
Assert.fail("We expect an exception here");
|
||||
} catch (RetriesExhaustedWithDetailsException e) {
|
||||
} catch (SocketTimeoutException e) {
|
||||
// The client has a CallTimeout class, but it's not shared.We're not very clean today,
|
||||
// in the general case you can expect the call to stop, but the exception may vary.
|
||||
// In this test however, we're sure that it will be a socket timeout.
|
||||
|
|
|
@ -266,7 +266,6 @@ public class TestMultiParallel {
|
|||
// Load the data
|
||||
LOG.info("get new table");
|
||||
Table table = UTIL.getConnection().getTable(TEST_TABLE);
|
||||
table.setWriteBufferSize(10 * 1024 * 1024);
|
||||
|
||||
LOG.info("constructPutRequests");
|
||||
List<Put> puts = constructPutRequests();
|
||||
|
|
|
@ -141,12 +141,10 @@ public class TestServerBusyException {
|
|||
public void run() {
|
||||
try {
|
||||
Put p = new Put(ROW);
|
||||
p.addColumn(FAM_NAM, new byte[]{0}, new byte[]{0});
|
||||
p.addColumn(FAM_NAM, new byte[] { 0 }, new byte[] { 0 });
|
||||
table.put(p);
|
||||
} catch (RetriesExhaustedWithDetailsException e) {
|
||||
if (e.exceptions.get(0) instanceof ServerTooBusyException) {
|
||||
getServerBusyException = 1;
|
||||
}
|
||||
} catch (ServerTooBusyException e) {
|
||||
getServerBusyException = 1;
|
||||
} catch (IOException ignore) {
|
||||
}
|
||||
}
|
||||
|
|
|
@ -124,13 +124,8 @@ public class TestConstraint {
|
|||
try {
|
||||
table.put(put);
|
||||
fail("This put should not have suceeded - AllFailConstraint was not run!");
|
||||
} catch (RetriesExhaustedWithDetailsException e) {
|
||||
List<Throwable> causes = e.getCauses();
|
||||
assertEquals(
|
||||
"More than one failure cause - should only be the failure constraint exception",
|
||||
1, causes.size());
|
||||
Throwable t = causes.get(0);
|
||||
assertEquals(ConstraintException.class, t.getClass());
|
||||
} catch (ConstraintException e) {
|
||||
// expected
|
||||
}
|
||||
table.close();
|
||||
}
|
||||
|
|
|
@ -148,7 +148,6 @@ public class TestHTableWrapper {
|
|||
private void checkHTableInterfaceMethods() throws Exception {
|
||||
checkConf();
|
||||
checkNameAndDescriptor();
|
||||
checkBufferSize();
|
||||
checkExists();
|
||||
checkAppend();
|
||||
checkPutsAndDeletes();
|
||||
|
@ -175,13 +174,6 @@ public class TestHTableWrapper {
|
|||
assertEquals(table.getTableDescriptor(), hTableInterface.getTableDescriptor());
|
||||
}
|
||||
|
||||
private void checkBufferSize() throws IOException {
|
||||
long initialWriteBufferSize = hTableInterface.getWriteBufferSize();
|
||||
hTableInterface.setWriteBufferSize(12345L);
|
||||
assertEquals(12345L, hTableInterface.getWriteBufferSize());
|
||||
hTableInterface.setWriteBufferSize(initialWriteBufferSize);
|
||||
}
|
||||
|
||||
private void checkExists() throws IOException {
|
||||
boolean ex = hTableInterface.exists(new Get(ROW_A).addColumn(TEST_FAMILY, qualifierCol1));
|
||||
assertTrue(ex);
|
||||
|
|
|
@ -302,16 +302,6 @@ public class RegionAsTable implements Table {
|
|||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getWriteBufferSize() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setWriteBufferSize(long writeBufferSize) throws IOException {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public <R extends Message> Map<byte[], R> batchCoprocessorService(MethodDescriptor
|
||||
methodDescriptor, Message request,
|
||||
|
|
|
@ -558,7 +558,6 @@ public class TestMasterReplication {
|
|||
Table[] htables = new Table[numClusters];
|
||||
for (int i = 0; i < numClusters; i++) {
|
||||
Table htable = ConnectionFactory.createConnection(configurations[i]).getTable(tableName);
|
||||
htable.setWriteBufferSize(1024);
|
||||
htables[i] = htable;
|
||||
}
|
||||
return htables;
|
||||
|
|
|
@ -140,11 +140,8 @@ public class TestMultiSlaveReplication {
|
|||
utility2.getAdmin().createTable(table);
|
||||
utility3.getAdmin().createTable(table);
|
||||
Table htable1 = utility1.getConnection().getTable(tableName);
|
||||
htable1.setWriteBufferSize(1024);
|
||||
Table htable2 = utility2.getConnection().getTable(tableName);
|
||||
htable2.setWriteBufferSize(1024);
|
||||
Table htable3 = utility3.getConnection().getTable(tableName);
|
||||
htable3.setWriteBufferSize(1024);
|
||||
|
||||
ReplicationPeerConfig rpc = new ReplicationPeerConfig();
|
||||
rpc.setClusterKey(utility2.getClusterKey());
|
||||
|
|
|
@ -160,7 +160,6 @@ public class TestReplicationBase {
|
|||
utility1.waitUntilAllRegionsAssigned(tableName);
|
||||
utility2.waitUntilAllRegionsAssigned(tableName);
|
||||
htable1 = connection1.getTable(tableName);
|
||||
htable1.setWriteBufferSize(1024);
|
||||
htable2 = connection2.getTable(tableName);
|
||||
}
|
||||
|
||||
|
|
|
@ -426,7 +426,6 @@ public class TestReplicationSmallTests extends TestReplicationBase {
|
|||
put.addColumn(famName, row, row);
|
||||
puts.add(put);
|
||||
}
|
||||
htable1.setWriteBufferSize(1024);
|
||||
// The puts will be iterated through and flushed only when the buffer
|
||||
// size is reached.
|
||||
htable1.put(puts);
|
||||
|
|
|
@ -198,15 +198,11 @@ public class TestReplicationSyncUpTool extends TestReplicationBase {
|
|||
|
||||
// Get HTable from Master
|
||||
ht1Source = connection1.getTable(t1_su);
|
||||
ht1Source.setWriteBufferSize(1024);
|
||||
ht2Source = connection1.getTable(t2_su);
|
||||
ht1Source.setWriteBufferSize(1024);
|
||||
|
||||
// Get HTable from Peer1
|
||||
ht1TargetAtPeer1 = connection2.getTable(t1_su);
|
||||
ht1TargetAtPeer1.setWriteBufferSize(1024);
|
||||
ht2TargetAtPeer1 = connection2.getTable(t2_su);
|
||||
ht2TargetAtPeer1.setWriteBufferSize(1024);
|
||||
|
||||
/**
|
||||
* set M-S : Master: utility1 Slave1: utility2
|
||||
|
|
|
@ -64,9 +64,9 @@ import org.apache.hadoop.hbase.security.AccessDeniedException;
|
|||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.security.access.Permission.Action;
|
||||
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Maps;
|
||||
|
||||
import com.google.protobuf.BlockingRpcChannel;
|
||||
import com.google.protobuf.ServiceException;
|
||||
|
||||
|
|
|
@ -145,6 +145,7 @@ public class TestNamespaceCommands extends SecureTestUtil {
|
|||
User.createUserForTesting(conf, "user_group_write", new String[] { GROUP_WRITE });
|
||||
// TODO: other table perms
|
||||
|
||||
UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
|
||||
UTIL.startMiniCluster();
|
||||
// Wait for the ACL table to become available
|
||||
UTIL.waitTableAvailable(AccessControlLists.ACL_TABLE_NAME.getName(), 30 * 1000);
|
||||
|
|
Loading…
Reference in New Issue