getWriteBuffer() {
+ return this.writeAsyncBuffer;
+ }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java
new file mode 100644
index 00000000000..d4cdeadd34c
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java
@@ -0,0 +1,110 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import java.util.concurrent.ExecutorService;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Parameters for instantiating a {@link BufferedMutator}.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class BufferedMutatorParams {
+
+ static final int UNSET = -1;
+
+ private final TableName tableName;
+ private long writeBufferSize = UNSET;
+ private int maxKeyValueSize = UNSET;
+ private ExecutorService pool = null;
+ private BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() {
+ @Override
+ public void onException(RetriesExhaustedWithDetailsException exception,
+ BufferedMutator bufferedMutator)
+ throws RetriesExhaustedWithDetailsException {
+ throw exception;
+ }
+ };
+
+ public BufferedMutatorParams(TableName tableName) {
+ this.tableName = tableName;
+ }
+
+ public TableName getTableName() {
+ return tableName;
+ }
+
+ public long getWriteBufferSize() {
+ return writeBufferSize;
+ }
+
+ /**
+ * Override the write buffer size specified by the provided {@link Connection}'s
+ * {@link org.apache.hadoop.conf.Configuration} instance, via the configuration key
+ * {@code hbase.client.write.buffer}.
+ */
+ public BufferedMutatorParams writeBufferSize(long writeBufferSize) {
+ this.writeBufferSize = writeBufferSize;
+ return this;
+ }
+
+ public int getMaxKeyValueSize() {
+ return maxKeyValueSize;
+ }
+
+ /**
+ * Override the maximum key-value size specified by the provided {@link Connection}'s
+ * {@link org.apache.hadoop.conf.Configuration} instance, via the configuration key
+ * {@code hbase.client.keyvalue.maxsize}.
+ */
+ public BufferedMutatorParams maxKeyValueSize(int maxKeyValueSize) {
+ this.maxKeyValueSize = maxKeyValueSize;
+ return this;
+ }
+
+ public ExecutorService getPool() {
+ return pool;
+ }
+
+ /**
+ * Override the default executor pool defined by the {@code hbase.htable.threads.*}
+ * configuration values.
+ */
+ public BufferedMutatorParams pool(ExecutorService pool) {
+ this.pool = pool;
+ return this;
+ }
+
+ public BufferedMutator.ExceptionListener getListener() {
+ return listener;
+ }
+
+ /**
+ * Override the default error handler. Default handler simply rethrows the exception.
+ */
+ public BufferedMutatorParams listener(BufferedMutator.ExceptionListener listener) {
+ this.listener = listener;
+ return this;
+ }
+}
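
A minimal sketch of how a caller might compose these builder-style parameters (not part of the patch; the table name, pool size, buffer sizes, and listener body are illustrative):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;

public class BufferedMutatorParamsSketch {
  public static BufferedMutatorParams example() {
    ExecutorService pool = Executors.newFixedThreadPool(4); // hypothetical caller-owned pool
    return new BufferedMutatorParams(TableName.valueOf("mytable"))
        .writeBufferSize(8 * 1024 * 1024)    // overrides hbase.client.write.buffer
        .maxKeyValueSize(1024 * 1024)        // overrides hbase.client.keyvalue.maxsize
        .pool(pool)
        .listener(new BufferedMutator.ExceptionListener() {
          @Override
          public void onException(RetriesExhaustedWithDetailsException e,
              BufferedMutator mutator) throws RetriesExhaustedWithDetailsException {
            throw e; // same as the default listener above: rethrow
          }
        });
  }
}
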
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java
index 55237be5ba5..72f870fb4b3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java
@@ -98,6 +98,37 @@ public interface Connection extends Abortable, Closeable {
*/
Table getTable(TableName tableName, ExecutorService pool) throws IOException;
+ /**
+ *
+ * Retrieve a {@link BufferedMutator} for performing client-side buffering of writes. The
+ * {@link BufferedMutator} returned by this method is thread-safe. This BufferedMutator will
+ * use the Connection's ExecutorService. This object can be used for long lived operations.
+ *
+ *
+ * The caller is responsible for calling {@link BufferedMutator#close()} on
+ * the returned {@link BufferedMutator} instance.
+ *
+ *
+ * This accessor will use the connection's ExecutorService and will throw an
+ * exception in the main thread when an asynchronous exception occurs.
+ *
+ * @param tableName the name of the table
+ *
+ * @return a {@link BufferedMutator} for the supplied tableName.
+ */
+  BufferedMutator getBufferedMutator(TableName tableName) throws IOException;
+
+ /**
+ * Retrieve a {@link BufferedMutator} for performing client-side buffering of writes. The
+ * {@link BufferedMutator} returned by this method is thread-safe. This object can be used for
+ * long lived table operations. The caller is responsible for calling
+ * {@link BufferedMutator#close()} on the returned {@link BufferedMutator} instance.
+ *
+ * @param params details on how to instantiate the {@code BufferedMutator}.
+   * @return a {@link BufferedMutator} for the table described by {@code params}.
+ */
+  BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException;
+
/**
* Retrieve a RegionLocator implementation to inspect region information on a table. The returned
* RegionLocator is not thread-safe, so a new instance should be created for each using thread.
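
A usage sketch for the two accessors above (not part of the patch; table "t1", family "f", and the row data are illustrative). The caller owns the returned mutator, so try-with-resources handles the required close():

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class GetBufferedMutatorSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (Connection connection = ConnectionFactory.createConnection(conf);
         BufferedMutator mutator = connection.getBufferedMutator(TableName.valueOf("t1"))) {
      Put put = new Put(Bytes.toBytes("row1"));
      put.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
      mutator.mutate(put); // buffered client-side, not yet sent
      mutator.flush();     // force the buffered mutations out now
    }                      // close() flushes anything still buffered
  }
}
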
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java
index 53c1271c265..e1458b83d33 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java
@@ -109,6 +109,17 @@ abstract class ConnectionAdapter implements ClusterConnection {
return wrappedConnection.getTable(tableName, pool);
}
+ @Override
+ public BufferedMutator getBufferedMutator(BufferedMutatorParams params)
+ throws IOException {
+ return wrappedConnection.getBufferedMutator(params);
+ }
+
+ @Override
+ public BufferedMutator getBufferedMutator(TableName tableName) throws IOException {
+ return wrappedConnection.getBufferedMutator(tableName);
+ }
+
@Override
public RegionLocator getRegionLocator(TableName tableName) throws IOException {
return wrappedConnection.getRegionLocator(tableName);
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
index 166bcdd1b49..358ef3e127a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
@@ -69,8 +69,8 @@ import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
-import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
@@ -728,6 +728,28 @@ final class ConnectionManager {
return new HTable(tableName, this, tableConfig, rpcCallerFactory, rpcControllerFactory, pool);
}
+ @Override
+ public BufferedMutator getBufferedMutator(BufferedMutatorParams params) {
+ if (params.getTableName() == null) {
+ throw new IllegalArgumentException("TableName cannot be null.");
+ }
+ if (params.getPool() == null) {
+ params.pool(HTable.getDefaultExecutor(getConfiguration()));
+ }
+ if (params.getWriteBufferSize() == BufferedMutatorParams.UNSET) {
+ params.writeBufferSize(tableConfig.getWriteBufferSize());
+ }
+ if (params.getMaxKeyValueSize() == BufferedMutatorParams.UNSET) {
+ params.maxKeyValueSize(tableConfig.getMaxKeyValueSize());
+ }
+ return new BufferedMutatorImpl(this, rpcCallerFactory, rpcControllerFactory, params);
+ }
+
+ @Override
+ public BufferedMutator getBufferedMutator(TableName tableName) {
+ return getBufferedMutator(new BufferedMutatorParams(tableName));
+ }
+
@Override
public RegionLocator getRegionLocator(TableName tableName) throws IOException {
return new HRegionLocator(tableName, this);
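
Because the implementation above back-fills any UNSET parameter from the connection's TableConfiguration, a mutator requested without overrides picks up hbase.client.write.buffer. A small sketch of that behavior, reusing the imports from the previous example (the 8 MB value is illustrative):

    Configuration conf = HBaseConfiguration.create();
    conf.setLong("hbase.client.write.buffer", 8 * 1024 * 1024); // illustrative override
    try (Connection connection = ConnectionFactory.createConnection(conf);
         BufferedMutator mutator = connection.getBufferedMutator(TableName.valueOf("t1"))) {
      // writeBufferSize was left UNSET on the params, so it falls back to the configured value
      assert mutator.getWriteBufferSize() == 8 * 1024 * 1024;
    }
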
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index 2c405d78424..374c06d1bc7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -22,7 +22,6 @@ import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
@@ -112,10 +111,8 @@ public class HTable implements HTableInterface {
private final TableName tableName;
private volatile Configuration configuration;
private TableConfiguration tableConfiguration;
-  protected List<Row> writeAsyncBuffer = new LinkedList<Row>();
- private long writeBufferSize;
+ protected BufferedMutatorImpl mutator;
private boolean autoFlush = true;
- protected long currentWriteBufferSize = 0 ;
private boolean closed = false;
protected int scannerCaching;
private ExecutorService pool; // For Multi & Scan
@@ -125,8 +122,6 @@ public class HTable implements HTableInterface {
private Consistency defaultConsistency = Consistency.STRONG;
private HRegionLocator locator;
- /** The Async process for puts with autoflush set to false or multiputs */
- protected AsyncProcess ap;
/** The Async process for batch */
protected AsyncProcess multiAp;
private RpcRetryingCallerFactory rpcCallerFactory;
@@ -219,7 +214,7 @@ public class HTable implements HTableInterface {
// it also scales when new region servers are added.
ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>(), Threads.newDaemonThreadFactory("htable"));
- ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true);
+ pool.allowCoreThreadTimeOut(true);
return pool;
}
@@ -323,15 +318,18 @@ public class HTable implements HTableInterface {
}
/**
- * For internal testing.
+ * For internal testing. Uses Connection provided in {@code params}.
* @throws IOException
*/
@VisibleForTesting
- protected HTable() throws IOException {
- tableName = null;
- tableConfiguration = new TableConfiguration();
+ protected HTable(ClusterConnection conn, BufferedMutatorParams params) throws IOException {
+ connection = conn;
+ tableName = params.getTableName();
+ tableConfiguration = new TableConfiguration(connection.getConfiguration());
cleanupPoolOnClose = false;
cleanupConnectionOnClose = false;
+    // used from tests; don't assume the connection is real
+ this.mutator = new BufferedMutatorImpl(conn, null, null, params);
}
/**
@@ -351,7 +349,6 @@ public class HTable implements HTableInterface {
this.operationTimeout = tableName.isSystemTable() ?
tableConfiguration.getMetaOperationTimeout() : tableConfiguration.getOperationTimeout();
- this.writeBufferSize = tableConfiguration.getWriteBufferSize();
this.scannerCaching = tableConfiguration.getScannerCaching();
if (this.rpcCallerFactory == null) {
@@ -362,8 +359,6 @@ public class HTable implements HTableInterface {
}
// puts need to track errors globally due to how the APIs currently work.
- ap = new AsyncProcess(connection, configuration, pool, rpcCallerFactory, true,
- rpcControllerFactory);
multiAp = this.connection.getAsyncProcess();
this.locator = new HRegionLocator(getName(), connection);
}
@@ -539,7 +534,7 @@ public class HTable implements HTableInterface {
*/
@Deprecated
  public List<Row> getWriteBuffer() {
- return writeAsyncBuffer;
+ return mutator == null ? null : mutator.getWriteBuffer();
}
/**
@@ -643,7 +638,7 @@ public class HTable implements HTableInterface {
* This is mainly useful for the MapReduce integration.
* @return A map of HRegionInfo with it's server address
* @throws IOException if a remote or network exception occurs
- *
+ *
* @deprecated Use {@link RegionLocator#getAllRegionLocations()} instead;
*/
@Deprecated
@@ -1000,11 +995,11 @@ public class HTable implements HTableInterface {
/**
* {@inheritDoc}
+ * @throws IOException
*/
@Override
- public void put(final Put put)
- throws InterruptedIOException, RetriesExhaustedWithDetailsException {
- doPut(put);
+ public void put(final Put put) throws IOException {
+ getBufferedMutator().mutate(put);
if (autoFlush) {
flushCommits();
}
@@ -1012,82 +1007,16 @@ public class HTable implements HTableInterface {
/**
* {@inheritDoc}
+ * @throws IOException
*/
@Override
-  public void put(final List<Put> puts)
- throws InterruptedIOException, RetriesExhaustedWithDetailsException {
- for (Put put : puts) {
- doPut(put);
- }
+  public void put(final List<Put> puts) throws IOException {
+ getBufferedMutator().mutate(puts);
if (autoFlush) {
flushCommits();
}
}
-
- /**
- * Add the put to the buffer. If the buffer is already too large, sends the buffer to the
- * cluster.
- * @throws RetriesExhaustedWithDetailsException if there is an error on the cluster.
- * @throws InterruptedIOException if we were interrupted.
- */
- private void doPut(Put put) throws InterruptedIOException, RetriesExhaustedWithDetailsException {
- // This behavior is highly non-intuitive... it does not protect us against
- // 94-incompatible behavior, which is a timing issue because hasError, the below code
- // and setter of hasError are not synchronized. Perhaps it should be removed.
- if (ap.hasError()) {
- writeAsyncBuffer.add(put);
- backgroundFlushCommits(true);
- }
-
- validatePut(put);
-
- currentWriteBufferSize += put.heapSize();
- writeAsyncBuffer.add(put);
-
- while (currentWriteBufferSize > writeBufferSize) {
- backgroundFlushCommits(false);
- }
- }
-
-
- /**
- * Send the operations in the buffer to the servers. Does not wait for the server's answer.
- * If the is an error (max retried reach from a previous flush or bad operation), it tries to
- * send all operations in the buffer and sends an exception.
- * @param synchronous - if true, sends all the writes and wait for all of them to finish before
- * returning.
- */
- private void backgroundFlushCommits(boolean synchronous) throws
- InterruptedIOException, RetriesExhaustedWithDetailsException {
-
- try {
- if (!synchronous) {
- ap.submit(tableName, writeAsyncBuffer, true, null, false);
- if (ap.hasError()) {
- LOG.debug(tableName + ": One or more of the operations have failed -" +
- " waiting for all operation in progress to finish (successfully or not)");
- }
- }
- if (synchronous || ap.hasError()) {
- while (!writeAsyncBuffer.isEmpty()) {
- ap.submit(tableName, writeAsyncBuffer, true, null, false);
- }
- RetriesExhaustedWithDetailsException error = ap.waitForAllPreviousOpsAndReset(null);
- if (error != null) {
- throw error;
- }
- }
- } finally {
- currentWriteBufferSize = 0;
- for (Row mut : writeAsyncBuffer) {
- if (mut instanceof Mutation) {
- currentWriteBufferSize += ((Mutation) mut).heapSize();
- }
- }
- }
- }
-
/**
* {@inheritDoc}
*/
@@ -1264,7 +1193,7 @@ public class HTable implements HTableInterface {
controller.setCallTimeout(callTimeout);
try {
MutateRequest request = RequestConverter.buildMutateRequest(
- getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
+ getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
new BinaryComparator(value), CompareType.EQUAL, put);
MutateResponse response = getStub().mutate(controller, request);
return Boolean.valueOf(response.getProcessed());
@@ -1452,12 +1381,11 @@ public class HTable implements HTableInterface {
/**
* {@inheritDoc}
+ * @throws IOException
*/
@Override
- public void flushCommits() throws InterruptedIOException, RetriesExhaustedWithDetailsException {
- // As we can have an operation in progress even if the buffer is empty, we call
- // backgroundFlushCommits at least one time.
- backgroundFlushCommits(true);
+ public void flushCommits() throws IOException {
+ getBufferedMutator().flush();
}
/**
@@ -1581,7 +1509,11 @@ public class HTable implements HTableInterface {
*/
@Override
public long getWriteBufferSize() {
- return writeBufferSize;
+ if (mutator == null) {
+ return tableConfiguration.getWriteBufferSize();
+ } else {
+ return mutator.getWriteBufferSize();
+ }
}
/**
@@ -1594,10 +1526,8 @@ public class HTable implements HTableInterface {
*/
@Override
public void setWriteBufferSize(long writeBufferSize) throws IOException {
- this.writeBufferSize = writeBufferSize;
- if(currentWriteBufferSize > writeBufferSize) {
- flushCommits();
- }
+ getBufferedMutator();
+ mutator.setWriteBufferSize(writeBufferSize);
}
/**
@@ -1914,4 +1844,17 @@ public class HTable implements HTableInterface {
public RegionLocator getRegionLocator() {
return this.locator;
}
+
+ @VisibleForTesting
+ BufferedMutator getBufferedMutator() throws IOException {
+ if (mutator == null) {
+ this.mutator = (BufferedMutatorImpl) connection.getBufferedMutator(
+ new BufferedMutatorParams(tableName)
+ .pool(pool)
+ .writeBufferSize(tableConfiguration.getWriteBufferSize())
+ .maxKeyValueSize(tableConfiguration.getMaxKeyValueSize())
+ );
+ }
+ return mutator;
+ }
}
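
HTable now creates its BufferedMutatorImpl lazily, on the first buffered write, and shares it for the table's lifetime. An application that wants the same one-mutator-per-table behavior on top of the public API could do something similar; the helper below is hypothetical and only illustrates the pattern (imports as in the earlier sketches, plus java.io.Closeable, java.io.IOException, java.util.concurrent.ConcurrentMap and java.util.concurrent.ConcurrentHashMap):

class MutatorCache implements Closeable {
  private final Connection connection;
  private final ConcurrentMap<TableName, BufferedMutator> mutators = new ConcurrentHashMap<>();

  MutatorCache(Connection connection) {
    this.connection = connection;
  }

  BufferedMutator get(TableName tableName) throws IOException {
    BufferedMutator mutator = mutators.get(tableName);
    if (mutator == null) {
      mutator = connection.getBufferedMutator(tableName);
      BufferedMutator existing = mutators.putIfAbsent(tableName, mutator);
      if (existing != null) {
        mutator.close(); // another thread won the race; keep its instance
        mutator = existing;
      }
    }
    return mutator;
  }

  @Override
  public void close() throws IOException {
    for (BufferedMutator m : mutators.values()) {
      m.close(); // close() flushes any buffered mutations
    }
  }
}
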
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java
index 911e034ef0b..1f4d99afd04 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java
@@ -67,8 +67,8 @@ public interface HTableInterface extends Table {
* Whether or not to enable 'auto-flush'.
* @deprecated in 0.96. When called with setAutoFlush(false), this function also
* set clearBufferOnFail to true, which is unexpected but kept for historical reasons.
- * Replace it with setAutoFlush(false, false) if this is exactly what you want, or by
- * {@link #setAutoFlushTo(boolean)} for all other cases.
+ * Replace it with setAutoFlush(false, false) if this is exactly what you want, though
+ * this is the method you want for most cases.
*/
@Deprecated
void setAutoFlush(boolean autoFlush);
@@ -105,12 +105,68 @@ public interface HTableInterface extends Table {
* the value of this parameter is ignored and clearBufferOnFail is set to true.
* Setting clearBufferOnFail to false is deprecated since 0.96.
* @deprecated in 0.99 since setting clearBufferOnFail is deprecated. Use
- * {@link #setAutoFlushTo(boolean)}} instead.
- * @see #flushCommits
+   * {@link #setAutoFlush(boolean)} instead.
+ * @see BufferedMutator#flush()
*/
@Deprecated
void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail);
+ /**
+ * Set the autoFlush behavior, without changing the value of {@code clearBufferOnFail}.
+ * @deprecated in 0.99 since setting clearBufferOnFail is deprecated. Use
+ * {@link #setAutoFlush(boolean)} instead, or better still, move on to {@link BufferedMutator}
+ */
+ @Deprecated
+ void setAutoFlushTo(boolean autoFlush);
+
+ /**
+ * Tells whether or not 'auto-flush' is turned on.
+ *
+ * @return {@code true} if 'auto-flush' is enabled (default), meaning
+ * {@link Put} operations don't get buffered/delayed and are immediately
+ * executed.
+ * @deprecated as of 1.0.0. Replaced by {@link BufferedMutator}
+ */
+ @Deprecated
+ boolean isAutoFlush();
+
+ /**
+ * Executes all the buffered {@link Put} operations.
+ *
+ * This method gets called once automatically for every {@link Put} or batch
+   * of {@link Put}s (when <code>put(List<Put>)</code> is used) when
+ * {@link #isAutoFlush} is {@code true}.
+ * @throws IOException if a remote or network exception occurs.
+ * @deprecated as of 1.0.0. Replaced by {@link BufferedMutator#flush()}
+ */
+ @Deprecated
+ void flushCommits() throws IOException;
+
+ /**
+ * Returns the maximum size in bytes of the write buffer for this HTable.
+ *
+ * The default value comes from the configuration parameter
+ * {@code hbase.client.write.buffer}.
+ * @return The size of the write buffer in bytes.
+ * @deprecated as of 1.0.0. Replaced by {@link BufferedMutator#getWriteBufferSize()}
+ */
+ @Deprecated
+ long getWriteBufferSize();
+
+ /**
+ * Sets the size of the buffer in bytes.
+ *
+ * If the new size is less than the current amount of data in the
+ * write buffer, the buffer gets flushed.
+ * @param writeBufferSize The new write buffer size, in bytes.
+ * @throws IOException if a remote or network exception occurs.
+ * @deprecated as of 1.0.0. Replaced by {@link BufferedMutator} and
+ * {@link BufferedMutatorParams#writeBufferSize(long)}
+ */
+ @Deprecated
+ void setWriteBufferSize(long writeBufferSize) throws IOException;
+
+
/**
* Return the row that matches row exactly,
* or the one that immediately precedes it.
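
With these methods deprecated, the intended migration is from buffering through the table handle to buffering through a BufferedMutator. A before/after sketch (the connection, Put, table name, and buffer size are illustrative):

    // Before (now deprecated): buffering through the HTableInterface handle.
    //   table.setAutoFlushTo(false);
    //   table.setWriteBufferSize(4 * 1024 * 1024);
    //   table.put(put);
    //   table.flushCommits();

    // After: buffering through a BufferedMutator obtained from the Connection.
    try (BufferedMutator mutator = connection.getBufferedMutator(
        new BufferedMutatorParams(TableName.valueOf("t1")).writeBufferSize(4 * 1024 * 1024))) {
      mutator.mutate(put);
      mutator.flush(); // optional here; close() also flushes
    }
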
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
index a408b1d35df..55fb1c49d0c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
@@ -219,9 +219,7 @@ public interface Table extends Closeable {
/**
* Puts some data in the table.
- *
- * If {@link #isAutoFlush isAutoFlush} is false, the update is buffered
- * until the internal buffer is full.
+ *
* @param put The data to put.
* @throws IOException if a remote or network exception occurs.
* @since 0.20.0
@@ -231,9 +229,6 @@ public interface Table extends Closeable {
/**
* Puts some data in the table, in batch.
*
- * If {@link #isAutoFlush isAutoFlush} is false, the update is buffered
- * until the internal buffer is full.
- *
* This can be used for group commit, or for submitting user defined
* batches. The writeBuffer will be periodically inspected while the List
* is processed, so depending on the List size the writeBuffer may flush
@@ -497,30 +492,6 @@ public interface Table extends Closeable {
      byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable,
      final Batch.Callback<R> callback) throws ServiceException, Throwable;
- /**
- * Tells whether or not 'auto-flush' is turned on.
- *
- * @return {@code true} if 'auto-flush' is enabled (default), meaning
- * {@link Put} operations don't get buffered/delayed and are immediately
- * executed.
- */
- boolean isAutoFlush();
-
- /**
- * Executes all the buffered {@link Put} operations.
- *
- * This method gets called once automatically for every {@link Put} or batch
-   * of {@link Put}s (when <code>put(List<Put>)</code> is used) when
- * {@link #isAutoFlush} is {@code true}.
- * @throws IOException if a remote or network exception occurs.
- */
- void flushCommits() throws IOException;
-
- /**
- * Set the autoFlush behavior, without changing the value of {@code clearBufferOnFail}
- */
- void setAutoFlushTo(boolean autoFlush);
-
/**
* Returns the maximum size in bytes of the write buffer for this HTable.
*
@@ -540,7 +511,6 @@ public interface Table extends Closeable {
*/
void setWriteBufferSize(long writeBufferSize) throws IOException;
-
/**
* Creates an instance of the given {@link com.google.protobuf.Service} subclass for each table
* region spanning the range from the {@code startKey} row to {@code endKey} row (inclusive), all
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableConfiguration.java
index 6176a0c1e77..70ad179bfaf 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableConfiguration.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableConfiguration.java
@@ -28,20 +28,18 @@ import com.google.common.annotations.VisibleForTesting;
@InterfaceAudience.Private
public class TableConfiguration {
+ public static final String WRITE_BUFFER_SIZE_KEY = "hbase.client.write.buffer";
+ public static final long WRITE_BUFFER_SIZE_DEFAULT = 2097152;
+ public static final String MAX_KEYVALUE_SIZE_KEY = "hbase.client.keyvalue.maxsize";
+ public static final int MAX_KEYVALUE_SIZE_DEFAULT = -1;
+
private final long writeBufferSize;
-
private final int metaOperationTimeout;
-
private final int operationTimeout;
-
private final int scannerCaching;
-
private final int primaryCallTimeoutMicroSecond;
-
private final int replicaCallTimeoutMicroSecondScan;
-
private final int retries;
-
private final int maxKeyValueSize;
/**
@@ -49,7 +47,7 @@ public class TableConfiguration {
* @param conf Configuration object
*/
TableConfiguration(Configuration conf) {
- this.writeBufferSize = conf.getLong("hbase.client.write.buffer", 2097152);
+ this.writeBufferSize = conf.getLong(WRITE_BUFFER_SIZE_KEY, WRITE_BUFFER_SIZE_DEFAULT);
this.metaOperationTimeout = conf.getInt(
HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT,
@@ -70,7 +68,7 @@ public class TableConfiguration {
this.retries = conf.getInt(
HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
- this.maxKeyValueSize = conf.getInt("hbase.client.keyvalue.maxsize", -1);
+ this.maxKeyValueSize = conf.getInt(MAX_KEYVALUE_SIZE_KEY, MAX_KEYVALUE_SIZE_DEFAULT);
}
/**
@@ -80,14 +78,14 @@ public class TableConfiguration {
*/
@VisibleForTesting
protected TableConfiguration() {
- this.writeBufferSize = 2097152;
+ this.writeBufferSize = WRITE_BUFFER_SIZE_DEFAULT;
this.metaOperationTimeout = HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT;
this.operationTimeout = HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT;
this.scannerCaching = HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING;
this.primaryCallTimeoutMicroSecond = 10000;
this.replicaCallTimeoutMicroSecondScan = 1000000;
this.retries = HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER;
- this.maxKeyValueSize = -1;
+ this.maxKeyValueSize = MAX_KEYVALUE_SIZE_DEFAULT;
}
public long getWriteBufferSize() {
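
The new constants simply name the existing client configuration keys and their defaults. A short sketch of how they resolve from a plain Configuration (assuming the constants stay public as declared above):

    Configuration conf = HBaseConfiguration.create();
    // With neither key set, the constants supply the defaults:
    long writeBuffer = conf.getLong(TableConfiguration.WRITE_BUFFER_SIZE_KEY,
        TableConfiguration.WRITE_BUFFER_SIZE_DEFAULT); // 2097152 bytes (2 MB)
    int maxKeyValueSize = conf.getInt(TableConfiguration.MAX_KEYVALUE_SIZE_KEY,
        TableConfiguration.MAX_KEYVALUE_SIZE_DEFAULT); // -1, i.e. no client-side size check
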
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index 88a95fbfd50..aa41939ce3c 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -155,8 +155,8 @@ public class TestAsyncProcess {
new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf));
}
- public MyAsyncProcess(
- ClusterConnection hc, Configuration conf, boolean useGlobalErrors, boolean dummy) {
+ public MyAsyncProcess(ClusterConnection hc, Configuration conf, boolean useGlobalErrors,
+ @SuppressWarnings("unused") boolean dummy) {
super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), new CountingThreadFactory(new AtomicInteger())) {
@Override
@@ -644,26 +644,27 @@ public class TestAsyncProcess {
NonceGenerator ng = Mockito.mock(NonceGenerator.class);
Mockito.when(ng.getNonceGroup()).thenReturn(HConstants.NO_NONCE);
Mockito.when(hc.getNonceGenerator()).thenReturn(ng);
+ Mockito.when(hc.getConfiguration()).thenReturn(conf);
return hc;
}
@Test
public void testHTablePutSuccess() throws Exception {
- HTable ht = Mockito.mock(HTable.class);
+ BufferedMutatorImpl ht = Mockito.mock(BufferedMutatorImpl.class);
ht.ap = new MyAsyncProcess(createHConnection(), conf, true);
Put put = createPut(1, true);
Assert.assertEquals(0, ht.getWriteBufferSize());
- ht.put(put);
+ ht.mutate(put);
Assert.assertEquals(0, ht.getWriteBufferSize());
}
private void doHTableFailedPut(boolean bufferOn) throws Exception {
- HTable ht = new HTable();
- MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, true);
- ht.ap = ap;
- ht.setAutoFlushTo(true);
+ ClusterConnection conn = createHConnection();
+ HTable ht = new HTable(conn, new BufferedMutatorParams(DUMMY_TABLE));
+ MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true);
+ ht.mutator.ap = ap;
if (bufferOn) {
ht.setWriteBufferSize(1024L * 1024L);
} else {
@@ -672,7 +673,7 @@ public class TestAsyncProcess {
Put put = createPut(1, false);
- Assert.assertEquals(0L, ht.currentWriteBufferSize);
+ Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize);
try {
ht.put(put);
if (bufferOn) {
@@ -681,7 +682,7 @@ public class TestAsyncProcess {
Assert.fail();
} catch (RetriesExhaustedException expected) {
}
- Assert.assertEquals(0L, ht.currentWriteBufferSize);
+ Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize);
// The table should have sent one request, maybe after multiple attempts
AsyncRequestFuture ars = null;
for (AsyncRequestFuture someReqs : ap.allReqs) {
@@ -708,14 +709,14 @@ public class TestAsyncProcess {
@Test
public void testHTableFailedPutAndNewPut() throws Exception {
- HTable ht = new HTable();
- MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, true);
- ht.ap = ap;
- ht.setAutoFlushTo(false);
- ht.setWriteBufferSize(0);
+ ClusterConnection conn = createHConnection();
+ BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, null, null,
+ new BufferedMutatorParams(DUMMY_TABLE).writeBufferSize(0));
+ MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true);
+ mutator.ap = ap;
Put p = createPut(1, false);
- ht.put(p);
+ mutator.mutate(p);
ap.waitUntilDone(); // Let's do all the retries.
@@ -725,13 +726,13 @@ public class TestAsyncProcess {
// puts, we may raise an exception in the middle of the list. It's then up to the caller to
// manage what was inserted, what was tried but failed, and what was not even tried.
p = createPut(1, true);
- Assert.assertEquals(0, ht.writeAsyncBuffer.size());
+ Assert.assertEquals(0, mutator.getWriteBuffer().size());
try {
- ht.put(p);
+ mutator.mutate(p);
Assert.fail();
} catch (RetriesExhaustedException expected) {
}
- Assert.assertEquals("the put should not been inserted.", 0, ht.writeAsyncBuffer.size());
+    Assert.assertEquals("the put should not have been inserted.", 0, mutator.getWriteBuffer().size());
}
@@ -762,9 +763,9 @@ public class TestAsyncProcess {
@Test
public void testBatch() throws IOException, InterruptedException {
- HTable ht = new HTable();
- ht.connection = new MyConnectionImpl(conf);
- ht.multiAp = new MyAsyncProcess(ht.connection, conf, false);
+ ClusterConnection conn = new MyConnectionImpl(conf);
+ HTable ht = new HTable(conn, new BufferedMutatorParams(DUMMY_TABLE));
+ ht.multiAp = new MyAsyncProcess(conn, conf, false);
    List<Put> puts = new ArrayList<Put>();
puts.add(createPut(1, true));
@@ -793,26 +794,24 @@ public class TestAsyncProcess {
@Test
public void testErrorsServers() throws IOException {
- HTable ht = new HTable();
Configuration configuration = new Configuration(conf);
+ ClusterConnection conn = new MyConnectionImpl(configuration);
+ BufferedMutatorImpl mutator =
+ new BufferedMutatorImpl(conn, null, null, new BufferedMutatorParams(DUMMY_TABLE));
configuration.setBoolean(ConnectionManager.RETRIES_BY_SERVER_KEY, true);
- // set default writeBufferSize
- ht.setWriteBufferSize(configuration.getLong("hbase.client.write.buffer", 2097152));
- ht.connection = new MyConnectionImpl(configuration);
- MyAsyncProcess ap = new MyAsyncProcess(ht.connection, configuration, true);
- ht.ap = ap;
+ MyAsyncProcess ap = new MyAsyncProcess(conn, configuration, true);
+ mutator.ap = ap;
- Assert.assertNotNull(ht.ap.createServerErrorTracker());
- Assert.assertTrue(ht.ap.serverTrackerTimeout > 200);
- ht.ap.serverTrackerTimeout = 1;
+ Assert.assertNotNull(mutator.ap.createServerErrorTracker());
+ Assert.assertTrue(mutator.ap.serverTrackerTimeout > 200);
+ mutator.ap.serverTrackerTimeout = 1;
Put p = createPut(1, false);
- ht.setAutoFlushTo(false);
- ht.put(p);
+ mutator.mutate(p);
try {
- ht.flushCommits();
+ mutator.flush();
Assert.fail();
} catch (RetriesExhaustedWithDetailsException expected) {
}
@@ -822,19 +821,18 @@ public class TestAsyncProcess {
@Test
public void testGlobalErrors() throws IOException {
- HTable ht = new HTable();
- ht.connection = new MyConnectionImpl(conf);
- AsyncProcessWithFailure ap = new AsyncProcessWithFailure(ht.connection, conf);
- ht.ap = ap;
+ ClusterConnection conn = new MyConnectionImpl(conf);
+ BufferedMutatorImpl mutator = (BufferedMutatorImpl) conn.getBufferedMutator(DUMMY_TABLE);
+ AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, conf);
+ mutator.ap = ap;
- Assert.assertNotNull(ht.ap.createServerErrorTracker());
+ Assert.assertNotNull(mutator.ap.createServerErrorTracker());
Put p = createPut(1, true);
- ht.setAutoFlushTo(false);
- ht.put(p);
+ mutator.mutate(p);
try {
- ht.flushCommits();
+ mutator.flush();
Assert.fail();
} catch (RetriesExhaustedWithDetailsException expected) {
}
@@ -861,13 +859,12 @@ public class TestAsyncProcess {
gets.add(get);
}
- HTable ht = new HTable();
MyConnectionImpl2 con = new MyConnectionImpl2(hrls);
- ht.connection = con;
+ HTable ht = new HTable(con, new BufferedMutatorParams(DUMMY_TABLE));
MyAsyncProcess ap = new MyAsyncProcess(con, conf, con.nbThreads);
ht.multiAp = ap;
- ht.batch(gets);
+ ht.batch(gets, new Object[gets.size()]);
Assert.assertEquals(ap.nbActions.get(), NB_REGS);
Assert.assertEquals("1 multi response per server", 2, ap.nbMultiResponse.get());
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java
index 43e0b756a73..d155fd7a0af 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java
@@ -711,36 +711,47 @@ public class TestClientNoCluster extends Configured implements Tool {
* @throws IOException
*/
static void cycle(int id, final Configuration c, final Connection sharedConnection) throws IOException {
- Table table = sharedConnection.getTable(TableName.valueOf(BIG_USER_TABLE));
- table.setAutoFlushTo(false);
long namespaceSpan = c.getLong("hbase.test.namespace.span", 1000000);
long startTime = System.currentTimeMillis();
final int printInterval = 100000;
Random rd = new Random(id);
boolean get = c.getBoolean("hbase.test.do.gets", false);
- try {
- Stopwatch stopWatch = new Stopwatch();
- stopWatch.start();
- for (int i = 0; i < namespaceSpan; i++) {
- byte [] b = format(rd.nextLong());
- if (get){
+ TableName tableName = TableName.valueOf(BIG_USER_TABLE);
+ if (get) {
+ try (Table table = sharedConnection.getTable(tableName)){
+ Stopwatch stopWatch = new Stopwatch();
+ stopWatch.start();
+ for (int i = 0; i < namespaceSpan; i++) {
+ byte [] b = format(rd.nextLong());
Get g = new Get(b);
table.get(g);
- } else {
+ if (i % printInterval == 0) {
+ LOG.info("Get " + printInterval + "/" + stopWatch.elapsedMillis());
+ stopWatch.reset();
+ stopWatch.start();
+ }
+ }
+        LOG.info("Finished a cycle getting " + namespaceSpan + " in " +
+ (System.currentTimeMillis() - startTime) + "ms");
+ }
+ } else {
+ try (BufferedMutator mutator = sharedConnection.getBufferedMutator(tableName)) {
+ Stopwatch stopWatch = new Stopwatch();
+ stopWatch.start();
+ for (int i = 0; i < namespaceSpan; i++) {
+ byte [] b = format(rd.nextLong());
Put p = new Put(b);
p.add(HConstants.CATALOG_FAMILY, b, b);
- table.put(p);
+ mutator.mutate(p);
+ if (i % printInterval == 0) {
+ LOG.info("Put " + printInterval + "/" + stopWatch.elapsedMillis());
+ stopWatch.reset();
+ stopWatch.start();
+ }
}
- if (i % printInterval == 0) {
- LOG.info("Put " + printInterval + "/" + stopWatch.elapsedMillis());
- stopWatch.reset();
- stopWatch.start();
+ LOG.info("Finished a cycle putting " + namespaceSpan + " in " +
+ (System.currentTimeMillis() - startTime) + "ms");
}
- }
- LOG.info("Finished a cycle putting " + namespaceSpan + " in " +
- (System.currentTimeMillis() - startTime) + "ms");
- } finally {
- table.close();
}
}
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/BufferedMutatorExample.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/BufferedMutatorExample.java
new file mode 100644
index 00000000000..ab96741de51
--- /dev/null
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/BufferedMutatorExample.java
@@ -0,0 +1,119 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.example;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.BufferedMutatorParams;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * An example of using the {@link BufferedMutator} interface.
+ */
+public class BufferedMutatorExample extends Configured implements Tool {
+
+ private static final Log LOG = LogFactory.getLog(BufferedMutatorExample.class);
+
+ private static final int POOL_SIZE = 10;
+ private static final int TASK_COUNT = 100;
+ private static final TableName TABLE = TableName.valueOf("foo");
+ private static final byte[] FAMILY = Bytes.toBytes("f");
+
+ @Override
+ public int run(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
+
+ /** a callback invoked when an asynchronous write fails. */
+ final BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() {
+ @Override
+ public void onException(RetriesExhaustedWithDetailsException e, BufferedMutator mutator) {
+ for (int i = 0; i < e.getNumExceptions(); i++) {
+          LOG.info("Failed to send put " + e.getRow(i) + ".");
+ }
+ }
+ };
+ BufferedMutatorParams params = new BufferedMutatorParams(TABLE)
+ .listener(listener);
+
+ //
+ // step 1: create a single Connection and a BufferedMutator, shared by all worker threads.
+ //
+ try (final Connection conn = ConnectionFactory.createConnection(getConf());
+ final BufferedMutator mutator = conn.getBufferedMutator(params)) {
+
+      /** worker pool that operates on BufferedMutator instances */
+ final ExecutorService workerPool = Executors.newFixedThreadPool(POOL_SIZE);
+      List<Future<Void>> futures = new ArrayList<>(TASK_COUNT);
+
+ for (int i = 0; i < TASK_COUNT; i++) {
+        futures.add(workerPool.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ //
+ // step 2: each worker sends edits to the shared BufferedMutator instance. They all use
+ // the same backing buffer, call-back "listener", and RPC executor pool.
+ //
+ Put p = new Put(Bytes.toBytes("someRow"));
+ p.add(FAMILY, Bytes.toBytes("someQualifier"), Bytes.toBytes("some value"));
+ mutator.mutate(p);
+ // do work... maybe you want to call mutator.flush() after many edits to ensure any of
+ // this worker's edits are sent before exiting the Callable
+ return null;
+ }
+ }));
+ }
+
+ //
+ // step 3: clean up the worker pool, shut down.
+ //
+      for (Future<Void> f : futures) {
+ f.get(5, TimeUnit.MINUTES);
+ }
+ workerPool.shutdown();
+ } catch (IOException e) {
+ // exception while creating/destroying Connection or BufferedMutator
+ LOG.info("exception while creating/destroying Connection or BufferedMutator", e);
+    } // BufferedMutator.close() ensures all work is flushed. The custom listener may be
+      // invoked from here as well.
+ return 0;
+ }
+
+ public static void main(String[] args) throws Exception {
+ ToolRunner.run(new BufferedMutatorExample(), args);
+ }
+}
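
Once built, the example can presumably be launched through the hbase script, e.g. "hbase org.apache.hadoop.hbase.client.example.BufferedMutatorExample"; note that it assumes a table named "foo" with column family "f" already exists, since it does not create one.
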
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java
index 931fba42b23..29bb2bbc4fa 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java
@@ -18,7 +18,18 @@
package org.apache.hadoop.hbase.test;
-import com.google.common.collect.Sets;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
@@ -40,6 +51,8 @@ import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
@@ -87,17 +100,7 @@ import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Random;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicInteger;
+import com.google.common.collect.Sets;
/**
* This is an integration test borrowed from goraci, written by Keith Turner,
@@ -340,7 +343,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
byte[] id;
long count = 0;
int i;
- Table table;
+ BufferedMutator mutator;
Connection connection;
long numNodes;
long wrap;
@@ -363,14 +366,14 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
}
protected void instantiateHTable() throws IOException {
- table = connection.getTable(getTableName(connection.getConfiguration()));
- table.setAutoFlushTo(false);
- table.setWriteBufferSize(4 * 1024 * 1024);
+ mutator = connection.getBufferedMutator(
+ new BufferedMutatorParams(getTableName(connection.getConfiguration()))
+ .writeBufferSize(4 * 1024 * 1024));
}
@Override
protected void cleanup(Context context) throws IOException ,InterruptedException {
- table.close();
+ mutator.close();
connection.close();
}
@@ -421,7 +424,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
if (id != null) {
put.add(FAMILY_NAME, COLUMN_CLIENT, id);
}
- table.put(put);
+ mutator.mutate(put);
if (i % 1000 == 0) {
// Tickle progress every so often else maprunner will think us hung
@@ -429,7 +432,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
}
}
- table.flushCommits();
+ mutator.flush();
}
}
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedListWithVisibility.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedListWithVisibility.java
index 50c638a496f..e136ac8b48a 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedListWithVisibility.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedListWithVisibility.java
@@ -38,13 +38,11 @@ import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.chaos.factories.MonkeyFactory;
import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
@@ -187,7 +185,6 @@ public class IntegrationTestBigLinkedListWithVisibility extends IntegrationTestB
protected void instantiateHTable() throws IOException {
for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
Table table = connection.getTable(getTableName(i));
- table.setAutoFlushTo(true);
//table.setWriteBufferSize(4 * 1024 * 1024);
this.tables[i] = table;
}
@@ -233,9 +230,6 @@ public class IntegrationTestBigLinkedListWithVisibility extends IntegrationTestB
output.progress();
}
}
- for (int j = 0; j < DEFAULT_TABLES_COUNT; j++) {
- tables[j].flushCommits();
- }
}
}
}
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadAndVerify.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadAndVerify.java
index 6e10ba9e11f..c92393ff07b 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadAndVerify.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadAndVerify.java
@@ -42,12 +42,11 @@ import org.apache.hadoop.hbase.IntegrationTestBase;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
@@ -113,8 +112,6 @@ public class IntegrationTestLoadAndVerify extends IntegrationTestBase {
private static final int SCANNER_CACHING = 500;
- protected IntegrationTestingUtility util;
-
private String toRun = null;
private enum Counters {
@@ -168,7 +165,7 @@ public void cleanUpCluster() throws Exception {
{
protected long recordsToWrite;
protected Connection connection;
- protected Table table;
+ protected BufferedMutator mutator;
protected Configuration conf;
protected int numBackReferencesPerRow;
@@ -184,9 +181,9 @@ public void cleanUpCluster() throws Exception {
String tableName = conf.get(TABLE_NAME_KEY, TABLE_NAME_DEFAULT);
numBackReferencesPerRow = conf.getInt(NUM_BACKREFS_KEY, NUM_BACKREFS_DEFAULT);
this.connection = ConnectionFactory.createConnection(conf);
- table = connection.getTable(TableName.valueOf(tableName));
- table.setWriteBufferSize(4*1024*1024);
- table.setAutoFlushTo(false);
+ mutator = connection.getBufferedMutator(
+ new BufferedMutatorParams(TableName.valueOf(tableName))
+ .writeBufferSize(4 * 1024 * 1024));
String taskId = conf.get("mapreduce.task.attempt.id");
Matcher matcher = Pattern.compile(".+_m_(\\d+_\\d+)").matcher(taskId);
@@ -201,8 +198,7 @@ public void cleanUpCluster() throws Exception {
@Override
public void cleanup(Context context) throws IOException {
- table.flushCommits();
- table.close();
+ mutator.close();
connection.close();
}
@@ -235,7 +231,7 @@ public void cleanUpCluster() throws Exception {
refsWritten.increment(1);
}
rowsWritten.increment(1);
- table.put(p);
+ mutator.mutate(p);
if (i % 100 == 0) {
context.setStatus("Written " + i + "/" + recordsToWrite + " records");
@@ -244,7 +240,7 @@ public void cleanUpCluster() throws Exception {
}
// End of block, flush all of them before we start writing anything
// pointing to these!
- table.flushCommits();
+ mutator.flush();
}
}
}
@@ -320,7 +316,7 @@ public void cleanUpCluster() throws Exception {
NMapInputFormat.setNumMapTasks(conf, conf.getInt(NUM_MAP_TASKS_KEY, NUM_MAP_TASKS_DEFAULT));
conf.set(TABLE_NAME_KEY, htd.getTableName().getNameAsString());
- Job job = new Job(conf);
+ Job job = Job.getInstance(conf);
job.setJobName(TEST_NAME + " Load for " + htd.getTableName());
job.setJarByClass(this.getClass());
setMapperClass(job);
@@ -344,7 +340,7 @@ public void cleanUpCluster() throws Exception {
protected void doVerify(Configuration conf, HTableDescriptor htd) throws Exception {
Path outputDir = getTestDir(TEST_NAME, "verify-output");
- Job job = new Job(conf);
+ Job job = Job.getInstance(conf);
job.setJarByClass(this.getClass());
job.setJobName(TEST_NAME + " Verification for " + htd.getTableName());
setJobScannerConf(job);
@@ -398,7 +394,7 @@ public void cleanUpCluster() throws Exception {
// Only disable and drop if we succeeded to verify - otherwise it's useful
// to leave it around for post-mortem
- getTestingUtil(getConf()).deleteTable(htd.getName());
+ getTestingUtil(getConf()).deleteTable(htd.getTableName());
}
public void usage() {
@@ -454,15 +450,17 @@ public void cleanUpCluster() throws Exception {
HTableDescriptor htd = new HTableDescriptor(table);
htd.addFamily(new HColumnDescriptor(TEST_FAMILY));
- Admin admin = new HBaseAdmin(getConf());
- if (doLoad) {
- admin.createTable(htd, Bytes.toBytes(0L), Bytes.toBytes(-1L), numPresplits);
- doLoad(getConf(), htd);
+ try (Connection conn = ConnectionFactory.createConnection(getConf());
+ Admin admin = conn.getAdmin()) {
+ if (doLoad) {
+ admin.createTable(htd, Bytes.toBytes(0L), Bytes.toBytes(-1L), numPresplits);
+ doLoad(getConf(), htd);
+ }
}
if (doVerify) {
doVerify(getConf(), htd);
if (doDelete) {
- getTestingUtil(getConf()).deleteTable(htd.getName());
+ getTestingUtil(getConf()).deleteTable(htd.getTableName());
}
}
return 0;
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestWithCellVisibilityLoadAndVerify.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestWithCellVisibilityLoadAndVerify.java
index 96743c8bc64..05e214b17f9 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestWithCellVisibilityLoadAndVerify.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestWithCellVisibilityLoadAndVerify.java
@@ -176,7 +176,7 @@ public class IntegrationTestWithCellVisibilityLoadAndVerify extends IntegrationT
p.add(TEST_FAMILY, TEST_QUALIFIER, HConstants.EMPTY_BYTE_ARRAY);
p.setCellVisibility(new CellVisibility(exp));
getCounter(expIdx).increment(1);
- table.put(p);
+ mutator.mutate(p);
if (i % 100 == 0) {
context.setStatus("Written " + i + "/" + recordsToWrite + " records");
@@ -185,7 +185,7 @@ public class IntegrationTestWithCellVisibilityLoadAndVerify extends IntegrationT
}
// End of block, flush all of them before we start writing anything
// pointing to these!
- table.flushCommits();
+ mutator.flush();
}
}
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/trace/IntegrationTestSendTraceRequests.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/trace/IntegrationTestSendTraceRequests.java
index 2b0b1d60056..3fa8a9cba0d 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/trace/IntegrationTestSendTraceRequests.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/trace/IntegrationTestSendTraceRequests.java
@@ -25,8 +25,8 @@ import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@@ -234,12 +234,11 @@ public class IntegrationTestSendTraceRequests extends AbstractHBaseTool {
  private LinkedBlockingQueue<Long> insertData() throws IOException, InterruptedException {
    LinkedBlockingQueue<Long> rowKeys = new LinkedBlockingQueue<Long>(25000);
- Table ht = util.getConnection().getTable(this.tableName);
+ BufferedMutator ht = util.getConnection().getBufferedMutator(this.tableName);
byte[] value = new byte[300];
for (int x = 0; x < 5000; x++) {
TraceScope traceScope = Trace.startSpan("insertData", Sampler.ALWAYS);
try {
- ht.setAutoFlushTo(false);
for (int i = 0; i < 5; i++) {
long rk = random.nextLong();
rowKeys.add(rk);
@@ -248,7 +247,7 @@ public class IntegrationTestSendTraceRequests extends AbstractHBaseTool {
random.nextBytes(value);
p.add(familyName, Bytes.toBytes(random.nextLong()), value);
}
- ht.put(p);
+ ht.mutate(p);
}
if ((x % 1000) == 0) {
admin.flush(tableName);
diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/RowResource.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/RowResource.java
index 583644200b7..dad5a32d3e3 100644
--- a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/RowResource.java
+++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/RowResource.java
@@ -227,7 +227,6 @@ public class RowResource extends ResourceBase {
}
table = servlet.getTable(tableResource.getName());
table.put(puts);
- table.flushCommits();
ResponseBuilder response = Response.ok();
servlet.getMetrics().incrementSucessfulPutRequests(1);
return response.build();
@@ -489,7 +488,6 @@ public class RowResource extends ResourceBase {
.type(MIMETYPE_TEXT).entity("Value not Modified" + CRLF)
.build();
}
- table.flushCommits();
ResponseBuilder response = Response.ok();
servlet.getMetrics().incrementSucessfulPutRequests(1);
return response.build();
@@ -580,7 +578,6 @@ public class RowResource extends ResourceBase {
.type(MIMETYPE_TEXT).entity(" Delete check failed." + CRLF)
.build();
}
- table.flushCommits();
ResponseBuilder response = Response.ok();
servlet.getMetrics().incrementSucessfulDeleteRequests(1);
return response.build();
diff --git a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/PerformanceEvaluation.java b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/PerformanceEvaluation.java
index b02f069561e..e91f873bf17 100644
--- a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/PerformanceEvaluation.java
+++ b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/PerformanceEvaluation.java
@@ -49,15 +49,16 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.Filter;
@@ -137,7 +138,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
private int presplitRegions = 0;
private boolean useTags = false;
private int noOfTags = 1;
- private HConnection connection;
+ private Connection connection;
private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");
/**
@@ -501,7 +502,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
value.getRows(), value.getTotalRows(),
value.isFlushCommits(), value.isWriteToWAL(),
value.isUseTags(), value.getNoOfTags(),
- HConnectionManager.createConnection(context.getConfiguration()), status);
+ ConnectionFactory.createConnection(context.getConfiguration()), status);
// Collect how much time the thing took. Report as map output and
// to the ELAPSED_TIME counter.
context.getCounter(Counter.ELAPSED_TIME).increment(elapsedTime);
@@ -609,7 +610,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
final int preSplitRegions = this.presplitRegions;
final boolean useTags = this.useTags;
final int numTags = this.noOfTags;
- final HConnection connection = HConnectionManager.createConnection(getConf());
+ final Connection connection = ConnectionFactory.createConnection(getConf());
for (int i = 0; i < this.N; i++) {
final int index = i;
Thread t = new Thread ("TestClient-" + i) {
@@ -684,7 +685,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
Path inputDir = writeInputFile(conf);
conf.set(EvaluationMapTask.CMD_KEY, cmd.getName());
conf.set(EvaluationMapTask.PE_KEY, getClass().getName());
- Job job = new Job(conf);
+ Job job = Job.getInstance(conf);
job.setJarByClass(PerformanceEvaluation.class);
job.setJobName("HBase Performance Evaluation");
@@ -790,14 +791,14 @@ public class PerformanceEvaluation extends Configured implements Tool {
private boolean writeToWAL = true;
private boolean useTags = false;
private int noOfTags = 0;
- private HConnection connection;
+ private Connection connection;
TestOptions() {
}
TestOptions(int startRow, int perClientRunRows, int totalRows, int numClientThreads,
TableName tableName, boolean flushCommits, boolean writeToWAL, boolean useTags,
- int noOfTags, HConnection connection) {
+ int noOfTags, Connection connection) {
this.startRow = startRow;
this.perClientRunRows = perClientRunRows;
this.totalRows = totalRows;
@@ -838,7 +839,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
return writeToWAL;
}
- public HConnection getConnection() {
+ public Connection getConnection() {
return connection;
}
@@ -870,13 +871,11 @@ public class PerformanceEvaluation extends Configured implements Tool {
protected final int totalRows;
private final Status status;
protected TableName tableName;
- protected HTableInterface table;
protected volatile Configuration conf;
- protected boolean flushCommits;
protected boolean writeToWAL;
protected boolean useTags;
protected int noOfTags;
- protected HConnection connection;
+ protected Connection connection;
/**
* Note that all subclasses of this class must provide a public contructor
@@ -889,9 +888,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
this.totalRows = options.getTotalRows();
this.status = status;
this.tableName = options.getTableName();
- this.table = null;
this.conf = conf;
- this.flushCommits = options.isFlushCommits();
this.writeToWAL = options.isWriteToWAL();
this.useTags = options.isUseTags();
this.noOfTags = options.getNumTags();
@@ -907,18 +904,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
return period == 0? this.perClientRunRows: period;
}
- void testSetup() throws IOException {
- this.table = connection.getTable(tableName);
- this.table.setAutoFlushTo(false);
- }
-
- void testTakedown() throws IOException {
- if (flushCommits) {
- this.table.flushCommits();
- }
- table.close();
- }
-
+ abstract void testTakedown() throws IOException;
/*
* Run test
* @return Elapsed time.
@@ -936,6 +922,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
return (System.nanoTime() - startTime) / 1000000;
}
+ abstract void testSetup() throws IOException;
+
/**
* Provides an extension point for tests that don't want a per row invocation.
*/
@@ -957,8 +945,45 @@ public class PerformanceEvaluation extends Configured implements Tool {
abstract void testRow(final int i) throws IOException;
}
- @SuppressWarnings("unused")
- static class RandomSeekScanTest extends Test {
+ static abstract class TableTest extends Test {
+ protected Table table;
+
+ public TableTest(Configuration conf, TestOptions options, Status status) {
+ super(conf, options, status);
+ }
+
+ void testSetup() throws IOException {
+ this.table = connection.getTable(tableName);
+ }
+
+ @Override
+ void testTakedown() throws IOException {
+ table.close();
+ }
+ }
+
+ static abstract class BufferedMutatorTest extends Test {
+ protected BufferedMutator mutator;
+ protected boolean flushCommits;
+
+ public BufferedMutatorTest(Configuration conf, TestOptions options, Status status) {
+ super(conf, options, status);
+ this.flushCommits = options.isFlushCommits();
+ }
+
+ void testSetup() throws IOException {
+ this.mutator = connection.getBufferedMutator(tableName);
+ }
+
+ void testTakedown() throws IOException {
+ if (flushCommits) {
+ this.mutator.flush();
+ }
+ mutator.close();
+ }
+ }
+
+ static class RandomSeekScanTest extends TableTest {
RandomSeekScanTest(Configuration conf, TestOptions options, Status status) {
super(conf, options, status);
}
@@ -981,7 +1006,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
@SuppressWarnings("unused")
- static abstract class RandomScanWithRangeTest extends Test {
+ static abstract class RandomScanWithRangeTest extends TableTest {
RandomScanWithRangeTest(Configuration conf, TestOptions options, Status status) {
super(conf, options, status);
}
@@ -1065,7 +1090,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
}
- static class RandomReadTest extends Test {
+ static class RandomReadTest extends TableTest {
RandomReadTest(Configuration conf, TestOptions options, Status status) {
super(conf, options, status);
}
@@ -1085,7 +1110,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
- static class RandomWriteTest extends Test {
+ static class RandomWriteTest extends BufferedMutatorTest {
RandomWriteTest(Configuration conf, TestOptions options, Status status) {
super(conf, options, status);
}
@@ -1109,11 +1134,11 @@ public class PerformanceEvaluation extends Configured implements Tool {
put.add(FAMILY_NAME, QUALIFIER_NAME, value);
}
put.setDurability(writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
- table.put(put);
+ mutator.mutate(put);
}
}
- static class ScanTest extends Test {
+ static class ScanTest extends TableTest {
private ResultScanner testScanner;
ScanTest(Configuration conf, TestOptions options, Status status) {
@@ -1141,7 +1166,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
- static class SequentialReadTest extends Test {
+ static class SequentialReadTest extends TableTest {
SequentialReadTest(Configuration conf, TestOptions options, Status status) {
super(conf, options, status);
}
@@ -1155,7 +1180,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
- static class SequentialWriteTest extends Test {
+ static class SequentialWriteTest extends BufferedMutatorTest {
SequentialWriteTest(Configuration conf, TestOptions options, Status status) {
super(conf, options, status);
@@ -1180,11 +1205,11 @@ public class PerformanceEvaluation extends Configured implements Tool {
put.add(FAMILY_NAME, QUALIFIER_NAME, value);
}
put.setDurability(writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
- table.put(put);
+ mutator.mutate(put);
}
}
- static class FilteredScanTest extends Test {
+ static class FilteredScanTest extends TableTest {
protected static final Log LOG = LogFactory.getLog(FilteredScanTest.class.getName());
FilteredScanTest(Configuration conf, TestOptions options, Status status) {
@@ -1268,7 +1293,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
long runOneClient(final Class<? extends Test> cmd, final int startRow,
final int perClientRunRows, final int totalRows,
boolean flushCommits, boolean writeToWAL, boolean useTags, int noOfTags,
- HConnection connection, final Status status)
+ Connection connection, final Status status)
throws IOException {
status.setStatus("Start " + cmd + " at offset " + startRow + " for " +
perClientRunRows + " rows");
@@ -1463,7 +1488,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
continue;
}
- this.connection = HConnectionManager.createConnection(getConf());
+ this.connection = ConnectionFactory.createConnection(getConf());
final String useTags = "--usetags=";
if (cmd.startsWith(useTags)) {
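The hbase-rest PerformanceEvaluation changes above swap HConnection/HConnectionManager for Connection/ConnectionFactory and keep one Connection shared across client threads, with short-lived Table or BufferedMutator instances per task. A rough sketch of that sharing pattern, assuming a hypothetical table name and thread count (neither comes from the patch):

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class SharedConnectionSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // ConnectionFactory.createConnection() replaces HConnectionManager.createConnection().
    final Connection connection = ConnectionFactory.createConnection(conf);
    final TableName tableName = TableName.valueOf("perf_demo"); // placeholder name
    ExecutorService pool = Executors.newFixedThreadPool(4);
    for (int i = 0; i < 4; i++) {
      pool.submit(new Runnable() {
        @Override
        public void run() {
          // Table instances are lightweight and not thread-safe, so each task
          // gets its own and closes it; the heavyweight Connection is shared.
          try (Table table = connection.getTable(tableName)) {
            table.get(new Get(Bytes.toBytes("row-0")));
          } catch (IOException e) {
            throw new RuntimeException(e);
          }
        }
      });
    }
    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.MINUTES);
    connection.close();
  }
}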
diff --git a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/TestRemoteTable.java b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/TestRemoteTable.java
index eb1fc98ddd6..297162bfdf9 100644
--- a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/TestRemoteTable.java
+++ b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/TestRemoteTable.java
@@ -19,10 +19,10 @@
package org.apache.hadoop.hbase.rest.client;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertFalse;
import java.io.IOException;
import java.util.ArrayList;
@@ -40,7 +40,6 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@@ -99,9 +98,7 @@ public class TestRemoteTable {
htd.addFamily(new HColumnDescriptor(COLUMN_2).setMaxVersions(3));
htd.addFamily(new HColumnDescriptor(COLUMN_3).setMaxVersions(3));
admin.createTable(htd);
- Table table = null;
- try {
- table = TEST_UTIL.getConnection().getTable(TABLE);
+ try (Table table = TEST_UTIL.getConnection().getTable(TABLE)) {
Put put = new Put(ROW_1);
put.add(COLUMN_1, QUALIFIER_1, TS_2, VALUE_1);
table.put(put);
@@ -110,9 +107,6 @@ public class TestRemoteTable {
put.add(COLUMN_1, QUALIFIER_1, TS_2, VALUE_2);
put.add(COLUMN_2, QUALIFIER_2, TS_2, VALUE_2);
table.put(put);
- table.flushCommits();
- } finally {
- if (null != table) table.close();
}
remoteTable = new RemoteHTable(
new Client(new Cluster().add("localhost",
@@ -349,7 +343,7 @@ public class TestRemoteTable {
assertTrue(Bytes.equals(VALUE_2, value2));
Delete delete = new Delete(ROW_3);
- delete.deleteColumn(COLUMN_2, QUALIFIER_2);
+ delete.addColumn(COLUMN_2, QUALIFIER_2);
remoteTable.delete(delete);
get = new Get(ROW_3);
@@ -464,7 +458,7 @@ public class TestRemoteTable {
assertTrue(Bytes.equals(VALUE_1, value1));
assertNull(value2);
assertTrue(remoteTable.exists(get));
- assertEquals(1, remoteTable.exists(Collections.singletonList(get)).length);
+ assertEquals(1, remoteTable.existsAll(Collections.singletonList(get)).length);
Delete delete = new Delete(ROW_1);
remoteTable.checkAndDelete(ROW_1, COLUMN_1, QUALIFIER_1, VALUE_1, delete);
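The TestRemoteTable hunks above also pick up two client API renames that recur later in the patch: Delete.deleteColumn (and deleteFamily) become addColumn/addFamily, and exists(List) becomes existsAll(List). A small sketch of the renamed calls, with placeholder row, family, and qualifier names:

import java.io.IOException;
import java.util.Collections;

import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

class DeleteAndExistsSketch {
  // 'table' is any open Table; the row, family, and qualifier names are placeholders.
  static void run(Table table) throws IOException {
    Delete delete = new Delete(Bytes.toBytes("row-1"));
    delete.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q")); // was deleteColumn(...)
    delete.addFamily(Bytes.toBytes("cf2"));                    // was deleteFamily(...)
    table.delete(delete);

    // exists(List<Get>) becomes existsAll(List<Get>): one boolean per Get.
    boolean[] found = table.existsAll(Collections.singletonList(new Get(Bytes.toBytes("row-1"))));
    System.out.println("row-1 present: " + found[0]);
  }
}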
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java
index 848bd56720a..c16b4c3be73 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java
@@ -60,8 +60,7 @@ import com.google.protobuf.ServiceException;
@InterfaceStability.Stable
public final class HTableWrapper implements HTableInterface {
- private TableName tableName;
- private final Table table;
+ private final HTableInterface table;
private ClusterConnection connection;
private final List openTables;
@@ -78,7 +77,6 @@ public final class HTableWrapper implements HTableInterface {
private HTableWrapper(List openTables, TableName tableName,
ClusterConnection connection, ExecutorService pool)
throws IOException {
- this.tableName = tableName;
this.table = connection.getTable(tableName, pool);
this.connection = connection;
this.openTables = openTables;
@@ -244,7 +242,7 @@ public final class HTableWrapper implements HTableInterface {
@Override
public byte[] getTableName() {
- return tableName.getName();
+ return table.getTableName();
}
@Override
@@ -320,7 +318,7 @@ public final class HTableWrapper implements HTableInterface {
@Override
public void setAutoFlush(boolean autoFlush) {
- table.setAutoFlushTo(autoFlush);
+ table.setAutoFlush(autoFlush);
}
@Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java
index 563b1f8cf30..6e0d9e70e08 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java
@@ -25,10 +25,10 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.InvalidJobConfException;
@@ -52,22 +52,22 @@ public class TableOutputFormat extends FileOutputFormat<ImmutableBytesWritable, Put> {
- private Table m_table;
+ private BufferedMutator m_mutator;
/**
* Instantiate a TableRecordWriter with the HBase HClient for writing. Assumes control over the
* lifecycle of {@code conn}.
*/
- public TableRecordWriter(final Table table) throws IOException {
- this.m_table = table;
+ public TableRecordWriter(final BufferedMutator mutator) throws IOException {
+ this.m_mutator = mutator;
}
public void close(Reporter reporter) throws IOException {
- this.m_table.close();
+ this.m_mutator.close();
}
public void write(ImmutableBytesWritable key, Put value) throws IOException {
- m_table.put(new Put(value));
+ m_mutator.mutate(new Put(value));
}
}
@@ -77,13 +77,12 @@ public class TableOutputFormat extends FileOutputFormat {
private static final Log LOG = LogFactory.getLog(MultiTableRecordWriter.class);
Connection connection;
- Map<ImmutableBytesWritable, HTable> tables;
+ Map<ImmutableBytesWritable, BufferedMutator> mutatorMap = new HashMap<>();
Configuration conf;
boolean useWriteAheadLogging;
@@ -91,7 +91,6 @@ public class MultiTableOutputFormat extends OutputFormat();
this.conf = conf;
this.useWriteAheadLogging = useWriteAheadLogging;
}
@@ -99,28 +98,28 @@ public class MultiTableOutputFormat extends OutputFormat {
private Connection connection;
- private Table table;
+ private BufferedMutator mutator;
/**
* @throws IOException
@@ -94,8 +94,7 @@ implements Configurable {
public TableRecordWriter() throws IOException {
String tableName = conf.get(OUTPUT_TABLE);
this.connection = ConnectionFactory.createConnection(conf);
- this.table = connection.getTable(TableName.valueOf(tableName));
- this.table.setAutoFlushTo(false);
+ this.mutator = connection.getBufferedMutator(TableName.valueOf(tableName));
LOG.info("Created table instance for " + tableName);
}
/**
@@ -103,12 +102,12 @@ implements Configurable {
*
* @param context The context.
* @throws IOException When closing the writer fails.
- * @see org.apache.hadoop.mapreduce.RecordWriter#close(org.apache.hadoop.mapreduce.TaskAttemptContext)
+ * @see RecordWriter#close(TaskAttemptContext)
*/
@Override
public void close(TaskAttemptContext context)
throws IOException {
- table.close();
+ mutator.close();
connection.close();
}
@@ -118,14 +117,15 @@ implements Configurable {
* @param key The key.
* @param value The value.
* @throws IOException When writing fails.
- * @see org.apache.hadoop.mapreduce.RecordWriter#write(java.lang.Object, java.lang.Object)
+ * @see RecordWriter#write(Object, Object)
*/
@Override
public void write(KEY key, Mutation value)
throws IOException {
- if (value instanceof Put) table.put(new Put((Put)value));
- else if (value instanceof Delete) table.delete(new Delete((Delete)value));
- else throw new IOException("Pass a Delete or a Put");
+ if (!(value instanceof Put) && !(value instanceof Delete)) {
+ throw new IOException("Pass a Delete or a Put");
+ }
+ mutator.mutate(value);
}
}
@@ -136,11 +136,9 @@ implements Configurable {
* @return The newly created writer instance.
* @throws IOException When creating the writer fails.
* @throws InterruptedException When the jobs is cancelled.
- * @see org.apache.hadoop.mapreduce.lib.output.FileOutputFormat#getRecordWriter(org.apache.hadoop.mapreduce.TaskAttemptContext)
*/
@Override
- public RecordWriter<KEY, Mutation> getRecordWriter(
- TaskAttemptContext context)
+ public RecordWriter<KEY, Mutation> getRecordWriter(TaskAttemptContext context)
throws IOException, InterruptedException {
return new TableRecordWriter();
}
@@ -151,7 +149,7 @@ implements Configurable {
* @param context The current context.
* @throws IOException When the check fails.
* @throws InterruptedException When the job is aborted.
- * @see org.apache.hadoop.mapreduce.OutputFormat#checkOutputSpecs(org.apache.hadoop.mapreduce.JobContext)
+ * @see OutputFormat#checkOutputSpecs(JobContext)
*/
@Override
public void checkOutputSpecs(JobContext context) throws IOException,
@@ -167,7 +165,7 @@ implements Configurable {
* @return The committer.
* @throws IOException When creating the committer fails.
* @throws InterruptedException When the job is aborted.
- * @see org.apache.hadoop.mapreduce.OutputFormat#getOutputCommitter(org.apache.hadoop.mapreduce.TaskAttemptContext)
+ * @see OutputFormat#getOutputCommitter(TaskAttemptContext)
*/
@Override
public OutputCommitter getOutputCommitter(TaskAttemptContext context)
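Both the mapred and mapreduce record writers above now wrap a BufferedMutator instead of a Table with auto-flush disabled. The simplified stand-in below shows the shape of that wrapper; the class and method names are illustrative, not the classes in the patch.

import java.io.IOException;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;

class MutatorBackedWriter {
  private final Connection connection;
  private final BufferedMutator mutator;

  MutatorBackedWriter(Connection connection, TableName table) throws IOException {
    this.connection = connection;
    this.mutator = connection.getBufferedMutator(table);
  }

  void write(Mutation value) throws IOException {
    // Mirrors the patched write(): only Put and Delete are legal payloads.
    if (!(value instanceof Put) && !(value instanceof Delete)) {
      throw new IOException("Pass a Delete or a Put");
    }
    mutator.mutate(value);
  }

  void close() throws IOException {
    mutator.close();     // flushes any buffered mutations
    connection.close();
  }
}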
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
index aadc2f99853..2e7afa52cc8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
@@ -49,13 +49,12 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@@ -392,6 +391,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
throws IOException, InterruptedException {
final Class<? extends Test> cmd = determineCommandClass(opts.cmdName);
assert cmd != null;
+ @SuppressWarnings("unchecked")
Future<RunResult>[] threads = new Future[opts.numClientThreads];
RunResult[] results = new RunResult[opts.numClientThreads];
ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads,
@@ -457,7 +457,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
Path inputDir = writeInputFile(conf, opts);
conf.set(EvaluationMapTask.CMD_KEY, cmd.getName());
conf.set(EvaluationMapTask.PE_KEY, PerformanceEvaluation.class.getName());
- Job job = new Job(conf);
+ Job job = Job.getInstance(conf);
job.setJarByClass(PerformanceEvaluation.class);
job.setJobName("HBase Performance Evaluation - " + opts.cmdName);
@@ -940,7 +940,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
private final Sampler<?> traceSampler;
private final SpanReceiverHost receiverHost;
protected Connection connection;
- protected Table table;
+// protected Table table;
private String testName;
private Histogram latency;
@@ -1022,25 +1022,25 @@ public class PerformanceEvaluation extends Configured implements Tool {
if (!opts.oneCon) {
this.connection = ConnectionFactory.createConnection(conf);
}
- this.table = connection.getTable(TableName.valueOf(opts.tableName));
- this.table.setAutoFlushTo(opts.autoFlush);
+ onStartup();
latency = YammerHistogramUtils.newHistogram(new UniformSample(1024 * 500));
valueSize = YammerHistogramUtils.newHistogram(new UniformSample(1024 * 500));
}
+ abstract void onStartup() throws IOException;
+
void testTakedown() throws IOException {
reportLatency();
reportValueSize();
- if (opts.flushCommits) {
- this.table.flushCommits();
- }
- table.close();
+ onTakedown();
if (!opts.oneCon) {
connection.close();
}
receiverHost.closeReceivers();
}
+ abstract void onTakedown() throws IOException;
+
/*
* Run test
* @return Elapsed time.
@@ -1136,7 +1136,43 @@ public class PerformanceEvaluation extends Configured implements Tool {
abstract void testRow(final int i) throws IOException, InterruptedException;
}
- static class RandomSeekScanTest extends Test {
+ static abstract class TableTest extends Test {
+ protected Table table;
+
+ TableTest(Connection con, TestOptions options, Status status) {
+ super(con, options, status);
+ }
+
+ @Override
+ void onStartup() throws IOException {
+ this.table = connection.getTable(TableName.valueOf(opts.tableName));
+ }
+
+ @Override
+ void onTakedown() throws IOException {
+ table.close();
+ }
+ }
+
+ static abstract class BufferedMutatorTest extends Test {
+ protected BufferedMutator mutator;
+
+ BufferedMutatorTest(Connection con, TestOptions options, Status status) {
+ super(con, options, status);
+ }
+
+ @Override
+ void onStartup() throws IOException {
+ this.mutator = connection.getBufferedMutator(TableName.valueOf(opts.tableName));
+ }
+
+ @Override
+ void onTakedown() throws IOException {
+ mutator.close();
+ }
+ }
+
+ static class RandomSeekScanTest extends TableTest {
RandomSeekScanTest(Connection con, TestOptions options, Status status) {
super(con, options, status);
}
@@ -1166,7 +1202,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
- static abstract class RandomScanWithRangeTest extends Test {
+ static abstract class RandomScanWithRangeTest extends TableTest {
RandomScanWithRangeTest(Connection con, TestOptions options, Status status) {
super(con, options, status);
}
@@ -1254,7 +1290,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
}
- static class RandomReadTest extends Test {
+ static class RandomReadTest extends TableTest {
private final Consistency consistency;
private ArrayList<Get> gets;
private Random rd = new Random();
@@ -1308,7 +1344,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
}
- static class RandomWriteTest extends Test {
+ static class RandomWriteTest extends BufferedMutatorTest {
RandomWriteTest(Connection con, TestOptions options, Status status) {
super(con, options, status);
}
@@ -1334,11 +1370,11 @@ public class PerformanceEvaluation extends Configured implements Tool {
updateValueSize(value.length);
}
put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
- table.put(put);
+ mutator.mutate(put);
}
}
- static class ScanTest extends Test {
+ static class ScanTest extends TableTest {
private ResultScanner testScanner;
ScanTest(Connection con, TestOptions options, Status status) {
@@ -1371,7 +1407,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
- static class SequentialReadTest extends Test {
+ static class SequentialReadTest extends TableTest {
SequentialReadTest(Connection con, TestOptions options, Status status) {
super(con, options, status);
}
@@ -1387,7 +1423,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
}
- static class SequentialWriteTest extends Test {
+ static class SequentialWriteTest extends BufferedMutatorTest {
SequentialWriteTest(Connection con, TestOptions options, Status status) {
super(con, options, status);
}
@@ -1413,11 +1449,11 @@ public class PerformanceEvaluation extends Configured implements Tool {
updateValueSize(value.length);
}
put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
- table.put(put);
+ mutator.mutate(put);
}
}
- static class FilteredScanTest extends Test {
+ static class FilteredScanTest extends TableTest {
protected static final Log LOG = LogFactory.getLog(FilteredScanTest.class.getName());
FilteredScanTest(Connection con, TestOptions options, Status status) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMultiVersions.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMultiVersions.java
index 711d5925567..278973efa75 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMultiVersions.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMultiVersions.java
@@ -24,19 +24,17 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
-import java.util.Map;
+import java.util.ArrayList;
+import java.util.List;
import java.util.NavigableMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestCase.FlushCache;
import org.apache.hadoop.hbase.HBaseTestCase.HTableIncommon;
import org.apache.hadoop.hbase.HBaseTestCase.Incommon;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@@ -46,7 +44,6 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
-import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -220,14 +217,16 @@ public class TestMultiVersions {
}
}
// Insert data
+ List<Put> puts = new ArrayList<>();
for (int i = 0; i < startKeys.length; i++) {
for (int j = 0; j < timestamp.length; j++) {
Put put = new Put(rows[i], timestamp[j]);
put.add(HConstants.CATALOG_FAMILY, null, timestamp[j],
Bytes.toBytes(timestamp[j]));
- table.put(put);
+ puts.add(put);
}
}
+ table.put(puts);
// There are 5 cases we have to test. Each is described below.
for (int i = 0; i < rows.length; i++) {
for (int j = 0; j < timestamp.length; j++) {
@@ -241,7 +240,6 @@ public class TestMultiVersions {
}
assertTrue(cellCount == 1);
}
- table.flushCommits();
}
// Case 1: scan with LATEST_TIMESTAMP. Should get two rows
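TestMultiVersions now collects its Puts and issues a single multi-row table.put(List) instead of one put per row followed by flushCommits(), since the Table obtained from a Connection no longer buffers on the client. A minimal sketch of that batching, with placeholder family, qualifier, and row values:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

class BatchedPutSketch {
  // 'table' is any open Table; row keys and the qualifier are placeholders.
  static void load(Table table, byte[] family) throws IOException {
    List<Put> puts = new ArrayList<>();
    for (int i = 0; i < 100; i++) {
      Put put = new Put(Bytes.toBytes("row-" + i));
      put.add(family, Bytes.toBytes("q"), Bytes.toBytes(i));
      puts.add(put);
    }
    // One multi-put replaces 100 single puts plus the old flushCommits() call.
    table.put(puts);
  }
}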
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java
index 31f390797bf..82f62e44cfb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java
@@ -135,7 +135,7 @@ public class TestClientPushback {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicLong endTime = new AtomicLong();
long startTime = EnvironmentEdgeManager.currentTime();
- table.ap.submit(tablename, ops, true, new Batch.Callback<Result>() {
+ table.mutator.ap.submit(tablename, ops, true, new Batch.Callback<Result>() {
@Override
public void update(byte[] region, byte[] row, Result result) {
endTime.set(EnvironmentEdgeManager.currentTime());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCloneSnapshotFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCloneSnapshotFromClient.java
index e017bccd33d..2cb2cfc1529 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCloneSnapshotFromClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCloneSnapshotFromClient.java
@@ -100,31 +100,30 @@ public class TestCloneSnapshotFromClient {
// take an empty snapshot
admin.snapshot(emptySnapshot, tableName);
- Table table = TEST_UTIL.getConnection().getTable(tableName);
- try {
- // enable table and insert data
- admin.enableTable(tableName);
- SnapshotTestingUtils.loadData(TEST_UTIL, table, 500, FAMILY);
+ // enable table and insert data
+ admin.enableTable(tableName);
+ SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, FAMILY);
+ try (Table table = TEST_UTIL.getConnection().getTable(tableName)){
snapshot0Rows = TEST_UTIL.countRows(table);
- admin.disableTable(tableName);
-
- // take a snapshot
- admin.snapshot(snapshotName0, tableName);
-
- // enable table and insert more data
- admin.enableTable(tableName);
- SnapshotTestingUtils.loadData(TEST_UTIL, table, 500, FAMILY);
- snapshot1Rows = TEST_UTIL.countRows(table);
- admin.disableTable(tableName);
-
- // take a snapshot of the updated table
- admin.snapshot(snapshotName1, tableName);
-
- // re-enable table
- admin.enableTable(tableName);
- } finally {
- table.close();
}
+ admin.disableTable(tableName);
+
+ // take a snapshot
+ admin.snapshot(snapshotName0, tableName);
+
+ // enable table and insert more data
+ admin.enableTable(tableName);
+ SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, FAMILY);
+ try (Table table = TEST_UTIL.getConnection().getTable(tableName)){
+ snapshot1Rows = TEST_UTIL.countRows(table);
+ }
+ admin.disableTable(tableName);
+
+ // take a snapshot of the updated table
+ admin.snapshot(snapshotName1, tableName);
+
+ // re-enable table
+ admin.enableTable(tableName);
}
protected int getNumReplicas() {
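The snapshot tests above switch to try-with-resources so each Table is closed even when loading or counting throws. A small sketch of the same idiom, using a stand-in for TEST_UTIL.countRows (the counting logic here is illustrative, not the utility's implementation):

import java.io.IOException;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;

class CountRowsSketch {
  static long countRows(Connection connection, TableName tableName) throws IOException {
    // Both the Table and the scanner close automatically, even on error,
    // which is what the try-with-resources rewrite buys the tests.
    long rows = 0;
    try (Table table = connection.getTable(tableName);
         ResultScanner scanner = table.getScanner(new Scan())) {
      for (Result r : scanner) {
        rows++;
      }
    }
    return rows;
  }
}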
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
index 0ebafaf432b..c77ab2978a8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
@@ -689,15 +689,15 @@ public class TestFromClientSide {
public void testMaxKeyValueSize() throws Exception {
byte [] TABLE = Bytes.toBytes("testMaxKeyValueSize");
Configuration conf = TEST_UTIL.getConfiguration();
- String oldMaxSize = conf.get("hbase.client.keyvalue.maxsize");
+ String oldMaxSize = conf.get(TableConfiguration.MAX_KEYVALUE_SIZE_KEY);
Table ht = TEST_UTIL.createTable(TABLE, FAMILY);
byte[] value = new byte[4 * 1024 * 1024];
Put put = new Put(ROW);
put.add(FAMILY, QUALIFIER, value);
ht.put(put);
try {
- TEST_UTIL.getConfiguration().setInt("hbase.client.keyvalue.maxsize", 2 * 1024 * 1024);
- TABLE = Bytes.toBytes("testMaxKeyValueSize2");
+ TEST_UTIL.getConfiguration().setInt(
+ TableConfiguration.MAX_KEYVALUE_SIZE_KEY, 2 * 1024 * 1024);
// Create new table so we pick up the change in Configuration.
try (Connection connection =
ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())) {
@@ -709,7 +709,7 @@ public class TestFromClientSide {
}
fail("Inserting a too large KeyValue worked, should throw exception");
} catch(Exception e) {}
- conf.set("hbase.client.keyvalue.maxsize", oldMaxSize);
+ conf.set(TableConfiguration.MAX_KEYVALUE_SIZE_KEY, oldMaxSize);
}
@Test
@@ -3903,7 +3903,7 @@ public class TestFromClientSide {
final int NB_BATCH_ROWS = 10;
HTable table = TEST_UTIL.createTable(Bytes.toBytes("testRowsPutBufferedOneFlush"),
new byte [][] {CONTENTS_FAMILY, SMALL_FAMILY});
- table.setAutoFlushTo(false);
+ table.setAutoFlush(false);
ArrayList<Put> rowsUpdate = new ArrayList<Put>();
for (int i = 0; i < NB_BATCH_ROWS * 10; i++) {
byte[] row = Bytes.toBytes("row" + i);
@@ -3934,6 +3934,7 @@ public class TestFromClientSide {
Result row : scanner)
nbRows++;
assertEquals(NB_BATCH_ROWS * 10, nbRows);
+ table.close();
}
@Test
@@ -3944,7 +3945,6 @@ public class TestFromClientSide {
final int NB_BATCH_ROWS = 10;
HTable table = TEST_UTIL.createTable(Bytes.toBytes("testRowsPutBufferedManyManyFlushes"),
new byte[][] {CONTENTS_FAMILY, SMALL_FAMILY });
- table.setAutoFlushTo(false);
table.setWriteBufferSize(10);
ArrayList<Put> rowsUpdate = new ArrayList<Put>();
for (int i = 0; i < NB_BATCH_ROWS * 10; i++) {
@@ -3956,8 +3956,6 @@ public class TestFromClientSide {
}
table.put(rowsUpdate);
- table.flushCommits();
-
Scan scan = new Scan();
scan.addFamily(CONTENTS_FAMILY);
ResultScanner scanner = table.getScanner(scan);
@@ -4146,6 +4144,7 @@ public class TestFromClientSide {
HBaseAdmin ha = new HBaseAdmin(t.getConnection());
assertTrue(ha.tableExists(tableName));
assertTrue(t.get(new Get(ROW)).isEmpty());
+ ha.close();
}
/**
@@ -4159,9 +4158,10 @@ public class TestFromClientSide {
final TableName tableName = TableName.valueOf("testUnmanagedHConnectionReconnect");
HTable t = createUnmangedHConnectionHTable(tableName);
Connection conn = t.getConnection();
- HBaseAdmin ha = new HBaseAdmin(conn);
- assertTrue(ha.tableExists(tableName));
- assertTrue(t.get(new Get(ROW)).isEmpty());
+ try (HBaseAdmin ha = new HBaseAdmin(conn)) {
+ assertTrue(ha.tableExists(tableName));
+ assertTrue(t.get(new Get(ROW)).isEmpty());
+ }
// stop the master
MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
@@ -4174,9 +4174,10 @@ public class TestFromClientSide {
// test that the same unmanaged connection works with a new
// HBaseAdmin and can connect to the new master;
- HBaseAdmin newAdmin = new HBaseAdmin(conn);
- assertTrue(newAdmin.tableExists(tableName));
- assertTrue(newAdmin.getClusterStatus().getServersSize() == SLAVES + 1);
+ try (HBaseAdmin newAdmin = new HBaseAdmin(conn)) {
+ assertTrue(newAdmin.tableExists(tableName));
+ assertTrue(newAdmin.getClusterStatus().getServersSize() == SLAVES + 1);
+ }
}
@Test
@@ -4273,7 +4274,6 @@ public class TestFromClientSide {
new byte[][] { HConstants.CATALOG_FAMILY, Bytes.toBytes("info2") }, 1, 1024);
// set block size to 64 to making 2 kvs into one block, bypassing the walkForwardInSingleRow
// in Store.rowAtOrBeforeFromStoreFile
- table.setAutoFlushTo(true);
String regionName = table.getRegionLocations().firstKey().getEncodedName();
HRegion region =
TEST_UTIL.getRSForFirstRegionInTable(tableAname).getFromOnlineRegions(regionName);
@@ -4348,6 +4348,8 @@ public class TestFromClientSide {
assertTrue(result.containsColumn(HConstants.CATALOG_FAMILY, null));
assertTrue(Bytes.equals(result.getRow(), forthRow));
assertTrue(Bytes.equals(result.getValue(HConstants.CATALOG_FAMILY, null), four));
+
+ table.close();
}
/**
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
index bab78ab2052..abea6997e3d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
@@ -149,7 +149,7 @@ public class TestMultiParallel {
ThreadPoolExecutor executor = HTable.getDefaultExecutor(UTIL.getConfiguration());
try {
try (Table t = connection.getTable(TEST_TABLE, executor)) {
- List<Row> puts = constructPutRequests(); // creates a Put for every region
+ List<Put> puts = constructPutRequests(); // creates a Put for every region
t.batch(puts);
HashSet regionservers = new HashSet();
try (RegionLocator locator = connection.getRegionLocator(TEST_TABLE)) {
@@ -172,7 +172,7 @@ public class TestMultiParallel {
Table table = UTIL.getConnection().getTable(TEST_TABLE);
// load test data
- List<Row> puts = constructPutRequests();
+ List<Put> puts = constructPutRequests();
table.batch(puts);
// create a list of gets and run it
@@ -262,16 +262,12 @@ public class TestMultiParallel {
// Load the data
LOG.info("get new table");
Table table = UTIL.getConnection().getTable(TEST_TABLE);
- table.setAutoFlushTo(false);
table.setWriteBufferSize(10 * 1024 * 1024);
LOG.info("constructPutRequests");
- List<Row> puts = constructPutRequests();
- for (Row put : puts) {
- table.put((Put) put);
- }
+ List<Put> puts = constructPutRequests();
+ table.put(puts);
LOG.info("puts");
- table.flushCommits();
final int liveRScount = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads()
.size();
assert liveRScount > 0;
@@ -290,11 +286,7 @@ public class TestMultiParallel {
// try putting more keys after the abort. same key/qual... just validating
// no exceptions thrown
puts = constructPutRequests();
- for (Row put : puts) {
- table.put((Put) put);
- }
-
- table.flushCommits();
+ table.put(puts);
}
LOG.info("validating loaded data");
@@ -332,7 +324,7 @@ public class TestMultiParallel {
LOG.info("test=testBatchWithPut");
Table table = CONNECTION.getTable(TEST_TABLE);
// put multiple rows using a batch
- List<Row> puts = constructPutRequests();
+ List<Put> puts = constructPutRequests();
Object[] results = table.batch(puts);
validateSizeAndEmpty(results, KEYS.length);
@@ -364,7 +356,7 @@ public class TestMultiParallel {
Table table = UTIL.getConnection().getTable(TEST_TABLE);
// Load some data
- List<Row> puts = constructPutRequests();
+ List<Put> puts = constructPutRequests();
Object[] results = table.batch(puts);
validateSizeAndEmpty(results, KEYS.length);
@@ -372,7 +364,7 @@ public class TestMultiParallel {
List<Delete> deletes = new ArrayList<Delete>();
for (int i = 0; i < KEYS.length; i++) {
Delete delete = new Delete(KEYS[i]);
- delete.deleteFamily(BYTES_FAMILY);
+ delete.addFamily(BYTES_FAMILY);
deletes.add(delete);
}
results = table.batch(deletes);
@@ -393,7 +385,7 @@ public class TestMultiParallel {
Table table = UTIL.getConnection().getTable(TEST_TABLE);
// Load some data
- List<Row> puts = constructPutRequests();
+ List<Put> puts = constructPutRequests();
Object[] results = table.batch(puts);
validateSizeAndEmpty(results, KEYS.length);
@@ -665,8 +657,8 @@ public class TestMultiParallel {
}
}
- private List<Row> constructPutRequests() {
- List<Row> puts = new ArrayList<Row>();
+ private List<Put> constructPutRequests() {
+ List<Put> puts = new ArrayList<>();
for (byte[] k : KEYS) {
Put put = new Put(k);
put.add(BYTES_FAMILY, QUALIFIER, VALUE);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRestoreSnapshotFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRestoreSnapshotFromClient.java
index a5dce028bca..c5e6449945c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRestoreSnapshotFromClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRestoreSnapshotFromClient.java
@@ -111,11 +111,12 @@ public class TestRestoreSnapshotFromClient {
// take an empty snapshot
admin.snapshot(emptySnapshot, tableName);
- Table table = TEST_UTIL.getConnection().getTable(tableName);
// enable table and insert data
admin.enableTable(tableName);
- SnapshotTestingUtils.loadData(TEST_UTIL, table, 500, FAMILY);
- snapshot0Rows = TEST_UTIL.countRows(table);
+ SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, FAMILY);
+ try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
+ snapshot0Rows = TEST_UTIL.countRows(table);
+ }
admin.disableTable(tableName);
// take a snapshot
@@ -123,9 +124,10 @@ public class TestRestoreSnapshotFromClient {
// enable table and insert more data
admin.enableTable(tableName);
- SnapshotTestingUtils.loadData(TEST_UTIL, table, 500, FAMILY);
- snapshot1Rows = TEST_UTIL.countRows(table);
- table.close();
+ SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, FAMILY);
+ try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
+ snapshot1Rows = TEST_UTIL.countRows(table);
+ }
}
@After
@@ -184,7 +186,7 @@ public class TestRestoreSnapshotFromClient {
assertEquals(2, table.getTableDescriptor().getFamilies().size());
HTableDescriptor htd = admin.getTableDescriptor(tableName);
assertEquals(2, htd.getFamilies().size());
- SnapshotTestingUtils.loadData(TEST_UTIL, table, 500, TEST_FAMILY2);
+ SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, TEST_FAMILY2);
long snapshot2Rows = snapshot1Rows + 500;
assertEquals(snapshot2Rows, TEST_UTIL.countRows(table));
assertEquals(500, TEST_UTIL.countRows(table, TEST_FAMILY2));
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java
index 018bdc4044d..dcf26f2a1a3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java
@@ -133,17 +133,16 @@ public class TestRpcControllerFactory {
Connection connection = ConnectionFactory.createConnection(conf);
Table table = connection.getTable(name);
- table.setAutoFlushTo(false);
byte[] row = Bytes.toBytes("row");
Put p = new Put(row);
p.add(fam1, fam1, Bytes.toBytes("val0"));
table.put(p);
- table.flushCommits();
+
Integer counter = 1;
counter = verifyCount(counter);
Delete d = new Delete(row);
- d.deleteColumn(fam1, fam1);
+ d.addColumn(fam1, fam1);
table.delete(d);
counter = verifyCount(counter);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestHTableWrapper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestHTableWrapper.java
index 4649961b176..317707a6861 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestHTableWrapper.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestHTableWrapper.java
@@ -176,11 +176,11 @@ public class TestHTableWrapper {
private void checkAutoFlush() {
boolean initialAutoFlush = hTableInterface.isAutoFlush();
- hTableInterface.setAutoFlushTo(false);
+ hTableInterface.setAutoFlush(false);
assertFalse(hTableInterface.isAutoFlush());
- hTableInterface.setAutoFlushTo(true);
+ hTableInterface.setAutoFlush(true);
assertTrue(hTableInterface.isAutoFlush());
- hTableInterface.setAutoFlushTo(initialAutoFlush);
+ hTableInterface.setAutoFlush(initialAutoFlush);
}
private void checkBufferSize() throws IOException {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
index f56811e9841..f2376318fbb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
@@ -937,7 +937,6 @@ public class TestDistributedLogSplitting {
if (key == null || key.length == 0) {
key = new byte[] { 0, 0, 0, 0, 1 };
}
- ht.setAutoFlushTo(true);
Put put = new Put(key);
put.add(Bytes.toBytes("family"), Bytes.toBytes("c1"), new byte[]{'b'});
ht.put(put);
@@ -1629,11 +1628,11 @@ public class TestDistributedLogSplitting {
/**
* Load table with puts and deletes with expected values so that we can verify later
*/
- private void prepareData(final HTable t, final byte[] f, final byte[] column) throws IOException {
- t.setAutoFlushTo(false);
+ private void prepareData(final Table t, final byte[] f, final byte[] column) throws IOException {
byte[] k = new byte[3];
// add puts
+ List<Put> puts = new ArrayList<>();
for (byte b1 = 'a'; b1 <= 'z'; b1++) {
for (byte b2 = 'a'; b2 <= 'z'; b2++) {
for (byte b3 = 'a'; b3 <= 'z'; b3++) {
@@ -1642,11 +1641,11 @@ public class TestDistributedLogSplitting {
k[2] = b3;
Put put = new Put(k);
put.add(f, column, k);
- t.put(put);
+ puts.add(put);
}
}
}
- t.flushCommits();
+ t.put(puts);
// add deletes
for (byte b3 = 'a'; b3 <= 'z'; b3++) {
k[0] = 'a';
@@ -1655,7 +1654,6 @@ public class TestDistributedLogSplitting {
Delete del = new Delete(k);
t.delete(del);
}
- t.flushCommits();
}
private void waitForCounter(AtomicLong ctr, long oldval, long newval,
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java
index d2f1eab40a3..80287569467 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java
@@ -83,11 +83,11 @@ public class TestMaster {
MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
HMaster m = cluster.getMaster();
- HTable ht = TEST_UTIL.createTable(TABLENAME, FAMILYNAME);
- assertTrue(m.assignmentManager.getTableStateManager().isTableState(TABLENAME,
- TableState.State.ENABLED));
- TEST_UTIL.loadTable(ht, FAMILYNAME, false);
- ht.close();
+ try (HTable ht = TEST_UTIL.createTable(TABLENAME, FAMILYNAME)) {
+ assertTrue(m.assignmentManager.getTableStateManager().isTableState(TABLENAME,
+ TableState.State.ENABLED));
+ TEST_UTIL.loadTable(ht, FAMILYNAME, false);
+ }
List<Pair<HRegionInfo, ServerName>> tableRegions = MetaTableAccessor.getTableRegionsAndLocations(
m.getConnection(), TABLENAME);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java
index 4a6ba4141c7..96b43420487 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java
@@ -99,9 +99,9 @@ public class TestEndToEndSplitTransaction {
TableName tableName =
TableName.valueOf("TestSplit");
byte[] familyName = Bytes.toBytes("fam");
- HTable ht = TEST_UTIL.createTable(tableName, familyName);
- TEST_UTIL.loadTable(ht, familyName, false);
- ht.close();
+ try (HTable ht = TEST_UTIL.createTable(tableName, familyName)) {
+ TEST_UTIL.loadTable(ht, familyName, false);
+ }
HRegionServer server = TEST_UTIL.getHBaseCluster().getRegionServer(0);
byte []firstRow = Bytes.toBytes("aaa");
byte []splitRow = Bytes.toBytes("lll");
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java
index c9608c9d2ee..b1c3692d6d2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java
@@ -202,23 +202,22 @@ public class TestFSErrorsExposed {
util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
// Make a new Configuration so it makes a new connection that has the
// above configuration on it; else we use the old one w/ 10 as default.
- Table table = util.getConnection().getTable(tableName);
-
- // Load some data
- util.loadTable(table, fam, false);
- table.flushCommits();
- util.flush();
- util.countRows(table);
-
- // Kill the DFS cluster
- util.getDFSCluster().shutdownDataNodes();
-
- try {
+ try (Table table = util.getConnection().getTable(tableName)) {
+ // Load some data
+ util.loadTable(table, fam, false);
+ util.flush();
util.countRows(table);
- fail("Did not fail to count after removing data");
- } catch (Exception e) {
- LOG.info("Got expected error", e);
- assertTrue(e.getMessage().contains("Could not seek"));
+
+ // Kill the DFS cluster
+ util.getDFSCluster().shutdownDataNodes();
+
+ try {
+ util.countRows(table);
+ fail("Did not fail to count after removing data");
+ } catch (Exception e) {
+ LOG.info("Got expected error", e);
+ assertTrue(e.getMessage().contains("Could not seek"));
+ }
}
// Restart data nodes so that HBase can shut down cleanly.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionFavoredNodes.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionFavoredNodes.java
index 8e0bd219c33..8e7fe0488cf 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionFavoredNodes.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionFavoredNodes.java
@@ -78,6 +78,7 @@ public class TestRegionFavoredNodes {
@AfterClass
public static void tearDownAfterClass() throws Exception {
+ table.close();
if (createWithFavoredNode == null) {
return;
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java
index b95b20a21c1..aa071ef6ed4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java
@@ -31,6 +31,7 @@ import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+
import static org.junit.Assert.*;
import java.io.IOException;
@@ -109,10 +110,11 @@ public class TestRegionServerMetrics {
TEST_UTIL.createTable(tName, cfName);
- TEST_UTIL.getConnection().getTable(tName).close(); //wait for the table to come up.
+ Connection connection = TEST_UTIL.getConnection();
+ connection.getTable(tName).close(); //wait for the table to come up.
// Do a first put to be sure that the connection is established, meta is there and so on.
- HTable table = (HTable) TEST_UTIL.getConnection().getTable(tName);
+ Table table = connection.getTable(tName);
Put p = new Put(row);
p.add(cfName, qualifier, initValue);
table.put(p);
@@ -141,19 +143,21 @@ public class TestRegionServerMetrics {
metricsHelper.assertCounter("readRequestCount", readRequests + 10, serverSource);
metricsHelper.assertCounter("writeRequestCount", writeRequests + 30, serverSource);
- for ( HRegionInfo i:table.getRegionLocations().keySet()) {
- MetricsRegionAggregateSource agg = rs.getRegion(i.getRegionName())
- .getMetrics()
- .getSource()
- .getAggregateSource();
- String prefix = "namespace_"+NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR+
- "_table_"+tableNameString +
- "_region_" + i.getEncodedName()+
- "_metric";
- metricsHelper.assertCounter(prefix + "_getNumOps", 10, agg);
- metricsHelper.assertCounter(prefix + "_mutateCount", 31, agg);
+ try (RegionLocator locator = connection.getRegionLocator(tName)) {
+ for ( HRegionLocation location: locator.getAllRegionLocations()) {
+ HRegionInfo i = location.getRegionInfo();
+ MetricsRegionAggregateSource agg = rs.getRegion(i.getRegionName())
+ .getMetrics()
+ .getSource()
+ .getAggregateSource();
+ String prefix = "namespace_"+NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR+
+ "_table_"+tableNameString +
+ "_region_" + i.getEncodedName()+
+ "_metric";
+ metricsHelper.assertCounter(prefix + "_getNumOps", 10, agg);
+ metricsHelper.assertCounter(prefix + "_mutateCount", 31, agg);
+ }
}
-
List<Get> gets = new ArrayList<Get>();
for (int i=0; i< 10; i++) {
gets.add(new Get(row));
@@ -165,11 +169,11 @@ public class TestRegionServerMetrics {
metricsHelper.assertCounter("readRequestCount", readRequests + 20, serverSource);
metricsHelper.assertCounter("writeRequestCount", writeRequests + 30, serverSource);
- table.setAutoFlushTo(false);
+ List<Put> puts = new ArrayList<>();
for (int i=0; i< 30; i++) {
- table.put(p);
+ puts.add(p);
}
- table.flushCommits();
+ table.put(puts);
metricsRegionServer.getRegionServerWrapper().forceRecompute();
metricsHelper.assertCounter("totalRequestCount", requests + 80, serverSource);
@@ -325,35 +329,39 @@ public class TestRegionServerMetrics {
byte[] val = Bytes.toBytes("One");
- HTable t = TEST_UTIL.createTable(tableName, cf);
- t.setAutoFlushTo(false);
+ List<Put> puts = new ArrayList<>();
for (int insertCount =0; insertCount < 100; insertCount++) {
Put p = new Put(Bytes.toBytes("" + insertCount + "row"));
p.add(cf, qualifier, val);
- t.put(p);
+ puts.add(p);
}
- t.flushCommits();
+ try (HTable t = TEST_UTIL.createTable(tableName, cf)) {
+ t.put(puts);
- Scan s = new Scan();
- s.setBatch(1);
- s.setCaching(1);
- ResultScanner resultScanners = t.getScanner(s);
+ Scan s = new Scan();
+ s.setBatch(1);
+ s.setCaching(1);
+ ResultScanner resultScanners = t.getScanner(s);
- for (int nextCount = 0; nextCount < 30; nextCount++) {
- Result result = resultScanners.next();
- assertNotNull(result);
- assertEquals(1, result.size());
+ for (int nextCount = 0; nextCount < 30; nextCount++) {
+ Result result = resultScanners.next();
+ assertNotNull(result);
+ assertEquals(1, result.size());
+ }
}
- for ( HRegionInfo i:t.getRegionLocations().keySet()) {
- MetricsRegionAggregateSource agg = rs.getRegion(i.getRegionName())
- .getMetrics()
- .getSource()
- .getAggregateSource();
- String prefix = "namespace_"+NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR+
- "_table_"+tableNameString +
- "_region_" + i.getEncodedName()+
- "_metric";
- metricsHelper.assertCounter(prefix + "_scanNextNumOps", 30, agg);
+ try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
+ for ( HRegionLocation location: locator.getAllRegionLocations()) {
+ HRegionInfo i = location.getRegionInfo();
+ MetricsRegionAggregateSource agg = rs.getRegion(i.getRegionName())
+ .getMetrics()
+ .getSource()
+ .getAggregateSource();
+ String prefix = "namespace_"+NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR+
+ "_table_"+tableNameString +
+ "_region_" + i.getEncodedName()+
+ "_metric";
+ metricsHelper.assertCounter(prefix + "_scanNextNumOps", 30, agg);
+ }
}
}
}
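TestRegionServerMetrics replaces HTable.getRegionLocations() with a RegionLocator obtained from the Connection. A short sketch of enumerating a table's regions that way; the printed output is illustrative only:

import java.io.IOException;

import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionLocator;

class RegionListingSketch {
  static void printRegions(Connection connection, TableName tableName) throws IOException {
    // RegionLocator replaces HTable.getRegionLocations() for enumerating regions.
    try (RegionLocator locator = connection.getRegionLocator(tableName)) {
      for (HRegionLocation location : locator.getAllRegionLocations()) {
        HRegionInfo info = location.getRegionInfo();
        System.out.println(info.getEncodedName() + " on " + location.getServerName());
      }
    }
  }
}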
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java
index bd5439af83b..86515a6230c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java
@@ -91,7 +91,6 @@ public class TestScannerWithBulkload {
put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes
.toBytes("version3")));
table.put(put0);
- table.flushCommits();
admin.flush(tableName);
scanner = table.getScanner(scan);
result = scanner.next();
@@ -172,19 +171,16 @@ public class TestScannerWithBulkload {
put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes
.toBytes("version0")));
table.put(put0);
- table.flushCommits();
admin.flush(tableName);
Put put1 = new Put(Bytes.toBytes("row2"));
put1.add(new KeyValue(Bytes.toBytes("row2"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes
.toBytes("version0")));
table.put(put1);
- table.flushCommits();
admin.flush(tableName);
put0 = new Put(Bytes.toBytes("row1"));
put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes
.toBytes("version1")));
table.put(put0);
- table.flushCommits();
admin.flush(tableName);
admin.compact(tableName);
@@ -221,7 +217,6 @@ public class TestScannerWithBulkload {
put1.add(new KeyValue(Bytes.toBytes("row5"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
Bytes.toBytes("version0")));
table.put(put1);
- table.flushCommits();
bulkload.doBulkLoad(hfilePath, (HTable) table);
latch.countDown();
} catch (TableNotFoundException e) {
@@ -263,7 +258,6 @@ public class TestScannerWithBulkload {
put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes
.toBytes("version3")));
table.put(put0);
- table.flushCommits();
admin.flush(tableName);
scanner = table.getScanner(scan);
result = scanner.next();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
index 1da3662def5..111acf3a27f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
@@ -312,7 +312,7 @@ public class TestLogRolling {
admin.createTable(desc);
Table table = TEST_UTIL.getConnection().getTable(desc.getTableName());
- assertTrue(table.isAutoFlush());
+ assertTrue(((HTable) table).isAutoFlush());
server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
final FSHLog log = (FSHLog) server.getWAL(null);
@@ -456,8 +456,6 @@ public class TestLogRolling {
writeData(table, 1002);
- table.setAutoFlushTo(true);
-
long curTime = System.currentTimeMillis();
LOG.info("log.getCurrentFileName()): " + DefaultWALProvider.getCurrentFileName(log));
long oldFilenum = DefaultWALProvider.extractFileNumFromWAL(log);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java
index 7ca9fed3809..4bb18420dc8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java
@@ -28,7 +28,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@@ -54,7 +53,6 @@ public class TestReplicationChangingPeerRegionservers extends TestReplicationBas
*/
@Before
public void setUp() throws Exception {
- htable1.setAutoFlushTo(false);
// Starting and stopping replication can make us miss new logs,
// rolling like this makes sure the most recent one gets added to the queue
for (JVMClusterUtil.RegionServerThread r :
@@ -119,7 +117,10 @@ public class TestReplicationChangingPeerRegionservers extends TestReplicationBas
Put put = new Put(row);
put.add(famName, row, row);
- htable1 = utility1.getConnection().getTable(tableName);
+ if (htable1 == null) {
+ htable1 = utility1.getConnection().getTable(tableName);
+ }
+
htable1.put(put);
Get get = new Get(row);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
index bfb01dbf6e7..f0db865bad2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
@@ -39,7 +39,6 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@@ -70,7 +69,6 @@ public class TestReplicationSmallTests extends TestReplicationBase {
*/
@Before
public void setUp() throws Exception {
- htable1.setAutoFlushTo(true);
// Starting and stopping replication can make us miss new logs,
// rolling like this makes sure the most recent one gets added to the queue
for ( JVMClusterUtil.RegionServerThread r :
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithTags.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithTags.java
index adf3c0eb366..f1e956ceffa 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithTags.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithTags.java
@@ -151,7 +151,6 @@ public class TestReplicationWithTags {
admin.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
}
htable1 = utility1.getConnection().getTable(TABLE_NAME);
- htable1.setWriteBufferSize(1024);
htable2 = utility2.getConnection().getTable(TABLE_NAME);
}
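Dropping setWriteBufferSize() here is consistent with the rest of the patch: the Table handle no longer carries a client-side write buffer, so the size setting is meaningless on it. A test that still wants buffered, size-bounded writes would configure a BufferedMutator instead; a hedged sketch, with row/family/qualifier/value as placeholder byte arrays:

// Sketch only: the write buffer now lives on BufferedMutator, not on Table.
BufferedMutatorParams params = new BufferedMutatorParams(TABLE_NAME)
    .writeBufferSize(1024);                              // replaces htable1.setWriteBufferSize(1024)
try (BufferedMutator mutator = utility1.getConnection().getBufferedMutator(params)) {
  mutator.mutate(new Put(row).add(family, qualifier, value));  // buffered client-side
  mutator.flush();                                       // push the buffered edits to the servers
}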
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java
index 9ea64d107b6..c087f4e6522 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java
@@ -399,25 +399,18 @@ public class TestVisibilityLabelsReplication {
}
static Table writeData(TableName tableName, String... labelExps) throws Exception {
- Table table = null;
- try {
- table = TEST_UTIL.getConnection().getTable(TABLE_NAME);
- int i = 1;
- List puts = new ArrayList();
- for (String labelExp : labelExps) {
- Put put = new Put(Bytes.toBytes("row" + i));
- put.add(fam, qual, HConstants.LATEST_TIMESTAMP, value);
- put.setCellVisibility(new CellVisibility(labelExp));
- put.setAttribute(NON_VISIBILITY, Bytes.toBytes(TEMP));
- puts.add(put);
- i++;
- }
- table.put(puts);
- } finally {
- if (table != null) {
- table.flushCommits();
- }
+ Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME);
+ int i = 1;
+ List<Put> puts = new ArrayList<Put>();
+ for (String labelExp : labelExps) {
+ Put put = new Put(Bytes.toBytes("row" + i));
+ put.add(fam, qual, HConstants.LATEST_TIMESTAMP, value);
+ put.setCellVisibility(new CellVisibility(labelExp));
+ put.setAttribute(NON_VISIBILITY, Bytes.toBytes(TEMP));
+ puts.add(put);
+ i++;
}
+ table.put(puts);
return table;
}
// A simple BaseRegionbserver impl that allows to add a non-visibility tag from the
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/SnapshotTestingUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/SnapshotTestingUtils.java
index 74f358d255a..a8588ccc5d7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/SnapshotTestingUtils.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/SnapshotTestingUtils.java
@@ -25,7 +25,6 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.HashSet;
-
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -47,6 +46,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
@@ -673,20 +673,22 @@ public class SnapshotTestingUtils {
public static void loadData(final HBaseTestingUtility util, final TableName tableName, int rows,
byte[]... families) throws IOException, InterruptedException {
- loadData(util, util.getConnection().getTable(tableName), rows, families);
+ BufferedMutator mutator = util.getConnection().getBufferedMutator(tableName);
+ loadData(util, mutator, rows, families);
}
- public static void loadData(final HBaseTestingUtility util, final Table table, int rows,
+ public static void loadData(final HBaseTestingUtility util, final BufferedMutator mutator, int rows,
byte[]... families) throws IOException, InterruptedException {
- table.setAutoFlushTo(false);
-
// Ensure one row per region
assertTrue(rows >= KEYS.length);
for (byte k0: KEYS) {
byte[] k = new byte[] { k0 };
byte[] value = Bytes.add(Bytes.toBytes(System.currentTimeMillis()), k);
byte[] key = Bytes.add(k, Bytes.toBytes(MD5Hash.getMD5AsHex(value)));
- putData(table, families, key, value);
+ mutator.mutate(createPut(families, key, value));
rows--;
}
@@ -694,22 +696,24 @@ public class SnapshotTestingUtils {
while (rows-- > 0) {
byte[] value = Bytes.add(Bytes.toBytes(System.currentTimeMillis()), Bytes.toBytes(rows));
byte[] key = Bytes.toBytes(MD5Hash.getMD5AsHex(value));
- putData(table, families, key, value);
+ mutator.mutate(createPut(families, key, value));
}
- table.flushCommits();
+ mutator.flush();
- waitForTableToBeOnline(util, table.getName());
+ waitForTableToBeOnline(util, mutator.getName());
}
- private static void putData(final Table table, final byte[][] families,
- final byte[] key, final byte[] value) throws IOException {
+ private static Put createPut(final byte[][] families, final byte[] key, final byte[] value) {
byte[] q = Bytes.toBytes("q");
Put put = new Put(key);
put.setDurability(Durability.SKIP_WAL);
for (byte[] family: families) {
put.add(family, q, value);
}
- table.put(put);
+ return put;
}
public static void deleteAllSnapshots(final Admin admin)
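The loadData() rewrite above is the core of the migration in this file: rather than disabling autoflush on a Table and calling flushCommits() at the end, writes go through a BufferedMutator that batches mutations client-side until an explicit flush. A condensed sketch of the same pattern, with names taken from the surrounding code:

// Sketch of the buffered-load pattern used by loadData() above.
try (BufferedMutator mutator = util.getConnection().getBufferedMutator(tableName)) {
  while (rows-- > 0) {
    byte[] value = Bytes.add(Bytes.toBytes(System.currentTimeMillis()), Bytes.toBytes(rows));
    byte[] key = Bytes.toBytes(MD5Hash.getMD5AsHex(value));
    mutator.mutate(createPut(families, key, value));   // buffered, not yet on the servers
  }
  mutator.flush();  // replaces table.flushCommits(): drains the buffer to the region servers
}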
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestFlushSnapshotFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestFlushSnapshotFromClient.java
index 38938714bfc..b45d676249c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestFlushSnapshotFromClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestFlushSnapshotFromClient.java
@@ -31,7 +31,6 @@ import java.util.concurrent.CountDownLatch;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -41,11 +40,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.ScannerCallable;
import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
-import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
@@ -54,7 +49,6 @@ import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.log4j.Level;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -76,7 +70,6 @@ public class TestFlushSnapshotFromClient {
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static final int NUM_RS = 2;
private static final byte[] TEST_FAM = Bytes.toBytes("fam");
- private static final byte[] TEST_QUAL = Bytes.toBytes("q");
private static final TableName TABLE_NAME = TableName.valueOf("test");
private final int DEFAULT_NUM_ROWS = 100;
@@ -145,8 +138,7 @@ public class TestFlushSnapshotFromClient {
SnapshotTestingUtils.assertNoSnapshots(admin);
// put some stuff in the table
- Table table = UTIL.getConnection().getTable(TABLE_NAME);
- SnapshotTestingUtils.loadData(UTIL, table, DEFAULT_NUM_ROWS, TEST_FAM);
+ SnapshotTestingUtils.loadData(UTIL, TABLE_NAME, DEFAULT_NUM_ROWS, TEST_FAM);
LOG.debug("FS state before snapshot:");
FSUtils.logFileSystemState(UTIL.getTestFileSystem(),
@@ -228,8 +220,7 @@ public class TestFlushSnapshotFromClient {
SnapshotTestingUtils.assertNoSnapshots(admin);
// put some stuff in the table
- Table table = UTIL.getConnection().getTable(TABLE_NAME);
- SnapshotTestingUtils.loadData(UTIL, table, DEFAULT_NUM_ROWS, TEST_FAM);
+ SnapshotTestingUtils.loadData(UTIL, TABLE_NAME, DEFAULT_NUM_ROWS, TEST_FAM);
LOG.debug("FS state before snapshot:");
FSUtils.logFileSystemState(UTIL.getTestFileSystem(),
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestRestoreFlushSnapshotFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestRestoreFlushSnapshotFromClient.java
index 4b36c114125..ae1ca136b80 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestRestoreFlushSnapshotFromClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestRestoreFlushSnapshotFromClient.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
@@ -103,8 +102,8 @@ public class TestRestoreFlushSnapshotFromClient {
// create Table and disable it
SnapshotTestingUtils.createTable(UTIL, tableName, FAMILY);
+ SnapshotTestingUtils.loadData(UTIL, tableName, 500, FAMILY);
Table table = UTIL.getConnection().getTable(tableName);
- SnapshotTestingUtils.loadData(UTIL, table, 500, FAMILY);
snapshot0Rows = UTIL.countRows(table);
LOG.info("=== before snapshot with 500 rows");
logFSTree();
@@ -117,7 +116,7 @@ public class TestRestoreFlushSnapshotFromClient {
logFSTree();
// insert more data
- SnapshotTestingUtils.loadData(UTIL, table, 500, FAMILY);
+ SnapshotTestingUtils.loadData(UTIL, tableName, 500, FAMILY);
snapshot1Rows = UTIL.countRows(table);
LOG.info("=== before snapshot with 1000 rows");
logFSTree();
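With the TableName overload, these snapshot tests let loadData() own the BufferedMutator lifecycle and keep a plain Table handle only for verification reads, roughly:

// Sketch only: writes go through the TableName overload, reads through a Table handle.
SnapshotTestingUtils.loadData(UTIL, tableName, 500, FAMILY);   // writes via a BufferedMutator internally
try (Table table = UTIL.getConnection().getTable(tableName)) {
  snapshot0Rows = UTIL.countRows(table);                       // read back what was loaded
}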