HBASE-12728 buffered writes substantially less useful after removal of HTablePool (Solomon Duskis and Nick Dimiduk)

In our pre-1.0 API, HTable is considered a light-weight object that consumed by
a single thread at a time. The HTablePool class provided a means of sharing
multiple HTable instances across a number of threads. As an optimization,
HTable managed a "write buffer", accumulating edits and sending a "batch" all
at once. By default the batch was sent as the last step in invocations of
put(Put) and put(List<Put>). The user could disable the automatic flushing of
the write buffer, retaining edits locally and only sending the whole "batch"
once the write buffer has filled or when the flushCommits() method in invoked
explicitly. Explicit or implicit batch writing was controlled by the
setAutoFlushTo(boolean) method. A value of true (the default) had the write
buffer flushed at the completion of a call to put(Put) or put(List<Put>). A
value of false allowed for explicit buffer management. HTable also exposed the
buffer to consumers via getWriteBuffer().

The combination of HTable with setAutoFlushTo(false) and the HTablePool
provided a convenient mechanism by which multiple "Put-producing" threads could
share a common write buffer. Both HTablePool and HTable are deprecated, and
they are officially replaced in The new 1.0 API by Table and BufferedMutator.
Table, which replaces HTable, no longer exposes explicit write-buffer
management. Instead, explicit buffer management is exposed via BufferedMutator.
BufferedMutator is made safe for concurrent use. Where code would previously
retrieve and return HTables from an HTablePool, now that code creates and
shares a single BufferedMutator instance across all threads.
This commit is contained in:
Nick Dimiduk 2015-01-22 13:42:50 -08:00
parent e13b9938a1
commit ab18158e60
49 changed files with 1256 additions and 586 deletions

View File

@ -0,0 +1,129 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
/**
* <p>Used to communicate with a single HBase table similar to {@link Table} but meant for
* batched, asynchronous puts. Obtain an instance from a {@link Connection} and call
* {@link #close()} afterwards. Customizations can be applied to the {@code BufferedMutator} via
* the {@link BufferedMutatorParams}.
* </p>
*
* <p>Exception handling with asynchronously via the {@link BufferedMutator.ExceptionListener}.
* The default implementation is to throw the exception upon receipt. This behavior can be
* overridden with a custom implementation, provided as a parameter with
* {@link BufferedMutatorParams#listener(BufferedMutator.ExceptionListener)}.</p>
*
* <p>Map/Reduce jobs are good use cases for using {@code BufferedMutator}. Map/reduce jobs
* benefit from batching, but have no natural flush point. {@code BufferedMutator} receives the
* puts from the M/R job and will batch puts based on some heuristic, such as the accumulated size
* of the puts, and submit batches of puts asynchronously so that the M/R logic can continue
* without interruption.
* </p>
*
* <p>{@code BufferedMutator} can also be used on more exotic circumstances. Map/Reduce batch jobs
* will have a single {@code BufferedMutator} per thread. A single {@code BufferedMutator} can
* also be effectively used in high volume online systems to batch puts, with the caveat that
* extreme circumstances, such as JVM or machine failure, may cause some data loss.</p>
*
* <p>NOTE: This class replaces the functionality that used to be available via
* {@link HTableInterface#setAutoFlush(boolean)} set to {@code false}.
* </p>
*
* <p>See also the {@code BufferedMutatorExample} in the hbase-examples module.</p>
* @see ConnectionFactory
* @see Connection
* @since 1.0.0
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public interface BufferedMutator extends Closeable {
/**
* Gets the fully qualified table name instance of the table that this BufferedMutator writes to.
*/
TableName getName();
/**
* Returns the {@link org.apache.hadoop.conf.Configuration} object used by this instance.
* <p>
* The reference returned is not a copy, so any change made to it will
* affect this instance.
*/
Configuration getConfiguration();
/**
* Sends a {@link Mutation} to the table. The mutations will be buffered and sent over the
* wire as part of a batch. Currently only supports {@link Put} and {@link Delete} mutations.
*
* @param mutation The data to send.
* @throws IOException if a remote or network exception occurs.
*/
void mutate(Mutation mutation) throws IOException;
/**
* Send some {@link Mutation}s to the table. The mutations will be buffered and sent over the
* wire as part of a batch. There is no guarantee of sending entire content of {@code mutations}
* in a single batch; it will be broken up according to the write buffer capacity.
*
* @param mutations The data to send.
* @throws IOException if a remote or network exception occurs.
*/
void mutate(List<? extends Mutation> mutations) throws IOException;
/**
* Performs a {@link #flush()} and releases any resources held.
*
* @throws IOException if a remote or network exception occurs.
*/
@Override
void close() throws IOException;
/**
* Executes all the buffered, asynchronous {@link Mutation} operations and waits until they
* are done.
*
* @throws IOException if a remote or network exception occurs.
*/
void flush() throws IOException;
/**
* Returns the maximum size in bytes of the write buffer for this HTable.
* <p>
* The default value comes from the configuration parameter {@code hbase.client.write.buffer}.
* @return The size of the write buffer in bytes.
*/
long getWriteBufferSize();
/**
* Listens for asynchronous exceptions on a {@link BufferedMutator}.
*/
interface ExceptionListener {
public void onException(RetriesExhaustedWithDetailsException exception,
BufferedMutator mutator) throws RetriesExhaustedWithDetailsException;
}
}

View File

@ -0,0 +1,258 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
/**
* <p>
* Used to communicate with a single HBase table similar to {@link HTable}
* but meant for batched, potentially asynchronous puts. Obtain an instance from
* a {@link Connection} and call {@link #close()} afterwards.
* </p>
*
* @see ConnectionFactory
* @see Connection
* @since 1.0.0
*/
@InterfaceAudience.Private
@InterfaceStability.Stable
public class BufferedMutatorImpl implements BufferedMutator {
private static final Log LOG = LogFactory.getLog(BufferedMutatorImpl.class);
private final ExceptionListener listener;
protected ClusterConnection connection; // non-final so can be overridden in test
private final TableName tableName;
private volatile Configuration conf;
private List<Row> writeAsyncBuffer = new LinkedList<>();
private long writeBufferSize;
private final int maxKeyValueSize;
protected long currentWriteBufferSize = 0;
private boolean closed = false;
private final ExecutorService pool;
protected AsyncProcess ap; // non-final so can be overridden in test
BufferedMutatorImpl(ClusterConnection conn, RpcRetryingCallerFactory rpcCallerFactory,
RpcControllerFactory rpcFactory, BufferedMutatorParams params) {
if (conn == null || conn.isClosed()) {
throw new IllegalArgumentException("Connection is null or closed.");
}
this.tableName = params.getTableName();
this.connection = conn;
this.conf = connection.getConfiguration();
this.pool = params.getPool();
this.listener = params.getListener();
TableConfiguration tableConf = new TableConfiguration(conf);
this.writeBufferSize = params.getWriteBufferSize() != BufferedMutatorParams.UNSET ?
params.getWriteBufferSize() : tableConf.getWriteBufferSize();
this.maxKeyValueSize = params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET ?
params.getMaxKeyValueSize() : tableConf.getMaxKeyValueSize();
// puts need to track errors globally due to how the APIs currently work.
ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, rpcFactory);
}
@Override
public TableName getName() {
return tableName;
}
@Override
public Configuration getConfiguration() {
return conf;
}
@Override
public synchronized void mutate(Mutation m) throws InterruptedIOException,
RetriesExhaustedWithDetailsException {
doMutate(m);
}
@Override
public synchronized void mutate(List<? extends Mutation> ms) throws InterruptedIOException,
RetriesExhaustedWithDetailsException {
for (Mutation m : ms) {
doMutate(m);
}
}
/**
* Add the put to the buffer. If the buffer is already too large, sends the buffer to the
* cluster.
*
* @throws RetriesExhaustedWithDetailsException if there is an error on the cluster.
* @throws InterruptedIOException if we were interrupted.
*/
private void doMutate(Mutation m) throws InterruptedIOException,
RetriesExhaustedWithDetailsException {
if (closed) {
throw new IllegalStateException("Cannot put when the BufferedMutator is closed.");
}
if (!(m instanceof Put) && !(m instanceof Delete)) {
throw new IllegalArgumentException("Pass a Delete or a Put");
}
// This behavior is highly non-intuitive... it does not protect us against
// 94-incompatible behavior, which is a timing issue because hasError, the below code
// and setter of hasError are not synchronized. Perhaps it should be removed.
if (ap.hasError()) {
writeAsyncBuffer.add(m);
backgroundFlushCommits(true);
}
if (m instanceof Put) {
validatePut((Put) m);
}
currentWriteBufferSize += m.heapSize();
writeAsyncBuffer.add(m);
while (currentWriteBufferSize > writeBufferSize) {
backgroundFlushCommits(false);
}
}
// validate for well-formedness
public void validatePut(final Put put) throws IllegalArgumentException {
HTable.validatePut(put, maxKeyValueSize);
}
@Override
public synchronized void close() throws IOException {
if (this.closed) {
return;
}
try {
// As we can have an operation in progress even if the buffer is empty, we call
// backgroundFlushCommits at least one time.
backgroundFlushCommits(true);
this.pool.shutdown();
boolean terminated = false;
int loopCnt = 0;
do {
// wait until the pool has terminated
terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS);
loopCnt += 1;
if (loopCnt >= 10) {
LOG.warn("close() failed to terminate pool after 10 minutes. Abandoning pool.");
break;
}
} while (!terminated);
} catch (InterruptedException e) {
LOG.warn("waitForTermination interrupted");
} finally {
this.closed = true;
}
}
@Override
public synchronized void flush() throws InterruptedIOException,
RetriesExhaustedWithDetailsException {
// As we can have an operation in progress even if the buffer is empty, we call
// backgroundFlushCommits at least one time.
backgroundFlushCommits(true);
}
/**
* Send the operations in the buffer to the servers. Does not wait for the server's answer. If
* the is an error (max retried reach from a previous flush or bad operation), it tries to send
* all operations in the buffer and sends an exception.
*
* @param synchronous - if true, sends all the writes and wait for all of them to finish before
* returning.
*/
private void backgroundFlushCommits(boolean synchronous) throws InterruptedIOException,
RetriesExhaustedWithDetailsException {
try {
if (!synchronous) {
ap.submit(tableName, writeAsyncBuffer, true, null, false);
if (ap.hasError()) {
LOG.debug(tableName + ": One or more of the operations have failed -"
+ " waiting for all operation in progress to finish (successfully or not)");
}
}
if (synchronous || ap.hasError()) {
while (!writeAsyncBuffer.isEmpty()) {
ap.submit(tableName, writeAsyncBuffer, true, null, false);
}
RetriesExhaustedWithDetailsException error = ap.waitForAllPreviousOpsAndReset(null);
if (error != null) {
if (listener == null) {
throw error;
} else {
this.listener.onException(error, this);
}
}
}
} finally {
currentWriteBufferSize = 0;
for (Row mut : writeAsyncBuffer) {
if (mut instanceof Mutation) {
currentWriteBufferSize += ((Mutation) mut).heapSize();
}
}
}
}
/**
* This is used for legacy purposes in {@link HTable#setWriteBufferSize(long)} only. This ought
* not be called for production uses.
* @deprecated Going away when we drop public support for {@link HTableInterface}.
*/
@Deprecated
public void setWriteBufferSize(long writeBufferSize) throws RetriesExhaustedWithDetailsException,
InterruptedIOException {
this.writeBufferSize = writeBufferSize;
if (currentWriteBufferSize > writeBufferSize) {
flush();
}
}
/**
* {@inheritDoc}
*/
@Override
public long getWriteBufferSize() {
return this.writeBufferSize;
}
/**
* This is used for legacy purposes in {@link HTable#getWriteBuffer()} only. This should not beÓ
* called from production uses.
* @deprecated Going away when we drop public support for {@link HTableInterface}.
Ó */
@Deprecated
public List<Row> getWriteBuffer() {
return this.writeAsyncBuffer;
}
}

View File

@ -0,0 +1,110 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
/**
* Parameters for instantiating a {@link BufferedMutator}.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class BufferedMutatorParams {
static final int UNSET = -1;
private final TableName tableName;
private long writeBufferSize = UNSET;
private int maxKeyValueSize = UNSET;
private ExecutorService pool = null;
private BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() {
@Override
public void onException(RetriesExhaustedWithDetailsException exception,
BufferedMutator bufferedMutator)
throws RetriesExhaustedWithDetailsException {
throw exception;
}
};
public BufferedMutatorParams(TableName tableName) {
this.tableName = tableName;
}
public TableName getTableName() {
return tableName;
}
public long getWriteBufferSize() {
return writeBufferSize;
}
/**
* Override the write buffer size specified by the provided {@link Connection}'s
* {@link org.apache.hadoop.conf.Configuration} instance, via the configuration key
* {@code hbase.client.write.buffer}.
*/
public BufferedMutatorParams writeBufferSize(long writeBufferSize) {
this.writeBufferSize = writeBufferSize;
return this;
}
public int getMaxKeyValueSize() {
return maxKeyValueSize;
}
/**
* Override the maximum key-value size specified by the provided {@link Connection}'s
* {@link org.apache.hadoop.conf.Configuration} instance, via the configuration key
* {@code hbase.client.keyvalue.maxsize}.
*/
public BufferedMutatorParams maxKeyValueSize(int maxKeyValueSize) {
this.maxKeyValueSize = maxKeyValueSize;
return this;
}
public ExecutorService getPool() {
return pool;
}
/**
* Override the default executor pool defined by the {@code hbase.htable.threads.*}
* configuration values.
*/
public BufferedMutatorParams pool(ExecutorService pool) {
this.pool = pool;
return this;
}
public BufferedMutator.ExceptionListener getListener() {
return listener;
}
/**
* Override the default error handler. Default handler simply rethrows the exception.
*/
public BufferedMutatorParams listener(BufferedMutator.ExceptionListener listener) {
this.listener = listener;
return this;
}
}

View File

@ -98,6 +98,37 @@ public interface Connection extends Abortable, Closeable {
*/
Table getTable(TableName tableName, ExecutorService pool) throws IOException;
/**
* <p>
* Retrieve a {@link BufferedMutator} for performing client-side buffering of writes. The
* {@link BufferedMutator} returned by this method is thread-safe. This BufferedMutator will
* use the Connection's ExecutorService. This object can be used for long lived operations.
* </p>
* <p>
* The caller is responsible for calling {@link BufferedMutator#close()} on
* the returned {@link BufferedMutator} instance.
* </p>
* <p>
* This accessor will use the connection's ExecutorService and will throw an
* exception in the main thread when an asynchronous exception occurs.
*
* @param tableName the name of the table
*
* @return a {@link BufferedMutator} for the supplied tableName.
*/
public BufferedMutator getBufferedMutator(TableName tableName) throws IOException;
/**
* Retrieve a {@link BufferedMutator} for performing client-side buffering of writes. The
* {@link BufferedMutator} returned by this method is thread-safe. This object can be used for
* long lived table operations. The caller is responsible for calling
* {@link BufferedMutator#close()} on the returned {@link BufferedMutator} instance.
*
* @param params details on how to instantiate the {@code BufferedMutator}.
* @return a {@link BufferedMutator} for the supplied tableName.
*/
public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException;
/**
* Retrieve a RegionLocator implementation to inspect region information on a table. The returned
* RegionLocator is not thread-safe, so a new instance should be created for each using thread.

View File

@ -109,6 +109,17 @@ abstract class ConnectionAdapter implements ClusterConnection {
return wrappedConnection.getTable(tableName, pool);
}
@Override
public BufferedMutator getBufferedMutator(BufferedMutatorParams params)
throws IOException {
return wrappedConnection.getBufferedMutator(params);
}
@Override
public BufferedMutator getBufferedMutator(TableName tableName) throws IOException {
return wrappedConnection.getBufferedMutator(tableName);
}
@Override
public RegionLocator getRegionLocator(TableName tableName) throws IOException {
return wrappedConnection.getRegionLocator(tableName);

View File

@ -69,8 +69,8 @@ import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
@ -728,6 +728,28 @@ final class ConnectionManager {
return new HTable(tableName, this, tableConfig, rpcCallerFactory, rpcControllerFactory, pool);
}
@Override
public BufferedMutator getBufferedMutator(BufferedMutatorParams params) {
if (params.getTableName() == null) {
throw new IllegalArgumentException("TableName cannot be null.");
}
if (params.getPool() == null) {
params.pool(HTable.getDefaultExecutor(getConfiguration()));
}
if (params.getWriteBufferSize() == BufferedMutatorParams.UNSET) {
params.writeBufferSize(tableConfig.getWriteBufferSize());
}
if (params.getMaxKeyValueSize() == BufferedMutatorParams.UNSET) {
params.maxKeyValueSize(tableConfig.getMaxKeyValueSize());
}
return new BufferedMutatorImpl(this, rpcCallerFactory, rpcControllerFactory, params);
}
@Override
public BufferedMutator getBufferedMutator(TableName tableName) {
return getBufferedMutator(new BufferedMutatorParams(tableName));
}
@Override
public RegionLocator getRegionLocator(TableName tableName) throws IOException {
return new HRegionLocator(tableName, this);

View File

@ -22,7 +22,6 @@ import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
@ -112,10 +111,8 @@ public class HTable implements HTableInterface {
private final TableName tableName;
private volatile Configuration configuration;
private TableConfiguration tableConfiguration;
protected List<Row> writeAsyncBuffer = new LinkedList<Row>();
private long writeBufferSize;
protected BufferedMutatorImpl mutator;
private boolean autoFlush = true;
protected long currentWriteBufferSize = 0 ;
private boolean closed = false;
protected int scannerCaching;
private ExecutorService pool; // For Multi & Scan
@ -125,8 +122,6 @@ public class HTable implements HTableInterface {
private Consistency defaultConsistency = Consistency.STRONG;
private HRegionLocator locator;
/** The Async process for puts with autoflush set to false or multiputs */
protected AsyncProcess ap;
/** The Async process for batch */
protected AsyncProcess multiAp;
private RpcRetryingCallerFactory rpcCallerFactory;
@ -219,7 +214,7 @@ public class HTable implements HTableInterface {
// it also scales when new region servers are added.
ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Threads.newDaemonThreadFactory("htable"));
((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true);
pool.allowCoreThreadTimeOut(true);
return pool;
}
@ -323,15 +318,18 @@ public class HTable implements HTableInterface {
}
/**
* For internal testing.
* For internal testing. Uses Connection provided in {@code params}.
* @throws IOException
*/
@VisibleForTesting
protected HTable() throws IOException {
tableName = null;
tableConfiguration = new TableConfiguration();
protected HTable(ClusterConnection conn, BufferedMutatorParams params) throws IOException {
connection = conn;
tableName = params.getTableName();
tableConfiguration = new TableConfiguration(connection.getConfiguration());
cleanupPoolOnClose = false;
cleanupConnectionOnClose = false;
// used from tests, don't trust the connection is real
this.mutator = new BufferedMutatorImpl(conn, null, null, params);
}
/**
@ -351,7 +349,6 @@ public class HTable implements HTableInterface {
this.operationTimeout = tableName.isSystemTable() ?
tableConfiguration.getMetaOperationTimeout() : tableConfiguration.getOperationTimeout();
this.writeBufferSize = tableConfiguration.getWriteBufferSize();
this.scannerCaching = tableConfiguration.getScannerCaching();
if (this.rpcCallerFactory == null) {
@ -362,8 +359,6 @@ public class HTable implements HTableInterface {
}
// puts need to track errors globally due to how the APIs currently work.
ap = new AsyncProcess(connection, configuration, pool, rpcCallerFactory, true,
rpcControllerFactory);
multiAp = this.connection.getAsyncProcess();
this.locator = new HRegionLocator(getName(), connection);
}
@ -539,7 +534,7 @@ public class HTable implements HTableInterface {
*/
@Deprecated
public List<Row> getWriteBuffer() {
return writeAsyncBuffer;
return mutator == null ? null : mutator.getWriteBuffer();
}
/**
@ -1000,11 +995,11 @@ public class HTable implements HTableInterface {
/**
* {@inheritDoc}
* @throws IOException
*/
@Override
public void put(final Put put)
throws InterruptedIOException, RetriesExhaustedWithDetailsException {
doPut(put);
public void put(final Put put) throws IOException {
getBufferedMutator().mutate(put);
if (autoFlush) {
flushCommits();
}
@ -1012,82 +1007,16 @@ public class HTable implements HTableInterface {
/**
* {@inheritDoc}
* @throws IOException
*/
@Override
public void put(final List<Put> puts)
throws InterruptedIOException, RetriesExhaustedWithDetailsException {
for (Put put : puts) {
doPut(put);
}
public void put(final List<Put> puts) throws IOException {
getBufferedMutator().mutate(puts);
if (autoFlush) {
flushCommits();
}
}
/**
* Add the put to the buffer. If the buffer is already too large, sends the buffer to the
* cluster.
* @throws RetriesExhaustedWithDetailsException if there is an error on the cluster.
* @throws InterruptedIOException if we were interrupted.
*/
private void doPut(Put put) throws InterruptedIOException, RetriesExhaustedWithDetailsException {
// This behavior is highly non-intuitive... it does not protect us against
// 94-incompatible behavior, which is a timing issue because hasError, the below code
// and setter of hasError are not synchronized. Perhaps it should be removed.
if (ap.hasError()) {
writeAsyncBuffer.add(put);
backgroundFlushCommits(true);
}
validatePut(put);
currentWriteBufferSize += put.heapSize();
writeAsyncBuffer.add(put);
while (currentWriteBufferSize > writeBufferSize) {
backgroundFlushCommits(false);
}
}
/**
* Send the operations in the buffer to the servers. Does not wait for the server's answer.
* If the is an error (max retried reach from a previous flush or bad operation), it tries to
* send all operations in the buffer and sends an exception.
* @param synchronous - if true, sends all the writes and wait for all of them to finish before
* returning.
*/
private void backgroundFlushCommits(boolean synchronous) throws
InterruptedIOException, RetriesExhaustedWithDetailsException {
try {
if (!synchronous) {
ap.submit(tableName, writeAsyncBuffer, true, null, false);
if (ap.hasError()) {
LOG.debug(tableName + ": One or more of the operations have failed -" +
" waiting for all operation in progress to finish (successfully or not)");
}
}
if (synchronous || ap.hasError()) {
while (!writeAsyncBuffer.isEmpty()) {
ap.submit(tableName, writeAsyncBuffer, true, null, false);
}
RetriesExhaustedWithDetailsException error = ap.waitForAllPreviousOpsAndReset(null);
if (error != null) {
throw error;
}
}
} finally {
currentWriteBufferSize = 0;
for (Row mut : writeAsyncBuffer) {
if (mut instanceof Mutation) {
currentWriteBufferSize += ((Mutation) mut).heapSize();
}
}
}
}
/**
* {@inheritDoc}
*/
@ -1452,12 +1381,11 @@ public class HTable implements HTableInterface {
/**
* {@inheritDoc}
* @throws IOException
*/
@Override
public void flushCommits() throws InterruptedIOException, RetriesExhaustedWithDetailsException {
// As we can have an operation in progress even if the buffer is empty, we call
// backgroundFlushCommits at least one time.
backgroundFlushCommits(true);
public void flushCommits() throws IOException {
getBufferedMutator().flush();
}
/**
@ -1581,7 +1509,11 @@ public class HTable implements HTableInterface {
*/
@Override
public long getWriteBufferSize() {
return writeBufferSize;
if (mutator == null) {
return tableConfiguration.getWriteBufferSize();
} else {
return mutator.getWriteBufferSize();
}
}
/**
@ -1594,10 +1526,8 @@ public class HTable implements HTableInterface {
*/
@Override
public void setWriteBufferSize(long writeBufferSize) throws IOException {
this.writeBufferSize = writeBufferSize;
if(currentWriteBufferSize > writeBufferSize) {
flushCommits();
}
getBufferedMutator();
mutator.setWriteBufferSize(writeBufferSize);
}
/**
@ -1914,4 +1844,17 @@ public class HTable implements HTableInterface {
public RegionLocator getRegionLocator() {
return this.locator;
}
@VisibleForTesting
BufferedMutator getBufferedMutator() throws IOException {
if (mutator == null) {
this.mutator = (BufferedMutatorImpl) connection.getBufferedMutator(
new BufferedMutatorParams(tableName)
.pool(pool)
.writeBufferSize(tableConfiguration.getWriteBufferSize())
.maxKeyValueSize(tableConfiguration.getMaxKeyValueSize())
);
}
return mutator;
}
}

View File

@ -67,8 +67,8 @@ public interface HTableInterface extends Table {
* Whether or not to enable 'auto-flush'.
* @deprecated in 0.96. When called with setAutoFlush(false), this function also
* set clearBufferOnFail to true, which is unexpected but kept for historical reasons.
* Replace it with setAutoFlush(false, false) if this is exactly what you want, or by
* {@link #setAutoFlushTo(boolean)} for all other cases.
* Replace it with setAutoFlush(false, false) if this is exactly what you want, though
* this is the method you want for most cases.
*/
@Deprecated
void setAutoFlush(boolean autoFlush);
@ -105,12 +105,68 @@ public interface HTableInterface extends Table {
* the value of this parameter is ignored and clearBufferOnFail is set to true.
* Setting clearBufferOnFail to false is deprecated since 0.96.
* @deprecated in 0.99 since setting clearBufferOnFail is deprecated. Use
* {@link #setAutoFlushTo(boolean)}} instead.
* @see #flushCommits
* {@link #setAutoFlush(boolean)}} instead.
* @see BufferedMutator#flush()
*/
@Deprecated
void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail);
/**
* Set the autoFlush behavior, without changing the value of {@code clearBufferOnFail}.
* @deprecated in 0.99 since setting clearBufferOnFail is deprecated. Use
* {@link #setAutoFlush(boolean)} instead, or better still, move on to {@link BufferedMutator}
*/
@Deprecated
void setAutoFlushTo(boolean autoFlush);
/**
* Tells whether or not 'auto-flush' is turned on.
*
* @return {@code true} if 'auto-flush' is enabled (default), meaning
* {@link Put} operations don't get buffered/delayed and are immediately
* executed.
* @deprecated as of 1.0.0. Replaced by {@link BufferedMutator}
*/
@Deprecated
boolean isAutoFlush();
/**
* Executes all the buffered {@link Put} operations.
* <p>
* This method gets called once automatically for every {@link Put} or batch
* of {@link Put}s (when <code>put(List<Put>)</code> is used) when
* {@link #isAutoFlush} is {@code true}.
* @throws IOException if a remote or network exception occurs.
* @deprecated as of 1.0.0. Replaced by {@link BufferedMutator#flush()}
*/
@Deprecated
void flushCommits() throws IOException;
/**
* Returns the maximum size in bytes of the write buffer for this HTable.
* <p>
* The default value comes from the configuration parameter
* {@code hbase.client.write.buffer}.
* @return The size of the write buffer in bytes.
* @deprecated as of 1.0.0. Replaced by {@link BufferedMutator#getWriteBufferSize()}
*/
@Deprecated
long getWriteBufferSize();
/**
* Sets the size of the buffer in bytes.
* <p>
* If the new size is less than the current amount of data in the
* write buffer, the buffer gets flushed.
* @param writeBufferSize The new write buffer size, in bytes.
* @throws IOException if a remote or network exception occurs.
* @deprecated as of 1.0.0. Replaced by {@link BufferedMutator} and
* {@link BufferedMutatorParams#writeBufferSize(long)}
*/
@Deprecated
void setWriteBufferSize(long writeBufferSize) throws IOException;
/**
* Return the row that matches <i>row</i> exactly,
* or the one that immediately precedes it.

View File

@ -219,9 +219,7 @@ public interface Table extends Closeable {
/**
* Puts some data in the table.
* <p>
* If {@link #isAutoFlush isAutoFlush} is false, the update is buffered
* until the internal buffer is full.
*
* @param put The data to put.
* @throws IOException if a remote or network exception occurs.
* @since 0.20.0
@ -231,9 +229,6 @@ public interface Table extends Closeable {
/**
* Puts some data in the table, in batch.
* <p>
* If {@link #isAutoFlush isAutoFlush} is false, the update is buffered
* until the internal buffer is full.
* <p>
* This can be used for group commit, or for submitting user defined
* batches. The writeBuffer will be periodically inspected while the List
* is processed, so depending on the List size the writeBuffer may flush
@ -497,30 +492,6 @@ public interface Table extends Closeable {
byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable,
final Batch.Callback<R> callback) throws ServiceException, Throwable;
/**
* Tells whether or not 'auto-flush' is turned on.
*
* @return {@code true} if 'auto-flush' is enabled (default), meaning
* {@link Put} operations don't get buffered/delayed and are immediately
* executed.
*/
boolean isAutoFlush();
/**
* Executes all the buffered {@link Put} operations.
* <p>
* This method gets called once automatically for every {@link Put} or batch
* of {@link Put}s (when <code>put(List<Put>)</code> is used) when
* {@link #isAutoFlush} is {@code true}.
* @throws IOException if a remote or network exception occurs.
*/
void flushCommits() throws IOException;
/**
* Set the autoFlush behavior, without changing the value of {@code clearBufferOnFail}
*/
void setAutoFlushTo(boolean autoFlush);
/**
* Returns the maximum size in bytes of the write buffer for this HTable.
* <p>
@ -540,7 +511,6 @@ public interface Table extends Closeable {
*/
void setWriteBufferSize(long writeBufferSize) throws IOException;
/**
* Creates an instance of the given {@link com.google.protobuf.Service} subclass for each table
* region spanning the range from the {@code startKey} row to {@code endKey} row (inclusive), all

View File

@ -28,20 +28,18 @@ import com.google.common.annotations.VisibleForTesting;
@InterfaceAudience.Private
public class TableConfiguration {
public static final String WRITE_BUFFER_SIZE_KEY = "hbase.client.write.buffer";
public static final long WRITE_BUFFER_SIZE_DEFAULT = 2097152;
public static final String MAX_KEYVALUE_SIZE_KEY = "hbase.client.keyvalue.maxsize";
public static final int MAX_KEYVALUE_SIZE_DEFAULT = -1;
private final long writeBufferSize;
private final int metaOperationTimeout;
private final int operationTimeout;
private final int scannerCaching;
private final int primaryCallTimeoutMicroSecond;
private final int replicaCallTimeoutMicroSecondScan;
private final int retries;
private final int maxKeyValueSize;
/**
@ -49,7 +47,7 @@ public class TableConfiguration {
* @param conf Configuration object
*/
TableConfiguration(Configuration conf) {
this.writeBufferSize = conf.getLong("hbase.client.write.buffer", 2097152);
this.writeBufferSize = conf.getLong(WRITE_BUFFER_SIZE_KEY, WRITE_BUFFER_SIZE_DEFAULT);
this.metaOperationTimeout = conf.getInt(
HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT,
@ -70,7 +68,7 @@ public class TableConfiguration {
this.retries = conf.getInt(
HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
this.maxKeyValueSize = conf.getInt("hbase.client.keyvalue.maxsize", -1);
this.maxKeyValueSize = conf.getInt(MAX_KEYVALUE_SIZE_KEY, MAX_KEYVALUE_SIZE_DEFAULT);
}
/**
@ -80,14 +78,14 @@ public class TableConfiguration {
*/
@VisibleForTesting
protected TableConfiguration() {
this.writeBufferSize = 2097152;
this.writeBufferSize = WRITE_BUFFER_SIZE_DEFAULT;
this.metaOperationTimeout = HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT;
this.operationTimeout = HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT;
this.scannerCaching = HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING;
this.primaryCallTimeoutMicroSecond = 10000;
this.replicaCallTimeoutMicroSecondScan = 1000000;
this.retries = HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER;
this.maxKeyValueSize = -1;
this.maxKeyValueSize = MAX_KEYVALUE_SIZE_DEFAULT;
}
public long getWriteBufferSize() {

View File

@ -155,8 +155,8 @@ public class TestAsyncProcess {
new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf));
}
public MyAsyncProcess(
ClusterConnection hc, Configuration conf, boolean useGlobalErrors, boolean dummy) {
public MyAsyncProcess(ClusterConnection hc, Configuration conf, boolean useGlobalErrors,
@SuppressWarnings("unused") boolean dummy) {
super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), new CountingThreadFactory(new AtomicInteger())) {
@Override
@ -644,26 +644,27 @@ public class TestAsyncProcess {
NonceGenerator ng = Mockito.mock(NonceGenerator.class);
Mockito.when(ng.getNonceGroup()).thenReturn(HConstants.NO_NONCE);
Mockito.when(hc.getNonceGenerator()).thenReturn(ng);
Mockito.when(hc.getConfiguration()).thenReturn(conf);
return hc;
}
@Test
public void testHTablePutSuccess() throws Exception {
HTable ht = Mockito.mock(HTable.class);
BufferedMutatorImpl ht = Mockito.mock(BufferedMutatorImpl.class);
ht.ap = new MyAsyncProcess(createHConnection(), conf, true);
Put put = createPut(1, true);
Assert.assertEquals(0, ht.getWriteBufferSize());
ht.put(put);
ht.mutate(put);
Assert.assertEquals(0, ht.getWriteBufferSize());
}
private void doHTableFailedPut(boolean bufferOn) throws Exception {
HTable ht = new HTable();
MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, true);
ht.ap = ap;
ht.setAutoFlushTo(true);
ClusterConnection conn = createHConnection();
HTable ht = new HTable(conn, new BufferedMutatorParams(DUMMY_TABLE));
MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true);
ht.mutator.ap = ap;
if (bufferOn) {
ht.setWriteBufferSize(1024L * 1024L);
} else {
@ -672,7 +673,7 @@ public class TestAsyncProcess {
Put put = createPut(1, false);
Assert.assertEquals(0L, ht.currentWriteBufferSize);
Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize);
try {
ht.put(put);
if (bufferOn) {
@ -681,7 +682,7 @@ public class TestAsyncProcess {
Assert.fail();
} catch (RetriesExhaustedException expected) {
}
Assert.assertEquals(0L, ht.currentWriteBufferSize);
Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize);
// The table should have sent one request, maybe after multiple attempts
AsyncRequestFuture ars = null;
for (AsyncRequestFuture someReqs : ap.allReqs) {
@ -708,14 +709,14 @@ public class TestAsyncProcess {
@Test
public void testHTableFailedPutAndNewPut() throws Exception {
HTable ht = new HTable();
MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, true);
ht.ap = ap;
ht.setAutoFlushTo(false);
ht.setWriteBufferSize(0);
ClusterConnection conn = createHConnection();
BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, null, null,
new BufferedMutatorParams(DUMMY_TABLE).writeBufferSize(0));
MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true);
mutator.ap = ap;
Put p = createPut(1, false);
ht.put(p);
mutator.mutate(p);
ap.waitUntilDone(); // Let's do all the retries.
@ -725,13 +726,13 @@ public class TestAsyncProcess {
// puts, we may raise an exception in the middle of the list. It's then up to the caller to
// manage what was inserted, what was tried but failed, and what was not even tried.
p = createPut(1, true);
Assert.assertEquals(0, ht.writeAsyncBuffer.size());
Assert.assertEquals(0, mutator.getWriteBuffer().size());
try {
ht.put(p);
mutator.mutate(p);
Assert.fail();
} catch (RetriesExhaustedException expected) {
}
Assert.assertEquals("the put should not been inserted.", 0, ht.writeAsyncBuffer.size());
Assert.assertEquals("the put should not been inserted.", 0, mutator.getWriteBuffer().size());
}
@ -762,9 +763,9 @@ public class TestAsyncProcess {
@Test
public void testBatch() throws IOException, InterruptedException {
HTable ht = new HTable();
ht.connection = new MyConnectionImpl(conf);
ht.multiAp = new MyAsyncProcess(ht.connection, conf, false);
ClusterConnection conn = new MyConnectionImpl(conf);
HTable ht = new HTable(conn, new BufferedMutatorParams(DUMMY_TABLE));
ht.multiAp = new MyAsyncProcess(conn, conf, false);
List<Put> puts = new ArrayList<Put>();
puts.add(createPut(1, true));
@ -793,26 +794,24 @@ public class TestAsyncProcess {
@Test
public void testErrorsServers() throws IOException {
HTable ht = new HTable();
Configuration configuration = new Configuration(conf);
ClusterConnection conn = new MyConnectionImpl(configuration);
BufferedMutatorImpl mutator =
new BufferedMutatorImpl(conn, null, null, new BufferedMutatorParams(DUMMY_TABLE));
configuration.setBoolean(ConnectionManager.RETRIES_BY_SERVER_KEY, true);
// set default writeBufferSize
ht.setWriteBufferSize(configuration.getLong("hbase.client.write.buffer", 2097152));
ht.connection = new MyConnectionImpl(configuration);
MyAsyncProcess ap = new MyAsyncProcess(ht.connection, configuration, true);
ht.ap = ap;
MyAsyncProcess ap = new MyAsyncProcess(conn, configuration, true);
mutator.ap = ap;
Assert.assertNotNull(ht.ap.createServerErrorTracker());
Assert.assertTrue(ht.ap.serverTrackerTimeout > 200);
ht.ap.serverTrackerTimeout = 1;
Assert.assertNotNull(mutator.ap.createServerErrorTracker());
Assert.assertTrue(mutator.ap.serverTrackerTimeout > 200);
mutator.ap.serverTrackerTimeout = 1;
Put p = createPut(1, false);
ht.setAutoFlushTo(false);
ht.put(p);
mutator.mutate(p);
try {
ht.flushCommits();
mutator.flush();
Assert.fail();
} catch (RetriesExhaustedWithDetailsException expected) {
}
@ -822,19 +821,18 @@ public class TestAsyncProcess {
@Test
public void testGlobalErrors() throws IOException {
HTable ht = new HTable();
ht.connection = new MyConnectionImpl(conf);
AsyncProcessWithFailure ap = new AsyncProcessWithFailure(ht.connection, conf);
ht.ap = ap;
ClusterConnection conn = new MyConnectionImpl(conf);
BufferedMutatorImpl mutator = (BufferedMutatorImpl) conn.getBufferedMutator(DUMMY_TABLE);
AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, conf);
mutator.ap = ap;
Assert.assertNotNull(ht.ap.createServerErrorTracker());
Assert.assertNotNull(mutator.ap.createServerErrorTracker());
Put p = createPut(1, true);
ht.setAutoFlushTo(false);
ht.put(p);
mutator.mutate(p);
try {
ht.flushCommits();
mutator.flush();
Assert.fail();
} catch (RetriesExhaustedWithDetailsException expected) {
}
@ -861,13 +859,12 @@ public class TestAsyncProcess {
gets.add(get);
}
HTable ht = new HTable();
MyConnectionImpl2 con = new MyConnectionImpl2(hrls);
ht.connection = con;
HTable ht = new HTable(con, new BufferedMutatorParams(DUMMY_TABLE));
MyAsyncProcess ap = new MyAsyncProcess(con, conf, con.nbThreads);
ht.multiAp = ap;
ht.batch(gets);
ht.batch(gets, new Object[gets.size()]);
Assert.assertEquals(ap.nbActions.get(), NB_REGS);
Assert.assertEquals("1 multi response per server", 2, ap.nbMultiResponse.get());

View File

@ -711,26 +711,38 @@ public class TestClientNoCluster extends Configured implements Tool {
* @throws IOException
*/
static void cycle(int id, final Configuration c, final Connection sharedConnection) throws IOException {
Table table = sharedConnection.getTable(TableName.valueOf(BIG_USER_TABLE));
table.setAutoFlushTo(false);
long namespaceSpan = c.getLong("hbase.test.namespace.span", 1000000);
long startTime = System.currentTimeMillis();
final int printInterval = 100000;
Random rd = new Random(id);
boolean get = c.getBoolean("hbase.test.do.gets", false);
try {
TableName tableName = TableName.valueOf(BIG_USER_TABLE);
if (get) {
try (Table table = sharedConnection.getTable(tableName)){
Stopwatch stopWatch = new Stopwatch();
stopWatch.start();
for (int i = 0; i < namespaceSpan; i++) {
byte [] b = format(rd.nextLong());
if (get){
Get g = new Get(b);
table.get(g);
if (i % printInterval == 0) {
LOG.info("Get " + printInterval + "/" + stopWatch.elapsedMillis());
stopWatch.reset();
stopWatch.start();
}
}
LOG.info("Finished a cycle putting " + namespaceSpan + " in " +
(System.currentTimeMillis() - startTime) + "ms");
}
} else {
try (BufferedMutator mutator = sharedConnection.getBufferedMutator(tableName)) {
Stopwatch stopWatch = new Stopwatch();
stopWatch.start();
for (int i = 0; i < namespaceSpan; i++) {
byte [] b = format(rd.nextLong());
Put p = new Put(b);
p.add(HConstants.CATALOG_FAMILY, b, b);
table.put(p);
}
mutator.mutate(p);
if (i % printInterval == 0) {
LOG.info("Put " + printInterval + "/" + stopWatch.elapsedMillis());
stopWatch.reset();
@ -739,8 +751,7 @@ public class TestClientNoCluster extends Configured implements Tool {
}
LOG.info("Finished a cycle putting " + namespaceSpan + " in " +
(System.currentTimeMillis() - startTime) + "ms");
} finally {
table.close();
}
}
}

View File

@ -0,0 +1,119 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client.example;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* An example of using the {@link BufferedMutator} interface.
*/
public class BufferedMutatorExample extends Configured implements Tool {
private static final Log LOG = LogFactory.getLog(BufferedMutatorExample.class);
private static final int POOL_SIZE = 10;
private static final int TASK_COUNT = 100;
private static final TableName TABLE = TableName.valueOf("foo");
private static final byte[] FAMILY = Bytes.toBytes("f");
@Override
public int run(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
/** a callback invoked when an asynchronous write fails. */
final BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() {
@Override
public void onException(RetriesExhaustedWithDetailsException e, BufferedMutator mutator) {
for (int i = 0; i < e.getNumExceptions(); i++) {
LOG.info("Failed to sent put " + e.getRow(i) + ".");
}
}
};
BufferedMutatorParams params = new BufferedMutatorParams(TABLE)
.listener(listener);
//
// step 1: create a single Connection and a BufferedMutator, shared by all worker threads.
//
try (final Connection conn = ConnectionFactory.createConnection(getConf());
final BufferedMutator mutator = conn.getBufferedMutator(params)) {
/** worker pool that operates on BufferedTable instances */
final ExecutorService workerPool = Executors.newFixedThreadPool(POOL_SIZE);
List<Future<Void>> futures = new ArrayList<>(TASK_COUNT);
for (int i = 0; i < TASK_COUNT; i++) {
futures.add(workerPool.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
//
// step 2: each worker sends edits to the shared BufferedMutator instance. They all use
// the same backing buffer, call-back "listener", and RPC executor pool.
//
Put p = new Put(Bytes.toBytes("someRow"));
p.add(FAMILY, Bytes.toBytes("someQualifier"), Bytes.toBytes("some value"));
mutator.mutate(p);
// do work... maybe you want to call mutator.flush() after many edits to ensure any of
// this worker's edits are sent before exiting the Callable
return null;
}
}));
}
//
// step 3: clean up the worker pool, shut down.
//
for (Future<Void> f : futures) {
f.get(5, TimeUnit.MINUTES);
}
workerPool.shutdown();
} catch (IOException e) {
// exception while creating/destroying Connection or BufferedMutator
LOG.info("exception while creating/destroying Connection or BufferedMutator", e);
} // BufferedMutator.close() ensures all work is flushed. Could be the custom listener is
// invoked from here.
return 0;
}
public static void main(String[] args) throws Exception {
ToolRunner.run(new BufferedMutatorExample(), args);
}
}

View File

@ -18,7 +18,18 @@
package org.apache.hadoop.hbase.test;
import com.google.common.collect.Sets;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
@ -40,6 +51,8 @@ import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
@ -87,17 +100,7 @@ import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.collect.Sets;
/**
* This is an integration test borrowed from goraci, written by Keith Turner,
@ -340,7 +343,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
byte[] id;
long count = 0;
int i;
Table table;
BufferedMutator mutator;
Connection connection;
long numNodes;
long wrap;
@ -363,14 +366,14 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
}
protected void instantiateHTable() throws IOException {
table = connection.getTable(getTableName(connection.getConfiguration()));
table.setAutoFlushTo(false);
table.setWriteBufferSize(4 * 1024 * 1024);
mutator = connection.getBufferedMutator(
new BufferedMutatorParams(getTableName(connection.getConfiguration()))
.writeBufferSize(4 * 1024 * 1024));
}
@Override
protected void cleanup(Context context) throws IOException ,InterruptedException {
table.close();
mutator.close();
connection.close();
}
@ -421,7 +424,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
if (id != null) {
put.add(FAMILY_NAME, COLUMN_CLIENT, id);
}
table.put(put);
mutator.mutate(put);
if (i % 1000 == 0) {
// Tickle progress every so often else maprunner will think us hung
@ -429,7 +432,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
}
}
table.flushCommits();
mutator.flush();
}
}

View File

@ -38,13 +38,11 @@ import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.chaos.factories.MonkeyFactory;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
@ -187,7 +185,6 @@ public class IntegrationTestBigLinkedListWithVisibility extends IntegrationTestB
protected void instantiateHTable() throws IOException {
for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
Table table = connection.getTable(getTableName(i));
table.setAutoFlushTo(true);
//table.setWriteBufferSize(4 * 1024 * 1024);
this.tables[i] = table;
}
@ -233,9 +230,6 @@ public class IntegrationTestBigLinkedListWithVisibility extends IntegrationTestB
output.progress();
}
}
for (int j = 0; j < DEFAULT_TABLES_COUNT; j++) {
tables[j].flushCommits();
}
}
}
}

View File

@ -42,12 +42,11 @@ import org.apache.hadoop.hbase.IntegrationTestBase;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
@ -113,8 +112,6 @@ public class IntegrationTestLoadAndVerify extends IntegrationTestBase {
private static final int SCANNER_CACHING = 500;
protected IntegrationTestingUtility util;
private String toRun = null;
private enum Counters {
@ -168,7 +165,7 @@ public void cleanUpCluster() throws Exception {
{
protected long recordsToWrite;
protected Connection connection;
protected Table table;
protected BufferedMutator mutator;
protected Configuration conf;
protected int numBackReferencesPerRow;
@ -184,9 +181,9 @@ public void cleanUpCluster() throws Exception {
String tableName = conf.get(TABLE_NAME_KEY, TABLE_NAME_DEFAULT);
numBackReferencesPerRow = conf.getInt(NUM_BACKREFS_KEY, NUM_BACKREFS_DEFAULT);
this.connection = ConnectionFactory.createConnection(conf);
table = connection.getTable(TableName.valueOf(tableName));
table.setWriteBufferSize(4*1024*1024);
table.setAutoFlushTo(false);
mutator = connection.getBufferedMutator(
new BufferedMutatorParams(TableName.valueOf(tableName))
.writeBufferSize(4 * 1024 * 1024));
String taskId = conf.get("mapreduce.task.attempt.id");
Matcher matcher = Pattern.compile(".+_m_(\\d+_\\d+)").matcher(taskId);
@ -201,8 +198,7 @@ public void cleanUpCluster() throws Exception {
@Override
public void cleanup(Context context) throws IOException {
table.flushCommits();
table.close();
mutator.close();
connection.close();
}
@ -235,7 +231,7 @@ public void cleanUpCluster() throws Exception {
refsWritten.increment(1);
}
rowsWritten.increment(1);
table.put(p);
mutator.mutate(p);
if (i % 100 == 0) {
context.setStatus("Written " + i + "/" + recordsToWrite + " records");
@ -244,7 +240,7 @@ public void cleanUpCluster() throws Exception {
}
// End of block, flush all of them before we start writing anything
// pointing to these!
table.flushCommits();
mutator.flush();
}
}
}
@ -320,7 +316,7 @@ public void cleanUpCluster() throws Exception {
NMapInputFormat.setNumMapTasks(conf, conf.getInt(NUM_MAP_TASKS_KEY, NUM_MAP_TASKS_DEFAULT));
conf.set(TABLE_NAME_KEY, htd.getTableName().getNameAsString());
Job job = new Job(conf);
Job job = Job.getInstance(conf);
job.setJobName(TEST_NAME + " Load for " + htd.getTableName());
job.setJarByClass(this.getClass());
setMapperClass(job);
@ -344,7 +340,7 @@ public void cleanUpCluster() throws Exception {
protected void doVerify(Configuration conf, HTableDescriptor htd) throws Exception {
Path outputDir = getTestDir(TEST_NAME, "verify-output");
Job job = new Job(conf);
Job job = Job.getInstance(conf);
job.setJarByClass(this.getClass());
job.setJobName(TEST_NAME + " Verification for " + htd.getTableName());
setJobScannerConf(job);
@ -398,7 +394,7 @@ public void cleanUpCluster() throws Exception {
// Only disable and drop if we succeeded to verify - otherwise it's useful
// to leave it around for post-mortem
getTestingUtil(getConf()).deleteTable(htd.getName());
getTestingUtil(getConf()).deleteTable(htd.getTableName());
}
public void usage() {
@ -454,15 +450,17 @@ public void cleanUpCluster() throws Exception {
HTableDescriptor htd = new HTableDescriptor(table);
htd.addFamily(new HColumnDescriptor(TEST_FAMILY));
Admin admin = new HBaseAdmin(getConf());
try (Connection conn = ConnectionFactory.createConnection(getConf());
Admin admin = conn.getAdmin()) {
if (doLoad) {
admin.createTable(htd, Bytes.toBytes(0L), Bytes.toBytes(-1L), numPresplits);
doLoad(getConf(), htd);
}
}
if (doVerify) {
doVerify(getConf(), htd);
if (doDelete) {
getTestingUtil(getConf()).deleteTable(htd.getName());
getTestingUtil(getConf()).deleteTable(htd.getTableName());
}
}
return 0;

View File

@ -176,7 +176,7 @@ public class IntegrationTestWithCellVisibilityLoadAndVerify extends IntegrationT
p.add(TEST_FAMILY, TEST_QUALIFIER, HConstants.EMPTY_BYTE_ARRAY);
p.setCellVisibility(new CellVisibility(exp));
getCounter(expIdx).increment(1);
table.put(p);
mutator.mutate(p);
if (i % 100 == 0) {
context.setStatus("Written " + i + "/" + recordsToWrite + " records");
@ -185,7 +185,7 @@ public class IntegrationTestWithCellVisibilityLoadAndVerify extends IntegrationT
}
// End of block, flush all of them before we start writing anything
// pointing to these!
table.flushCommits();
mutator.flush();
}
}

View File

@ -25,8 +25,8 @@ import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@ -234,12 +234,11 @@ public class IntegrationTestSendTraceRequests extends AbstractHBaseTool {
private LinkedBlockingQueue<Long> insertData() throws IOException, InterruptedException {
LinkedBlockingQueue<Long> rowKeys = new LinkedBlockingQueue<Long>(25000);
Table ht = util.getConnection().getTable(this.tableName);
BufferedMutator ht = util.getConnection().getBufferedMutator(this.tableName);
byte[] value = new byte[300];
for (int x = 0; x < 5000; x++) {
TraceScope traceScope = Trace.startSpan("insertData", Sampler.ALWAYS);
try {
ht.setAutoFlushTo(false);
for (int i = 0; i < 5; i++) {
long rk = random.nextLong();
rowKeys.add(rk);
@ -248,7 +247,7 @@ public class IntegrationTestSendTraceRequests extends AbstractHBaseTool {
random.nextBytes(value);
p.add(familyName, Bytes.toBytes(random.nextLong()), value);
}
ht.put(p);
ht.mutate(p);
}
if ((x % 1000) == 0) {
admin.flush(tableName);

View File

@ -227,7 +227,6 @@ public class RowResource extends ResourceBase {
}
table = servlet.getTable(tableResource.getName());
table.put(puts);
table.flushCommits();
ResponseBuilder response = Response.ok();
servlet.getMetrics().incrementSucessfulPutRequests(1);
return response.build();
@ -489,7 +488,6 @@ public class RowResource extends ResourceBase {
.type(MIMETYPE_TEXT).entity("Value not Modified" + CRLF)
.build();
}
table.flushCommits();
ResponseBuilder response = Response.ok();
servlet.getMetrics().incrementSucessfulPutRequests(1);
return response.build();
@ -580,7 +578,6 @@ public class RowResource extends ResourceBase {
.type(MIMETYPE_TEXT).entity(" Delete check failed." + CRLF)
.build();
}
table.flushCommits();
ResponseBuilder response = Response.ok();
servlet.getMetrics().incrementSucessfulDeleteRequests(1);
return response.build();

View File

@ -49,15 +49,16 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.Filter;
@ -137,7 +138,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
private int presplitRegions = 0;
private boolean useTags = false;
private int noOfTags = 1;
private HConnection connection;
private Connection connection;
private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");
/**
@ -501,7 +502,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
value.getRows(), value.getTotalRows(),
value.isFlushCommits(), value.isWriteToWAL(),
value.isUseTags(), value.getNoOfTags(),
HConnectionManager.createConnection(context.getConfiguration()), status);
ConnectionFactory.createConnection(context.getConfiguration()), status);
// Collect how much time the thing took. Report as map output and
// to the ELAPSED_TIME counter.
context.getCounter(Counter.ELAPSED_TIME).increment(elapsedTime);
@ -609,7 +610,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
final int preSplitRegions = this.presplitRegions;
final boolean useTags = this.useTags;
final int numTags = this.noOfTags;
final HConnection connection = HConnectionManager.createConnection(getConf());
final Connection connection = ConnectionFactory.createConnection(getConf());
for (int i = 0; i < this.N; i++) {
final int index = i;
Thread t = new Thread ("TestClient-" + i) {
@ -684,7 +685,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
Path inputDir = writeInputFile(conf);
conf.set(EvaluationMapTask.CMD_KEY, cmd.getName());
conf.set(EvaluationMapTask.PE_KEY, getClass().getName());
Job job = new Job(conf);
Job job = Job.getInstance(conf);
job.setJarByClass(PerformanceEvaluation.class);
job.setJobName("HBase Performance Evaluation");
@ -790,14 +791,14 @@ public class PerformanceEvaluation extends Configured implements Tool {
private boolean writeToWAL = true;
private boolean useTags = false;
private int noOfTags = 0;
private HConnection connection;
private Connection connection;
TestOptions() {
}
TestOptions(int startRow, int perClientRunRows, int totalRows, int numClientThreads,
TableName tableName, boolean flushCommits, boolean writeToWAL, boolean useTags,
int noOfTags, HConnection connection) {
int noOfTags, Connection connection) {
this.startRow = startRow;
this.perClientRunRows = perClientRunRows;
this.totalRows = totalRows;
@ -838,7 +839,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
return writeToWAL;
}
public HConnection getConnection() {
public Connection getConnection() {
return connection;
}
@ -870,13 +871,11 @@ public class PerformanceEvaluation extends Configured implements Tool {
protected final int totalRows;
private final Status status;
protected TableName tableName;
protected HTableInterface table;
protected volatile Configuration conf;
protected boolean flushCommits;
protected boolean writeToWAL;
protected boolean useTags;
protected int noOfTags;
protected HConnection connection;
protected Connection connection;
/**
* Note that all subclasses of this class must provide a public contructor
@ -889,9 +888,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
this.totalRows = options.getTotalRows();
this.status = status;
this.tableName = options.getTableName();
this.table = null;
this.conf = conf;
this.flushCommits = options.isFlushCommits();
this.writeToWAL = options.isWriteToWAL();
this.useTags = options.isUseTags();
this.noOfTags = options.getNumTags();
@ -907,18 +904,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
return period == 0? this.perClientRunRows: period;
}
void testSetup() throws IOException {
this.table = connection.getTable(tableName);
this.table.setAutoFlushTo(false);
}
void testTakedown() throws IOException {
if (flushCommits) {
this.table.flushCommits();
}
table.close();
}
abstract void testTakedown() throws IOException;
/*
* Run test
* @return Elapsed time.
@ -936,6 +922,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
return (System.nanoTime() - startTime) / 1000000;
}
abstract void testSetup() throws IOException;
/**
* Provides an extension point for tests that don't want a per row invocation.
*/
@ -957,8 +945,45 @@ public class PerformanceEvaluation extends Configured implements Tool {
abstract void testRow(final int i) throws IOException;
}
@SuppressWarnings("unused")
static class RandomSeekScanTest extends Test {
static abstract class TableTest extends Test {
protected Table table;
public TableTest(Configuration conf, TestOptions options, Status status) {
super(conf, options, status);
}
void testSetup() throws IOException {
this.table = connection.getTable(tableName);
}
@Override
void testTakedown() throws IOException {
table.close();
}
}
static abstract class BufferedMutatorTest extends Test {
protected BufferedMutator mutator;
protected boolean flushCommits;
public BufferedMutatorTest(Configuration conf, TestOptions options, Status status) {
super(conf, options, status);
this.flushCommits = options.isFlushCommits();
}
void testSetup() throws IOException {
this.mutator = connection.getBufferedMutator(tableName);
}
void testTakedown() throws IOException {
if (flushCommits) {
this.mutator.flush();
}
mutator.close();
}
}
static class RandomSeekScanTest extends TableTest {
RandomSeekScanTest(Configuration conf, TestOptions options, Status status) {
super(conf, options, status);
}
@ -981,7 +1006,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
@SuppressWarnings("unused")
static abstract class RandomScanWithRangeTest extends Test {
static abstract class RandomScanWithRangeTest extends TableTest {
RandomScanWithRangeTest(Configuration conf, TestOptions options, Status status) {
super(conf, options, status);
}
@ -1065,7 +1090,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
}
static class RandomReadTest extends Test {
static class RandomReadTest extends TableTest {
RandomReadTest(Configuration conf, TestOptions options, Status status) {
super(conf, options, status);
}
@ -1085,7 +1110,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
static class RandomWriteTest extends Test {
static class RandomWriteTest extends BufferedMutatorTest {
RandomWriteTest(Configuration conf, TestOptions options, Status status) {
super(conf, options, status);
}
@ -1109,11 +1134,11 @@ public class PerformanceEvaluation extends Configured implements Tool {
put.add(FAMILY_NAME, QUALIFIER_NAME, value);
}
put.setDurability(writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
table.put(put);
mutator.mutate(put);
}
}
static class ScanTest extends Test {
static class ScanTest extends TableTest {
private ResultScanner testScanner;
ScanTest(Configuration conf, TestOptions options, Status status) {
@ -1141,7 +1166,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
static class SequentialReadTest extends Test {
static class SequentialReadTest extends TableTest {
SequentialReadTest(Configuration conf, TestOptions options, Status status) {
super(conf, options, status);
}
@ -1155,7 +1180,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
static class SequentialWriteTest extends Test {
static class SequentialWriteTest extends BufferedMutatorTest {
SequentialWriteTest(Configuration conf, TestOptions options, Status status) {
super(conf, options, status);
@ -1180,11 +1205,11 @@ public class PerformanceEvaluation extends Configured implements Tool {
put.add(FAMILY_NAME, QUALIFIER_NAME, value);
}
put.setDurability(writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
table.put(put);
mutator.mutate(put);
}
}
static class FilteredScanTest extends Test {
static class FilteredScanTest extends TableTest {
protected static final Log LOG = LogFactory.getLog(FilteredScanTest.class.getName());
FilteredScanTest(Configuration conf, TestOptions options, Status status) {
@ -1268,7 +1293,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
long runOneClient(final Class<? extends Test> cmd, final int startRow,
final int perClientRunRows, final int totalRows,
boolean flushCommits, boolean writeToWAL, boolean useTags, int noOfTags,
HConnection connection, final Status status)
Connection connection, final Status status)
throws IOException {
status.setStatus("Start " + cmd + " at offset " + startRow + " for " +
perClientRunRows + " rows");
@ -1463,7 +1488,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
continue;
}
this.connection = HConnectionManager.createConnection(getConf());
this.connection = ConnectionFactory.createConnection(getConf());
final String useTags = "--usetags=";
if (cmd.startsWith(useTags)) {

View File

@ -19,10 +19,10 @@
package org.apache.hadoop.hbase.rest.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
import java.io.IOException;
import java.util.ArrayList;
@ -40,7 +40,6 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@ -99,9 +98,7 @@ public class TestRemoteTable {
htd.addFamily(new HColumnDescriptor(COLUMN_2).setMaxVersions(3));
htd.addFamily(new HColumnDescriptor(COLUMN_3).setMaxVersions(3));
admin.createTable(htd);
Table table = null;
try {
table = TEST_UTIL.getConnection().getTable(TABLE);
try (Table table = TEST_UTIL.getConnection().getTable(TABLE)) {
Put put = new Put(ROW_1);
put.add(COLUMN_1, QUALIFIER_1, TS_2, VALUE_1);
table.put(put);
@ -110,9 +107,6 @@ public class TestRemoteTable {
put.add(COLUMN_1, QUALIFIER_1, TS_2, VALUE_2);
put.add(COLUMN_2, QUALIFIER_2, TS_2, VALUE_2);
table.put(put);
table.flushCommits();
} finally {
if (null != table) table.close();
}
remoteTable = new RemoteHTable(
new Client(new Cluster().add("localhost",
@ -349,7 +343,7 @@ public class TestRemoteTable {
assertTrue(Bytes.equals(VALUE_2, value2));
Delete delete = new Delete(ROW_3);
delete.deleteColumn(COLUMN_2, QUALIFIER_2);
delete.addColumn(COLUMN_2, QUALIFIER_2);
remoteTable.delete(delete);
get = new Get(ROW_3);
@ -464,7 +458,7 @@ public class TestRemoteTable {
assertTrue(Bytes.equals(VALUE_1, value1));
assertNull(value2);
assertTrue(remoteTable.exists(get));
assertEquals(1, remoteTable.exists(Collections.singletonList(get)).length);
assertEquals(1, remoteTable.existsAll(Collections.singletonList(get)).length);
Delete delete = new Delete(ROW_1);
remoteTable.checkAndDelete(ROW_1, COLUMN_1, QUALIFIER_1, VALUE_1, delete);

View File

@ -60,8 +60,7 @@ import com.google.protobuf.ServiceException;
@InterfaceStability.Stable
public final class HTableWrapper implements HTableInterface {
private TableName tableName;
private final Table table;
private final HTableInterface table;
private ClusterConnection connection;
private final List<HTableInterface> openTables;
@ -78,7 +77,6 @@ public final class HTableWrapper implements HTableInterface {
private HTableWrapper(List<HTableInterface> openTables, TableName tableName,
ClusterConnection connection, ExecutorService pool)
throws IOException {
this.tableName = tableName;
this.table = connection.getTable(tableName, pool);
this.connection = connection;
this.openTables = openTables;
@ -244,7 +242,7 @@ public final class HTableWrapper implements HTableInterface {
@Override
public byte[] getTableName() {
return tableName.getName();
return table.getTableName();
}
@Override
@ -320,7 +318,7 @@ public final class HTableWrapper implements HTableInterface {
@Override
public void setAutoFlush(boolean autoFlush) {
table.setAutoFlushTo(autoFlush);
table.setAutoFlush(autoFlush);
}
@Override

View File

@ -25,10 +25,10 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.InvalidJobConfException;
@ -52,22 +52,22 @@ public class TableOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
* and write to an HBase table.
*/
protected static class TableRecordWriter implements RecordWriter<ImmutableBytesWritable, Put> {
private Table m_table;
private BufferedMutator m_mutator;
/**
* Instantiate a TableRecordWriter with the HBase HClient for writing. Assumes control over the
* lifecycle of {@code conn}.
*/
public TableRecordWriter(final Table table) throws IOException {
this.m_table = table;
public TableRecordWriter(final BufferedMutator mutator) throws IOException {
this.m_mutator = mutator;
}
public void close(Reporter reporter) throws IOException {
this.m_table.close();
this.m_mutator.close();
}
public void write(ImmutableBytesWritable key, Put value) throws IOException {
m_table.put(new Put(value));
m_mutator.mutate(new Put(value));
}
}
@ -77,13 +77,12 @@ public class TableOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
throws IOException {
// expecting exactly one path
TableName tableName = TableName.valueOf(job.get(OUTPUT_TABLE));
Table table = null;
BufferedMutator mutator = null;
// Connection is not closed. Dies with JVM. No possibility for cleanup.
Connection connection = ConnectionFactory.createConnection(job);
table = connection.getTable(tableName);
mutator = connection.getBufferedMutator(tableName);
// Clear write buffer on fail is true by default so no need to reset it.
table.setAutoFlushTo(false);
return new TableRecordWriter(table);
return new TableRecordWriter(mutator);
}
@Override

View File

@ -29,13 +29,13 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.JobContext;
@ -76,7 +76,7 @@ public class MultiTableOutputFormat extends OutputFormat<ImmutableBytesWritable,
RecordWriter<ImmutableBytesWritable, Mutation> {
private static final Log LOG = LogFactory.getLog(MultiTableRecordWriter.class);
Connection connection;
Map<ImmutableBytesWritable, Table> tables;
Map<ImmutableBytesWritable, BufferedMutator> mutatorMap = new HashMap<>();
Configuration conf;
boolean useWriteAheadLogging;
@ -91,7 +91,6 @@ public class MultiTableOutputFormat extends OutputFormat<ImmutableBytesWritable,
boolean useWriteAheadLogging) throws IOException {
LOG.debug("Created new MultiTableRecordReader with WAL "
+ (useWriteAheadLogging ? "on" : "off"));
this.tables = new HashMap<ImmutableBytesWritable, Table>();
this.conf = conf;
this.useWriteAheadLogging = useWriteAheadLogging;
}
@ -99,28 +98,28 @@ public class MultiTableOutputFormat extends OutputFormat<ImmutableBytesWritable,
/**
* @param tableName
* the name of the table, as a string
* @return the named table
* @return the named mutator
* @throws IOException
* if there is a problem opening a table
*/
Table getTable(ImmutableBytesWritable tableName) throws IOException {
BufferedMutator getBufferedMutator(ImmutableBytesWritable tableName) throws IOException {
if(this.connection == null){
this.connection = ConnectionFactory.createConnection(conf);
}
if (!tables.containsKey(tableName)) {
if (!mutatorMap.containsKey(tableName)) {
LOG.debug("Opening HTable \"" + Bytes.toString(tableName.get())+ "\" for writing");
Table table = connection.getTable(TableName.valueOf(tableName.get()));
table.setAutoFlushTo(false);
tables.put(tableName, table);
BufferedMutator mutator =
connection.getBufferedMutator(TableName.valueOf(tableName.get()));
mutatorMap.put(tableName, mutator);
}
return tables.get(tableName);
return mutatorMap.get(tableName);
}
@Override
public void close(TaskAttemptContext context) throws IOException {
for (Table table : tables.values()) {
table.flushCommits();
for (BufferedMutator mutator : mutatorMap.values()) {
mutator.flush();
}
if(connection != null){
connection.close();
@ -139,16 +138,16 @@ public class MultiTableOutputFormat extends OutputFormat<ImmutableBytesWritable,
*/
@Override
public void write(ImmutableBytesWritable tableName, Mutation action) throws IOException {
Table table = getTable(tableName);
BufferedMutator mutator = getBufferedMutator(tableName);
// The actions are not immutable, so we defensively copy them
if (action instanceof Put) {
Put put = new Put((Put) action);
put.setDurability(useWriteAheadLogging ? Durability.SYNC_WAL
: Durability.SKIP_WAL);
table.put(put);
mutator.mutate(put);
} else if (action instanceof Delete) {
Delete delete = new Delete((Delete) action);
table.delete(delete);
mutator.mutate(delete);
} else
throw new IllegalArgumentException(
"action must be either Delete or Put");

View File

@ -29,12 +29,12 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
@ -85,7 +85,7 @@ implements Configurable {
extends RecordWriter<KEY, Mutation> {
private Connection connection;
private Table table;
private BufferedMutator mutator;
/**
* @throws IOException
@ -94,8 +94,7 @@ implements Configurable {
public TableRecordWriter() throws IOException {
String tableName = conf.get(OUTPUT_TABLE);
this.connection = ConnectionFactory.createConnection(conf);
this.table = connection.getTable(TableName.valueOf(tableName));
this.table.setAutoFlushTo(false);
this.mutator = connection.getBufferedMutator(TableName.valueOf(tableName));
LOG.info("Created table instance for " + tableName);
}
/**
@ -103,12 +102,12 @@ implements Configurable {
*
* @param context The context.
* @throws IOException When closing the writer fails.
* @see org.apache.hadoop.mapreduce.RecordWriter#close(org.apache.hadoop.mapreduce.TaskAttemptContext)
* @see RecordWriter#close(TaskAttemptContext)
*/
@Override
public void close(TaskAttemptContext context)
throws IOException {
table.close();
mutator.close();
connection.close();
}
@ -118,14 +117,15 @@ implements Configurable {
* @param key The key.
* @param value The value.
* @throws IOException When writing fails.
* @see org.apache.hadoop.mapreduce.RecordWriter#write(java.lang.Object, java.lang.Object)
* @see RecordWriter#write(Object, Object)
*/
@Override
public void write(KEY key, Mutation value)
throws IOException {
if (value instanceof Put) table.put(new Put((Put)value));
else if (value instanceof Delete) table.delete(new Delete((Delete)value));
else throw new IOException("Pass a Delete or a Put");
if (!(value instanceof Put) && !(value instanceof Delete)) {
throw new IOException("Pass a Delete or a Put");
}
mutator.mutate(value);
}
}
@ -136,11 +136,9 @@ implements Configurable {
* @return The newly created writer instance.
* @throws IOException When creating the writer fails.
* @throws InterruptedException When the jobs is cancelled.
* @see org.apache.hadoop.mapreduce.lib.output.FileOutputFormat#getRecordWriter(org.apache.hadoop.mapreduce.TaskAttemptContext)
*/
@Override
public RecordWriter<KEY, Mutation> getRecordWriter(
TaskAttemptContext context)
public RecordWriter<KEY, Mutation> getRecordWriter(TaskAttemptContext context)
throws IOException, InterruptedException {
return new TableRecordWriter();
}
@ -151,7 +149,7 @@ implements Configurable {
* @param context The current context.
* @throws IOException When the check fails.
* @throws InterruptedException When the job is aborted.
* @see org.apache.hadoop.mapreduce.OutputFormat#checkOutputSpecs(org.apache.hadoop.mapreduce.JobContext)
* @see OutputFormat#checkOutputSpecs(JobContext)
*/
@Override
public void checkOutputSpecs(JobContext context) throws IOException,
@ -167,7 +165,7 @@ implements Configurable {
* @return The committer.
* @throws IOException When creating the committer fails.
* @throws InterruptedException When the job is aborted.
* @see org.apache.hadoop.mapreduce.OutputFormat#getOutputCommitter(org.apache.hadoop.mapreduce.TaskAttemptContext)
* @see OutputFormat#getOutputCommitter(TaskAttemptContext)
*/
@Override
public OutputCommitter getOutputCommitter(TaskAttemptContext context)

View File

@ -49,13 +49,12 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@ -392,6 +391,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
throws IOException, InterruptedException {
final Class<? extends Test> cmd = determineCommandClass(opts.cmdName);
assert cmd != null;
@SuppressWarnings("unchecked")
Future<RunResult>[] threads = new Future[opts.numClientThreads];
RunResult[] results = new RunResult[opts.numClientThreads];
ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads,
@ -457,7 +457,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
Path inputDir = writeInputFile(conf, opts);
conf.set(EvaluationMapTask.CMD_KEY, cmd.getName());
conf.set(EvaluationMapTask.PE_KEY, PerformanceEvaluation.class.getName());
Job job = new Job(conf);
Job job = Job.getInstance(conf);
job.setJarByClass(PerformanceEvaluation.class);
job.setJobName("HBase Performance Evaluation - " + opts.cmdName);
@ -940,7 +940,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
private final Sampler<?> traceSampler;
private final SpanReceiverHost receiverHost;
protected Connection connection;
protected Table table;
// protected Table table;
private String testName;
private Histogram latency;
@ -1022,25 +1022,25 @@ public class PerformanceEvaluation extends Configured implements Tool {
if (!opts.oneCon) {
this.connection = ConnectionFactory.createConnection(conf);
}
this.table = connection.getTable(TableName.valueOf(opts.tableName));
this.table.setAutoFlushTo(opts.autoFlush);
onStartup();
latency = YammerHistogramUtils.newHistogram(new UniformSample(1024 * 500));
valueSize = YammerHistogramUtils.newHistogram(new UniformSample(1024 * 500));
}
abstract void onStartup() throws IOException;
void testTakedown() throws IOException {
reportLatency();
reportValueSize();
if (opts.flushCommits) {
this.table.flushCommits();
}
table.close();
onTakedown();
if (!opts.oneCon) {
connection.close();
}
receiverHost.closeReceivers();
}
abstract void onTakedown() throws IOException;
/*
* Run test
* @return Elapsed time.
@ -1136,7 +1136,43 @@ public class PerformanceEvaluation extends Configured implements Tool {
abstract void testRow(final int i) throws IOException, InterruptedException;
}
static class RandomSeekScanTest extends Test {
static abstract class TableTest extends Test {
protected Table table;
TableTest(Connection con, TestOptions options, Status status) {
super(con, options, status);
}
@Override
void onStartup() throws IOException {
this.table = connection.getTable(TableName.valueOf(opts.tableName));
}
@Override
void onTakedown() throws IOException {
table.close();
}
}
static abstract class BufferedMutatorTest extends Test {
protected BufferedMutator mutator;
BufferedMutatorTest(Connection con, TestOptions options, Status status) {
super(con, options, status);
}
@Override
void onStartup() throws IOException {
this.mutator = connection.getBufferedMutator(TableName.valueOf(opts.tableName));
}
@Override
void onTakedown() throws IOException {
mutator.close();
}
}
static class RandomSeekScanTest extends TableTest {
RandomSeekScanTest(Connection con, TestOptions options, Status status) {
super(con, options, status);
}
@ -1166,7 +1202,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
static abstract class RandomScanWithRangeTest extends Test {
static abstract class RandomScanWithRangeTest extends TableTest {
RandomScanWithRangeTest(Connection con, TestOptions options, Status status) {
super(con, options, status);
}
@ -1254,7 +1290,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
}
static class RandomReadTest extends Test {
static class RandomReadTest extends TableTest {
private final Consistency consistency;
private ArrayList<Get> gets;
private Random rd = new Random();
@ -1308,7 +1344,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
}
static class RandomWriteTest extends Test {
static class RandomWriteTest extends BufferedMutatorTest {
RandomWriteTest(Connection con, TestOptions options, Status status) {
super(con, options, status);
}
@ -1334,11 +1370,11 @@ public class PerformanceEvaluation extends Configured implements Tool {
updateValueSize(value.length);
}
put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
table.put(put);
mutator.mutate(put);
}
}
static class ScanTest extends Test {
static class ScanTest extends TableTest {
private ResultScanner testScanner;
ScanTest(Connection con, TestOptions options, Status status) {
@ -1371,7 +1407,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
static class SequentialReadTest extends Test {
static class SequentialReadTest extends TableTest {
SequentialReadTest(Connection con, TestOptions options, Status status) {
super(con, options, status);
}
@ -1387,7 +1423,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
}
static class SequentialWriteTest extends Test {
static class SequentialWriteTest extends BufferedMutatorTest {
SequentialWriteTest(Connection con, TestOptions options, Status status) {
super(con, options, status);
}
@ -1413,11 +1449,11 @@ public class PerformanceEvaluation extends Configured implements Tool {
updateValueSize(value.length);
}
put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
table.put(put);
mutator.mutate(put);
}
}
static class FilteredScanTest extends Test {
static class FilteredScanTest extends TableTest {
protected static final Log LOG = LogFactory.getLog(FilteredScanTest.class.getName());
FilteredScanTest(Connection con, TestOptions options, Status status) {

View File

@ -24,19 +24,17 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Map;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestCase.FlushCache;
import org.apache.hadoop.hbase.HBaseTestCase.HTableIncommon;
import org.apache.hadoop.hbase.HBaseTestCase.Incommon;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@ -46,7 +44,6 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@ -220,14 +217,16 @@ public class TestMultiVersions {
}
}
// Insert data
List<Put> puts = new ArrayList<>();
for (int i = 0; i < startKeys.length; i++) {
for (int j = 0; j < timestamp.length; j++) {
Put put = new Put(rows[i], timestamp[j]);
put.add(HConstants.CATALOG_FAMILY, null, timestamp[j],
Bytes.toBytes(timestamp[j]));
table.put(put);
puts.add(put);
}
}
table.put(puts);
// There are 5 cases we have to test. Each is described below.
for (int i = 0; i < rows.length; i++) {
for (int j = 0; j < timestamp.length; j++) {
@ -241,7 +240,6 @@ public class TestMultiVersions {
}
assertTrue(cellCount == 1);
}
table.flushCommits();
}
// Case 1: scan with LATEST_TIMESTAMP. Should get two rows

View File

@ -135,7 +135,7 @@ public class TestClientPushback {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicLong endTime = new AtomicLong();
long startTime = EnvironmentEdgeManager.currentTime();
table.ap.submit(tablename, ops, true, new Batch.Callback<Result>() {
table.mutator.ap.submit(tablename, ops, true, new Batch.Callback<Result>() {
@Override
public void update(byte[] region, byte[] row, Result result) {
endTime.set(EnvironmentEdgeManager.currentTime());

View File

@ -100,12 +100,12 @@ public class TestCloneSnapshotFromClient {
// take an empty snapshot
admin.snapshot(emptySnapshot, tableName);
Table table = TEST_UTIL.getConnection().getTable(tableName);
try {
// enable table and insert data
admin.enableTable(tableName);
SnapshotTestingUtils.loadData(TEST_UTIL, table, 500, FAMILY);
SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, FAMILY);
try (Table table = TEST_UTIL.getConnection().getTable(tableName)){
snapshot0Rows = TEST_UTIL.countRows(table);
}
admin.disableTable(tableName);
// take a snapshot
@ -113,8 +113,10 @@ public class TestCloneSnapshotFromClient {
// enable table and insert more data
admin.enableTable(tableName);
SnapshotTestingUtils.loadData(TEST_UTIL, table, 500, FAMILY);
SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, FAMILY);
try (Table table = TEST_UTIL.getConnection().getTable(tableName)){
snapshot1Rows = TEST_UTIL.countRows(table);
}
admin.disableTable(tableName);
// take a snapshot of the updated table
@ -122,9 +124,6 @@ public class TestCloneSnapshotFromClient {
// re-enable table
admin.enableTable(tableName);
} finally {
table.close();
}
}
protected int getNumReplicas() {

View File

@ -689,15 +689,15 @@ public class TestFromClientSide {
public void testMaxKeyValueSize() throws Exception {
byte [] TABLE = Bytes.toBytes("testMaxKeyValueSize");
Configuration conf = TEST_UTIL.getConfiguration();
String oldMaxSize = conf.get("hbase.client.keyvalue.maxsize");
String oldMaxSize = conf.get(TableConfiguration.MAX_KEYVALUE_SIZE_KEY);
Table ht = TEST_UTIL.createTable(TABLE, FAMILY);
byte[] value = new byte[4 * 1024 * 1024];
Put put = new Put(ROW);
put.add(FAMILY, QUALIFIER, value);
ht.put(put);
try {
TEST_UTIL.getConfiguration().setInt("hbase.client.keyvalue.maxsize", 2 * 1024 * 1024);
TABLE = Bytes.toBytes("testMaxKeyValueSize2");
TEST_UTIL.getConfiguration().setInt(
TableConfiguration.MAX_KEYVALUE_SIZE_KEY, 2 * 1024 * 1024);
// Create new table so we pick up the change in Configuration.
try (Connection connection =
ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())) {
@ -709,7 +709,7 @@ public class TestFromClientSide {
}
fail("Inserting a too large KeyValue worked, should throw exception");
} catch(Exception e) {}
conf.set("hbase.client.keyvalue.maxsize", oldMaxSize);
conf.set(TableConfiguration.MAX_KEYVALUE_SIZE_KEY, oldMaxSize);
}
@Test
@ -3903,7 +3903,7 @@ public class TestFromClientSide {
final int NB_BATCH_ROWS = 10;
HTable table = TEST_UTIL.createTable(Bytes.toBytes("testRowsPutBufferedOneFlush"),
new byte [][] {CONTENTS_FAMILY, SMALL_FAMILY});
table.setAutoFlushTo(false);
table.setAutoFlush(false);
ArrayList<Put> rowsUpdate = new ArrayList<Put>();
for (int i = 0; i < NB_BATCH_ROWS * 10; i++) {
byte[] row = Bytes.toBytes("row" + i);
@ -3934,6 +3934,7 @@ public class TestFromClientSide {
Result row : scanner)
nbRows++;
assertEquals(NB_BATCH_ROWS * 10, nbRows);
table.close();
}
@Test
@ -3944,7 +3945,6 @@ public class TestFromClientSide {
final int NB_BATCH_ROWS = 10;
HTable table = TEST_UTIL.createTable(Bytes.toBytes("testRowsPutBufferedManyManyFlushes"),
new byte[][] {CONTENTS_FAMILY, SMALL_FAMILY });
table.setAutoFlushTo(false);
table.setWriteBufferSize(10);
ArrayList<Put> rowsUpdate = new ArrayList<Put>();
for (int i = 0; i < NB_BATCH_ROWS * 10; i++) {
@ -3956,8 +3956,6 @@ public class TestFromClientSide {
}
table.put(rowsUpdate);
table.flushCommits();
Scan scan = new Scan();
scan.addFamily(CONTENTS_FAMILY);
ResultScanner scanner = table.getScanner(scan);
@ -4146,6 +4144,7 @@ public class TestFromClientSide {
HBaseAdmin ha = new HBaseAdmin(t.getConnection());
assertTrue(ha.tableExists(tableName));
assertTrue(t.get(new Get(ROW)).isEmpty());
ha.close();
}
/**
@ -4159,9 +4158,10 @@ public class TestFromClientSide {
final TableName tableName = TableName.valueOf("testUnmanagedHConnectionReconnect");
HTable t = createUnmangedHConnectionHTable(tableName);
Connection conn = t.getConnection();
HBaseAdmin ha = new HBaseAdmin(conn);
try (HBaseAdmin ha = new HBaseAdmin(conn)) {
assertTrue(ha.tableExists(tableName));
assertTrue(t.get(new Get(ROW)).isEmpty());
}
// stop the master
MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
@ -4174,10 +4174,11 @@ public class TestFromClientSide {
// test that the same unmanaged connection works with a new
// HBaseAdmin and can connect to the new master;
HBaseAdmin newAdmin = new HBaseAdmin(conn);
try (HBaseAdmin newAdmin = new HBaseAdmin(conn)) {
assertTrue(newAdmin.tableExists(tableName));
assertTrue(newAdmin.getClusterStatus().getServersSize() == SLAVES + 1);
}
}
@Test
public void testMiscHTableStuff() throws IOException {
@ -4273,7 +4274,6 @@ public class TestFromClientSide {
new byte[][] { HConstants.CATALOG_FAMILY, Bytes.toBytes("info2") }, 1, 1024);
// set block size to 64 to making 2 kvs into one block, bypassing the walkForwardInSingleRow
// in Store.rowAtOrBeforeFromStoreFile
table.setAutoFlushTo(true);
String regionName = table.getRegionLocations().firstKey().getEncodedName();
HRegion region =
TEST_UTIL.getRSForFirstRegionInTable(tableAname).getFromOnlineRegions(regionName);
@ -4348,6 +4348,8 @@ public class TestFromClientSide {
assertTrue(result.containsColumn(HConstants.CATALOG_FAMILY, null));
assertTrue(Bytes.equals(result.getRow(), forthRow));
assertTrue(Bytes.equals(result.getValue(HConstants.CATALOG_FAMILY, null), four));
table.close();
}
/**

View File

@ -149,7 +149,7 @@ public class TestMultiParallel {
ThreadPoolExecutor executor = HTable.getDefaultExecutor(UTIL.getConfiguration());
try {
try (Table t = connection.getTable(TEST_TABLE, executor)) {
List<Row> puts = constructPutRequests(); // creates a Put for every region
List<Put> puts = constructPutRequests(); // creates a Put for every region
t.batch(puts);
HashSet<ServerName> regionservers = new HashSet<ServerName>();
try (RegionLocator locator = connection.getRegionLocator(TEST_TABLE)) {
@ -172,7 +172,7 @@ public class TestMultiParallel {
Table table = UTIL.getConnection().getTable(TEST_TABLE);
// load test data
List<Row> puts = constructPutRequests();
List<Put> puts = constructPutRequests();
table.batch(puts);
// create a list of gets and run it
@ -262,16 +262,12 @@ public class TestMultiParallel {
// Load the data
LOG.info("get new table");
Table table = UTIL.getConnection().getTable(TEST_TABLE);
table.setAutoFlushTo(false);
table.setWriteBufferSize(10 * 1024 * 1024);
LOG.info("constructPutRequests");
List<Row> puts = constructPutRequests();
for (Row put : puts) {
table.put((Put) put);
}
List<Put> puts = constructPutRequests();
table.put(puts);
LOG.info("puts");
table.flushCommits();
final int liveRScount = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads()
.size();
assert liveRScount > 0;
@ -290,11 +286,7 @@ public class TestMultiParallel {
// try putting more keys after the abort. same key/qual... just validating
// no exceptions thrown
puts = constructPutRequests();
for (Row put : puts) {
table.put((Put) put);
}
table.flushCommits();
table.put(puts);
}
LOG.info("validating loaded data");
@ -332,7 +324,7 @@ public class TestMultiParallel {
LOG.info("test=testBatchWithPut");
Table table = CONNECTION.getTable(TEST_TABLE);
// put multiple rows using a batch
List<Row> puts = constructPutRequests();
List<Put> puts = constructPutRequests();
Object[] results = table.batch(puts);
validateSizeAndEmpty(results, KEYS.length);
@ -364,7 +356,7 @@ public class TestMultiParallel {
Table table = UTIL.getConnection().getTable(TEST_TABLE);
// Load some data
List<Row> puts = constructPutRequests();
List<Put> puts = constructPutRequests();
Object[] results = table.batch(puts);
validateSizeAndEmpty(results, KEYS.length);
@ -372,7 +364,7 @@ public class TestMultiParallel {
List<Row> deletes = new ArrayList<Row>();
for (int i = 0; i < KEYS.length; i++) {
Delete delete = new Delete(KEYS[i]);
delete.deleteFamily(BYTES_FAMILY);
delete.addFamily(BYTES_FAMILY);
deletes.add(delete);
}
results = table.batch(deletes);
@ -393,7 +385,7 @@ public class TestMultiParallel {
Table table = UTIL.getConnection().getTable(TEST_TABLE);
// Load some data
List<Row> puts = constructPutRequests();
List<Put> puts = constructPutRequests();
Object[] results = table.batch(puts);
validateSizeAndEmpty(results, KEYS.length);
@ -665,8 +657,8 @@ public class TestMultiParallel {
}
}
private List<Row> constructPutRequests() {
List<Row> puts = new ArrayList<Row>();
private List<Put> constructPutRequests() {
List<Put> puts = new ArrayList<>();
for (byte[] k : KEYS) {
Put put = new Put(k);
put.add(BYTES_FAMILY, QUALIFIER, VALUE);

View File

@ -111,11 +111,12 @@ public class TestRestoreSnapshotFromClient {
// take an empty snapshot
admin.snapshot(emptySnapshot, tableName);
Table table = TEST_UTIL.getConnection().getTable(tableName);
// enable table and insert data
admin.enableTable(tableName);
SnapshotTestingUtils.loadData(TEST_UTIL, table, 500, FAMILY);
SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, FAMILY);
try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
snapshot0Rows = TEST_UTIL.countRows(table);
}
admin.disableTable(tableName);
// take a snapshot
@ -123,9 +124,10 @@ public class TestRestoreSnapshotFromClient {
// enable table and insert more data
admin.enableTable(tableName);
SnapshotTestingUtils.loadData(TEST_UTIL, table, 500, FAMILY);
SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, FAMILY);
try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
snapshot1Rows = TEST_UTIL.countRows(table);
table.close();
}
}
@After
@ -184,7 +186,7 @@ public class TestRestoreSnapshotFromClient {
assertEquals(2, table.getTableDescriptor().getFamilies().size());
HTableDescriptor htd = admin.getTableDescriptor(tableName);
assertEquals(2, htd.getFamilies().size());
SnapshotTestingUtils.loadData(TEST_UTIL, table, 500, TEST_FAMILY2);
SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, TEST_FAMILY2);
long snapshot2Rows = snapshot1Rows + 500;
assertEquals(snapshot2Rows, TEST_UTIL.countRows(table));
assertEquals(500, TEST_UTIL.countRows(table, TEST_FAMILY2));

View File

@ -133,17 +133,16 @@ public class TestRpcControllerFactory {
Connection connection = ConnectionFactory.createConnection(conf);
Table table = connection.getTable(name);
table.setAutoFlushTo(false);
byte[] row = Bytes.toBytes("row");
Put p = new Put(row);
p.add(fam1, fam1, Bytes.toBytes("val0"));
table.put(p);
table.flushCommits();
Integer counter = 1;
counter = verifyCount(counter);
Delete d = new Delete(row);
d.deleteColumn(fam1, fam1);
d.addColumn(fam1, fam1);
table.delete(d);
counter = verifyCount(counter);

View File

@ -176,11 +176,11 @@ public class TestHTableWrapper {
private void checkAutoFlush() {
boolean initialAutoFlush = hTableInterface.isAutoFlush();
hTableInterface.setAutoFlushTo(false);
hTableInterface.setAutoFlush(false);
assertFalse(hTableInterface.isAutoFlush());
hTableInterface.setAutoFlushTo(true);
hTableInterface.setAutoFlush(true);
assertTrue(hTableInterface.isAutoFlush());
hTableInterface.setAutoFlushTo(initialAutoFlush);
hTableInterface.setAutoFlush(initialAutoFlush);
}
private void checkBufferSize() throws IOException {

View File

@ -937,7 +937,6 @@ public class TestDistributedLogSplitting {
if (key == null || key.length == 0) {
key = new byte[] { 0, 0, 0, 0, 1 };
}
ht.setAutoFlushTo(true);
Put put = new Put(key);
put.add(Bytes.toBytes("family"), Bytes.toBytes("c1"), new byte[]{'b'});
ht.put(put);
@ -1629,11 +1628,11 @@ public class TestDistributedLogSplitting {
/**
* Load table with puts and deletes with expected values so that we can verify later
*/
private void prepareData(final HTable t, final byte[] f, final byte[] column) throws IOException {
t.setAutoFlushTo(false);
private void prepareData(final Table t, final byte[] f, final byte[] column) throws IOException {
byte[] k = new byte[3];
// add puts
List<Put> puts = new ArrayList<>();
for (byte b1 = 'a'; b1 <= 'z'; b1++) {
for (byte b2 = 'a'; b2 <= 'z'; b2++) {
for (byte b3 = 'a'; b3 <= 'z'; b3++) {
@ -1642,11 +1641,11 @@ public class TestDistributedLogSplitting {
k[2] = b3;
Put put = new Put(k);
put.add(f, column, k);
t.put(put);
puts.add(put);
}
}
}
t.flushCommits();
t.put(puts);
// add deletes
for (byte b3 = 'a'; b3 <= 'z'; b3++) {
k[0] = 'a';
@ -1655,7 +1654,6 @@ public class TestDistributedLogSplitting {
Delete del = new Delete(k);
t.delete(del);
}
t.flushCommits();
}
private void waitForCounter(AtomicLong ctr, long oldval, long newval,

View File

@ -83,11 +83,11 @@ public class TestMaster {
MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
HMaster m = cluster.getMaster();
HTable ht = TEST_UTIL.createTable(TABLENAME, FAMILYNAME);
try (HTable ht = TEST_UTIL.createTable(TABLENAME, FAMILYNAME)) {
assertTrue(m.assignmentManager.getTableStateManager().isTableState(TABLENAME,
TableState.State.ENABLED));
TEST_UTIL.loadTable(ht, FAMILYNAME, false);
ht.close();
}
List<Pair<HRegionInfo, ServerName>> tableRegions = MetaTableAccessor.getTableRegionsAndLocations(
m.getConnection(), TABLENAME);

View File

@ -99,9 +99,9 @@ public class TestEndToEndSplitTransaction {
TableName tableName =
TableName.valueOf("TestSplit");
byte[] familyName = Bytes.toBytes("fam");
HTable ht = TEST_UTIL.createTable(tableName, familyName);
try (HTable ht = TEST_UTIL.createTable(tableName, familyName)) {
TEST_UTIL.loadTable(ht, familyName, false);
ht.close();
}
HRegionServer server = TEST_UTIL.getHBaseCluster().getRegionServer(0);
byte []firstRow = Bytes.toBytes("aaa");
byte []splitRow = Bytes.toBytes("lll");

View File

@ -202,11 +202,9 @@ public class TestFSErrorsExposed {
util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
// Make a new Configuration so it makes a new connection that has the
// above configuration on it; else we use the old one w/ 10 as default.
Table table = util.getConnection().getTable(tableName);
try (Table table = util.getConnection().getTable(tableName)) {
// Load some data
util.loadTable(table, fam, false);
table.flushCommits();
util.flush();
util.countRows(table);
@ -220,6 +218,7 @@ public class TestFSErrorsExposed {
LOG.info("Got expected error", e);
assertTrue(e.getMessage().contains("Could not seek"));
}
}
// Restart data nodes so that HBase can shut down cleanly.
util.getDFSCluster().restartDataNodes();

View File

@ -78,6 +78,7 @@ public class TestRegionFavoredNodes {
@AfterClass
public static void tearDownAfterClass() throws Exception {
table.close();
if (createWithFavoredNode == null) {
return;
}

View File

@ -31,6 +31,7 @@ import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import static org.junit.Assert.*;
import java.io.IOException;
@ -109,10 +110,11 @@ public class TestRegionServerMetrics {
TEST_UTIL.createTable(tName, cfName);
TEST_UTIL.getConnection().getTable(tName).close(); //wait for the table to come up.
Connection connection = TEST_UTIL.getConnection();
connection.getTable(tName).close(); //wait for the table to come up.
// Do a first put to be sure that the connection is established, meta is there and so on.
HTable table = (HTable) TEST_UTIL.getConnection().getTable(tName);
Table table = connection.getTable(tName);
Put p = new Put(row);
p.add(cfName, qualifier, initValue);
table.put(p);
@ -141,7 +143,9 @@ public class TestRegionServerMetrics {
metricsHelper.assertCounter("readRequestCount", readRequests + 10, serverSource);
metricsHelper.assertCounter("writeRequestCount", writeRequests + 30, serverSource);
for ( HRegionInfo i:table.getRegionLocations().keySet()) {
try (RegionLocator locator = connection.getRegionLocator(tName)) {
for ( HRegionLocation location: locator.getAllRegionLocations()) {
HRegionInfo i = location.getRegionInfo();
MetricsRegionAggregateSource agg = rs.getRegion(i.getRegionName())
.getMetrics()
.getSource()
@ -153,7 +157,7 @@ public class TestRegionServerMetrics {
metricsHelper.assertCounter(prefix + "_getNumOps", 10, agg);
metricsHelper.assertCounter(prefix + "_mutateCount", 31, agg);
}
}
List<Get> gets = new ArrayList<Get>();
for (int i=0; i< 10; i++) {
gets.add(new Get(row));
@ -165,11 +169,11 @@ public class TestRegionServerMetrics {
metricsHelper.assertCounter("readRequestCount", readRequests + 20, serverSource);
metricsHelper.assertCounter("writeRequestCount", writeRequests + 30, serverSource);
table.setAutoFlushTo(false);
List<Put> puts = new ArrayList<>();
for (int i=0; i< 30; i++) {
table.put(p);
puts.add(p);
}
table.flushCommits();
table.put(puts);
metricsRegionServer.getRegionServerWrapper().forceRecompute();
metricsHelper.assertCounter("totalRequestCount", requests + 80, serverSource);
@ -325,14 +329,14 @@ public class TestRegionServerMetrics {
byte[] val = Bytes.toBytes("One");
HTable t = TEST_UTIL.createTable(tableName, cf);
t.setAutoFlushTo(false);
List<Put> puts = new ArrayList<>();
for (int insertCount =0; insertCount < 100; insertCount++) {
Put p = new Put(Bytes.toBytes("" + insertCount + "row"));
p.add(cf, qualifier, val);
t.put(p);
puts.add(p);
}
t.flushCommits();
try (HTable t = TEST_UTIL.createTable(tableName, cf)) {
t.put(puts);
Scan s = new Scan();
s.setBatch(1);
@ -344,7 +348,10 @@ public class TestRegionServerMetrics {
assertNotNull(result);
assertEquals(1, result.size());
}
for ( HRegionInfo i:t.getRegionLocations().keySet()) {
}
try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
for ( HRegionLocation location: locator.getAllRegionLocations()) {
HRegionInfo i = location.getRegionInfo();
MetricsRegionAggregateSource agg = rs.getRegion(i.getRegionName())
.getMetrics()
.getSource()
@ -357,3 +364,4 @@ public class TestRegionServerMetrics {
}
}
}
}

View File

@ -91,7 +91,6 @@ public class TestScannerWithBulkload {
put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes
.toBytes("version3")));
table.put(put0);
table.flushCommits();
admin.flush(tableName);
scanner = table.getScanner(scan);
result = scanner.next();
@ -172,19 +171,16 @@ public class TestScannerWithBulkload {
put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes
.toBytes("version0")));
table.put(put0);
table.flushCommits();
admin.flush(tableName);
Put put1 = new Put(Bytes.toBytes("row2"));
put1.add(new KeyValue(Bytes.toBytes("row2"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes
.toBytes("version0")));
table.put(put1);
table.flushCommits();
admin.flush(tableName);
put0 = new Put(Bytes.toBytes("row1"));
put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes
.toBytes("version1")));
table.put(put0);
table.flushCommits();
admin.flush(tableName);
admin.compact(tableName);
@ -221,7 +217,6 @@ public class TestScannerWithBulkload {
put1.add(new KeyValue(Bytes.toBytes("row5"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
Bytes.toBytes("version0")));
table.put(put1);
table.flushCommits();
bulkload.doBulkLoad(hfilePath, (HTable) table);
latch.countDown();
} catch (TableNotFoundException e) {
@ -263,7 +258,6 @@ public class TestScannerWithBulkload {
put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes
.toBytes("version3")));
table.put(put0);
table.flushCommits();
admin.flush(tableName);
scanner = table.getScanner(scan);
result = scanner.next();

View File

@ -312,7 +312,7 @@ public class TestLogRolling {
admin.createTable(desc);
Table table = TEST_UTIL.getConnection().getTable(desc.getTableName());
assertTrue(table.isAutoFlush());
assertTrue(((HTable) table).isAutoFlush());
server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
final FSHLog log = (FSHLog) server.getWAL(null);
@ -456,8 +456,6 @@ public class TestLogRolling {
writeData(table, 1002);
table.setAutoFlushTo(true);
long curTime = System.currentTimeMillis();
LOG.info("log.getCurrentFileName()): " + DefaultWALProvider.getCurrentFileName(log));
long oldFilenum = DefaultWALProvider.extractFileNumFromWAL(log);

View File

@ -28,7 +28,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@ -54,7 +53,6 @@ public class TestReplicationChangingPeerRegionservers extends TestReplicationBas
*/
@Before
public void setUp() throws Exception {
htable1.setAutoFlushTo(false);
// Starting and stopping replication can make us miss new logs,
// rolling like this makes sure the most recent one gets added to the queue
for (JVMClusterUtil.RegionServerThread r :
@ -119,7 +117,10 @@ public class TestReplicationChangingPeerRegionservers extends TestReplicationBas
Put put = new Put(row);
put.add(famName, row, row);
if (htable1 == null) {
htable1 = utility1.getConnection().getTable(tableName);
}
htable1.put(put);
Get get = new Get(row);

View File

@ -39,7 +39,6 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@ -70,7 +69,6 @@ public class TestReplicationSmallTests extends TestReplicationBase {
*/
@Before
public void setUp() throws Exception {
htable1.setAutoFlushTo(true);
// Starting and stopping replication can make us miss new logs,
// rolling like this makes sure the most recent one gets added to the queue
for ( JVMClusterUtil.RegionServerThread r :

View File

@ -151,7 +151,6 @@ public class TestReplicationWithTags {
admin.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
}
htable1 = utility1.getConnection().getTable(TABLE_NAME);
htable1.setWriteBufferSize(1024);
htable2 = utility2.getConnection().getTable(TABLE_NAME);
}

View File

@ -399,9 +399,7 @@ public class TestVisibilityLabelsReplication {
}
static Table writeData(TableName tableName, String... labelExps) throws Exception {
Table table = null;
try {
table = TEST_UTIL.getConnection().getTable(TABLE_NAME);
Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME);
int i = 1;
List<Put> puts = new ArrayList<Put>();
for (String labelExp : labelExps) {
@ -413,11 +411,6 @@ public class TestVisibilityLabelsReplication {
i++;
}
table.put(puts);
} finally {
if (table != null) {
table.flushCommits();
}
}
return table;
}
// A simple BaseRegionbserver impl that allows to add a non-visibility tag from the

View File

@ -25,7 +25,6 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -47,6 +46,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
@ -673,20 +673,22 @@ public class SnapshotTestingUtils {
public static void loadData(final HBaseTestingUtility util, final TableName tableName, int rows,
byte[]... families) throws IOException, InterruptedException {
loadData(util, util.getConnection().getTable(tableName), rows, families);
BufferedMutator mutator = util.getConnection().getBufferedMutator(tableName);
loadData(util, mutator, rows, families);
}
public static void loadData(final HBaseTestingUtility util, final Table table, int rows,
public static void loadData(final HBaseTestingUtility util, final BufferedMutator mutator, int rows,
byte[]... families) throws IOException, InterruptedException {
table.setAutoFlushTo(false);
// Ensure one row per region
assertTrue(rows >= KEYS.length);
for (byte k0: KEYS) {
byte[] k = new byte[] { k0 };
byte[] value = Bytes.add(Bytes.toBytes(System.currentTimeMillis()), k);
byte[] key = Bytes.add(k, Bytes.toBytes(MD5Hash.getMD5AsHex(value)));
putData(table, families, key, value);
final byte[][] families1 = families;
final byte[] key1 = key;
final byte[] value1 = value;
mutator.mutate(createPut(families1, key1, value1));
rows--;
}
@ -694,22 +696,24 @@ public class SnapshotTestingUtils {
while (rows-- > 0) {
byte[] value = Bytes.add(Bytes.toBytes(System.currentTimeMillis()), Bytes.toBytes(rows));
byte[] key = Bytes.toBytes(MD5Hash.getMD5AsHex(value));
putData(table, families, key, value);
final byte[][] families1 = families;
final byte[] key1 = key;
final byte[] value1 = value;
mutator.mutate(createPut(families1, key1, value1));
}
table.flushCommits();
mutator.flush();
waitForTableToBeOnline(util, table.getName());
waitForTableToBeOnline(util, mutator.getName());
}
private static void putData(final Table table, final byte[][] families,
final byte[] key, final byte[] value) throws IOException {
private static Put createPut(final byte[][] families, final byte[] key, final byte[] value) {
byte[] q = Bytes.toBytes("q");
Put put = new Put(key);
put.setDurability(Durability.SKIP_WAL);
for (byte[] family: families) {
put.add(family, q, value);
}
table.put(put);
return put;
}
public static void deleteAllSnapshots(final Admin admin)

View File

@ -31,7 +31,6 @@ import java.util.concurrent.CountDownLatch;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -41,11 +40,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.ScannerCallable;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
@ -54,7 +49,6 @@ import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@ -76,7 +70,6 @@ public class TestFlushSnapshotFromClient {
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static final int NUM_RS = 2;
private static final byte[] TEST_FAM = Bytes.toBytes("fam");
private static final byte[] TEST_QUAL = Bytes.toBytes("q");
private static final TableName TABLE_NAME = TableName.valueOf("test");
private final int DEFAULT_NUM_ROWS = 100;
@ -145,8 +138,7 @@ public class TestFlushSnapshotFromClient {
SnapshotTestingUtils.assertNoSnapshots(admin);
// put some stuff in the table
Table table = UTIL.getConnection().getTable(TABLE_NAME);
SnapshotTestingUtils.loadData(UTIL, table, DEFAULT_NUM_ROWS, TEST_FAM);
SnapshotTestingUtils.loadData(UTIL, TABLE_NAME, DEFAULT_NUM_ROWS, TEST_FAM);
LOG.debug("FS state before snapshot:");
FSUtils.logFileSystemState(UTIL.getTestFileSystem(),
@ -228,8 +220,7 @@ public class TestFlushSnapshotFromClient {
SnapshotTestingUtils.assertNoSnapshots(admin);
// put some stuff in the table
Table table = UTIL.getConnection().getTable(TABLE_NAME);
SnapshotTestingUtils.loadData(UTIL, table, DEFAULT_NUM_ROWS, TEST_FAM);
SnapshotTestingUtils.loadData(UTIL, TABLE_NAME, DEFAULT_NUM_ROWS, TEST_FAM);
LOG.debug("FS state before snapshot:");
FSUtils.logFileSystemState(UTIL.getTestFileSystem(),

View File

@ -25,7 +25,6 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
@ -103,8 +102,8 @@ public class TestRestoreFlushSnapshotFromClient {
// create Table and disable it
SnapshotTestingUtils.createTable(UTIL, tableName, FAMILY);
SnapshotTestingUtils.loadData(UTIL, tableName, 500, FAMILY);
Table table = UTIL.getConnection().getTable(tableName);
SnapshotTestingUtils.loadData(UTIL, table, 500, FAMILY);
snapshot0Rows = UTIL.countRows(table);
LOG.info("=== before snapshot with 500 rows");
logFSTree();
@ -117,7 +116,7 @@ public class TestRestoreFlushSnapshotFromClient {
logFSTree();
// insert more data
SnapshotTestingUtils.loadData(UTIL, table, 500, FAMILY);
SnapshotTestingUtils.loadData(UTIL, tableName, 500, FAMILY);
snapshot1Rows = UTIL.countRows(table);
LOG.info("=== before snapshot with 1000 rows");
logFSTree();