From ab18158e6001a7f15a35679ca8fc7ff772f90e25 Mon Sep 17 00:00:00 2001 From: Nick Dimiduk Date: Thu, 22 Jan 2015 13:42:50 -0800 Subject: [PATCH] HBASE-12728 buffered writes substantially less useful after removal of HTablePool (Solomon Duskis and Nick Dimiduk) In our pre-1.0 API, HTable is considered a light-weight object that consumed by a single thread at a time. The HTablePool class provided a means of sharing multiple HTable instances across a number of threads. As an optimization, HTable managed a "write buffer", accumulating edits and sending a "batch" all at once. By default the batch was sent as the last step in invocations of put(Put) and put(List). The user could disable the automatic flushing of the write buffer, retaining edits locally and only sending the whole "batch" once the write buffer has filled or when the flushCommits() method in invoked explicitly. Explicit or implicit batch writing was controlled by the setAutoFlushTo(boolean) method. A value of true (the default) had the write buffer flushed at the completion of a call to put(Put) or put(List). A value of false allowed for explicit buffer management. HTable also exposed the buffer to consumers via getWriteBuffer(). The combination of HTable with setAutoFlushTo(false) and the HTablePool provided a convenient mechanism by which multiple "Put-producing" threads could share a common write buffer. Both HTablePool and HTable are deprecated, and they are officially replaced in The new 1.0 API by Table and BufferedMutator. Table, which replaces HTable, no longer exposes explicit write-buffer management. Instead, explicit buffer management is exposed via BufferedMutator. BufferedMutator is made safe for concurrent use. Where code would previously retrieve and return HTables from an HTablePool, now that code creates and shares a single BufferedMutator instance across all threads. --- .../hadoop/hbase/client/BufferedMutator.java | 129 +++++++++ .../hbase/client/BufferedMutatorImpl.java | 258 ++++++++++++++++++ .../hbase/client/BufferedMutatorParams.java | 110 ++++++++ .../hadoop/hbase/client/Connection.java | 31 +++ .../hbase/client/ConnectionAdapter.java | 11 + .../hbase/client/ConnectionManager.java | 24 +- .../apache/hadoop/hbase/client/HTable.java | 139 +++------- .../hadoop/hbase/client/HTableInterface.java | 64 ++++- .../org/apache/hadoop/hbase/client/Table.java | 32 +-- .../hbase/client/TableConfiguration.java | 20 +- .../hadoop/hbase/client/TestAsyncProcess.java | 87 +++--- .../hbase/client/TestClientNoCluster.java | 49 ++-- .../example/BufferedMutatorExample.java | 119 ++++++++ .../test/IntegrationTestBigLinkedList.java | 41 +-- ...rationTestBigLinkedListWithVisibility.java | 8 +- .../test/IntegrationTestLoadAndVerify.java | 40 ++- ...onTestWithCellVisibilityLoadAndVerify.java | 4 +- .../IntegrationTestSendTraceRequests.java | 7 +- .../apache/hadoop/hbase/rest/RowResource.java | 3 - .../hbase/rest/PerformanceEvaluation.java | 105 ++++--- .../hbase/rest/client/TestRemoteTable.java | 14 +- .../hadoop/hbase/client/HTableWrapper.java | 8 +- .../hbase/mapred/TableOutputFormat.java | 19 +- .../mapreduce/MultiTableOutputFormat.java | 29 +- .../hbase/mapreduce/TableOutputFormat.java | 28 +- .../hadoop/hbase/PerformanceEvaluation.java | 76 ++++-- .../hadoop/hbase/TestMultiVersions.java | 12 +- .../hbase/client/TestClientPushback.java | 2 +- .../client/TestCloneSnapshotFromClient.java | 45 ++- .../hbase/client/TestFromClientSide.java | 32 ++- .../hbase/client/TestMultiParallel.java | 30 +- .../client/TestRestoreSnapshotFromClient.java | 16 +- .../client/TestRpcControllerFactory.java | 5 +- .../hbase/coprocessor/TestHTableWrapper.java | 6 +- .../master/TestDistributedLogSplitting.java | 10 +- .../hadoop/hbase/master/TestMaster.java | 10 +- .../TestEndToEndSplitTransaction.java | 6 +- .../regionserver/TestFSErrorsExposed.java | 31 +-- .../regionserver/TestRegionFavoredNodes.java | 1 + .../regionserver/TestRegionServerMetrics.java | 86 +++--- .../regionserver/TestScannerWithBulkload.java | 6 - .../regionserver/wal/TestLogRolling.java | 4 +- ...tReplicationChangingPeerRegionservers.java | 7 +- .../TestReplicationSmallTests.java | 2 - .../replication/TestReplicationWithTags.java | 1 - .../TestVisibilityLabelsReplication.java | 29 +- .../hbase/snapshot/SnapshotTestingUtils.java | 28 +- .../snapshot/TestFlushSnapshotFromClient.java | 13 +- .../TestRestoreFlushSnapshotFromClient.java | 5 +- 49 files changed, 1256 insertions(+), 586 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java create mode 100644 hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/BufferedMutatorExample.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java new file mode 100644 index 00000000000..3b91078edfc --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java @@ -0,0 +1,129 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; + +/** + *

Used to communicate with a single HBase table similar to {@link Table} but meant for + * batched, asynchronous puts. Obtain an instance from a {@link Connection} and call + * {@link #close()} afterwards. Customizations can be applied to the {@code BufferedMutator} via + * the {@link BufferedMutatorParams}. + *

+ * + *

Exception handling with asynchronously via the {@link BufferedMutator.ExceptionListener}. + * The default implementation is to throw the exception upon receipt. This behavior can be + * overridden with a custom implementation, provided as a parameter with + * {@link BufferedMutatorParams#listener(BufferedMutator.ExceptionListener)}.

+ * + *

Map/Reduce jobs are good use cases for using {@code BufferedMutator}. Map/reduce jobs + * benefit from batching, but have no natural flush point. {@code BufferedMutator} receives the + * puts from the M/R job and will batch puts based on some heuristic, such as the accumulated size + * of the puts, and submit batches of puts asynchronously so that the M/R logic can continue + * without interruption. + *

+ * + *

{@code BufferedMutator} can also be used on more exotic circumstances. Map/Reduce batch jobs + * will have a single {@code BufferedMutator} per thread. A single {@code BufferedMutator} can + * also be effectively used in high volume online systems to batch puts, with the caveat that + * extreme circumstances, such as JVM or machine failure, may cause some data loss.

+ * + *

NOTE: This class replaces the functionality that used to be available via + * {@link HTableInterface#setAutoFlush(boolean)} set to {@code false}. + *

+ * + *

See also the {@code BufferedMutatorExample} in the hbase-examples module.

+ * @see ConnectionFactory + * @see Connection + * @since 1.0.0 + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public interface BufferedMutator extends Closeable { + /** + * Gets the fully qualified table name instance of the table that this BufferedMutator writes to. + */ + TableName getName(); + + /** + * Returns the {@link org.apache.hadoop.conf.Configuration} object used by this instance. + *

+ * The reference returned is not a copy, so any change made to it will + * affect this instance. + */ + Configuration getConfiguration(); + + /** + * Sends a {@link Mutation} to the table. The mutations will be buffered and sent over the + * wire as part of a batch. Currently only supports {@link Put} and {@link Delete} mutations. + * + * @param mutation The data to send. + * @throws IOException if a remote or network exception occurs. + */ + void mutate(Mutation mutation) throws IOException; + + /** + * Send some {@link Mutation}s to the table. The mutations will be buffered and sent over the + * wire as part of a batch. There is no guarantee of sending entire content of {@code mutations} + * in a single batch; it will be broken up according to the write buffer capacity. + * + * @param mutations The data to send. + * @throws IOException if a remote or network exception occurs. + */ + void mutate(List mutations) throws IOException; + + /** + * Performs a {@link #flush()} and releases any resources held. + * + * @throws IOException if a remote or network exception occurs. + */ + @Override + void close() throws IOException; + + /** + * Executes all the buffered, asynchronous {@link Mutation} operations and waits until they + * are done. + * + * @throws IOException if a remote or network exception occurs. + */ + void flush() throws IOException; + + /** + * Returns the maximum size in bytes of the write buffer for this HTable. + *

+ * The default value comes from the configuration parameter {@code hbase.client.write.buffer}. + * @return The size of the write buffer in bytes. + */ + long getWriteBufferSize(); + + /** + * Listens for asynchronous exceptions on a {@link BufferedMutator}. + */ + interface ExceptionListener { + public void onException(RetriesExhaustedWithDetailsException exception, + BufferedMutator mutator) throws RetriesExhaustedWithDetailsException; + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java new file mode 100644 index 00000000000..54e7ccd41a1 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java @@ -0,0 +1,258 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +/** + *

+ * Used to communicate with a single HBase table similar to {@link HTable} + * but meant for batched, potentially asynchronous puts. Obtain an instance from + * a {@link Connection} and call {@link #close()} afterwards. + *

+ * + * @see ConnectionFactory + * @see Connection + * @since 1.0.0 + */ +@InterfaceAudience.Private +@InterfaceStability.Stable +public class BufferedMutatorImpl implements BufferedMutator { + + private static final Log LOG = LogFactory.getLog(BufferedMutatorImpl.class); + + private final ExceptionListener listener; + + protected ClusterConnection connection; // non-final so can be overridden in test + private final TableName tableName; + private volatile Configuration conf; + private List writeAsyncBuffer = new LinkedList<>(); + private long writeBufferSize; + private final int maxKeyValueSize; + protected long currentWriteBufferSize = 0; + private boolean closed = false; + private final ExecutorService pool; + protected AsyncProcess ap; // non-final so can be overridden in test + + BufferedMutatorImpl(ClusterConnection conn, RpcRetryingCallerFactory rpcCallerFactory, + RpcControllerFactory rpcFactory, BufferedMutatorParams params) { + if (conn == null || conn.isClosed()) { + throw new IllegalArgumentException("Connection is null or closed."); + } + + this.tableName = params.getTableName(); + this.connection = conn; + this.conf = connection.getConfiguration(); + this.pool = params.getPool(); + this.listener = params.getListener(); + + TableConfiguration tableConf = new TableConfiguration(conf); + this.writeBufferSize = params.getWriteBufferSize() != BufferedMutatorParams.UNSET ? + params.getWriteBufferSize() : tableConf.getWriteBufferSize(); + this.maxKeyValueSize = params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET ? + params.getMaxKeyValueSize() : tableConf.getMaxKeyValueSize(); + + // puts need to track errors globally due to how the APIs currently work. + ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, rpcFactory); + } + + @Override + public TableName getName() { + return tableName; + } + + @Override + public Configuration getConfiguration() { + return conf; + } + + @Override + public synchronized void mutate(Mutation m) throws InterruptedIOException, + RetriesExhaustedWithDetailsException { + doMutate(m); + } + + @Override + public synchronized void mutate(List ms) throws InterruptedIOException, + RetriesExhaustedWithDetailsException { + for (Mutation m : ms) { + doMutate(m); + } + } + + /** + * Add the put to the buffer. If the buffer is already too large, sends the buffer to the + * cluster. + * + * @throws RetriesExhaustedWithDetailsException if there is an error on the cluster. + * @throws InterruptedIOException if we were interrupted. + */ + private void doMutate(Mutation m) throws InterruptedIOException, + RetriesExhaustedWithDetailsException { + if (closed) { + throw new IllegalStateException("Cannot put when the BufferedMutator is closed."); + } + if (!(m instanceof Put) && !(m instanceof Delete)) { + throw new IllegalArgumentException("Pass a Delete or a Put"); + } + + // This behavior is highly non-intuitive... it does not protect us against + // 94-incompatible behavior, which is a timing issue because hasError, the below code + // and setter of hasError are not synchronized. Perhaps it should be removed. + if (ap.hasError()) { + writeAsyncBuffer.add(m); + backgroundFlushCommits(true); + } + + if (m instanceof Put) { + validatePut((Put) m); + } + + currentWriteBufferSize += m.heapSize(); + writeAsyncBuffer.add(m); + + while (currentWriteBufferSize > writeBufferSize) { + backgroundFlushCommits(false); + } + } + + // validate for well-formedness + public void validatePut(final Put put) throws IllegalArgumentException { + HTable.validatePut(put, maxKeyValueSize); + } + + @Override + public synchronized void close() throws IOException { + if (this.closed) { + return; + } + try { + // As we can have an operation in progress even if the buffer is empty, we call + // backgroundFlushCommits at least one time. + backgroundFlushCommits(true); + this.pool.shutdown(); + boolean terminated = false; + int loopCnt = 0; + do { + // wait until the pool has terminated + terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS); + loopCnt += 1; + if (loopCnt >= 10) { + LOG.warn("close() failed to terminate pool after 10 minutes. Abandoning pool."); + break; + } + } while (!terminated); + } catch (InterruptedException e) { + LOG.warn("waitForTermination interrupted"); + } finally { + this.closed = true; + } + } + + @Override + public synchronized void flush() throws InterruptedIOException, + RetriesExhaustedWithDetailsException { + // As we can have an operation in progress even if the buffer is empty, we call + // backgroundFlushCommits at least one time. + backgroundFlushCommits(true); + } + + /** + * Send the operations in the buffer to the servers. Does not wait for the server's answer. If + * the is an error (max retried reach from a previous flush or bad operation), it tries to send + * all operations in the buffer and sends an exception. + * + * @param synchronous - if true, sends all the writes and wait for all of them to finish before + * returning. + */ + private void backgroundFlushCommits(boolean synchronous) throws InterruptedIOException, + RetriesExhaustedWithDetailsException { + try { + if (!synchronous) { + ap.submit(tableName, writeAsyncBuffer, true, null, false); + if (ap.hasError()) { + LOG.debug(tableName + ": One or more of the operations have failed -" + + " waiting for all operation in progress to finish (successfully or not)"); + } + } + if (synchronous || ap.hasError()) { + while (!writeAsyncBuffer.isEmpty()) { + ap.submit(tableName, writeAsyncBuffer, true, null, false); + } + RetriesExhaustedWithDetailsException error = ap.waitForAllPreviousOpsAndReset(null); + if (error != null) { + if (listener == null) { + throw error; + } else { + this.listener.onException(error, this); + } + } + } + } finally { + currentWriteBufferSize = 0; + for (Row mut : writeAsyncBuffer) { + if (mut instanceof Mutation) { + currentWriteBufferSize += ((Mutation) mut).heapSize(); + } + } + } + } + + /** + * This is used for legacy purposes in {@link HTable#setWriteBufferSize(long)} only. This ought + * not be called for production uses. + * @deprecated Going away when we drop public support for {@link HTableInterface}. + */ + @Deprecated + public void setWriteBufferSize(long writeBufferSize) throws RetriesExhaustedWithDetailsException, + InterruptedIOException { + this.writeBufferSize = writeBufferSize; + if (currentWriteBufferSize > writeBufferSize) { + flush(); + } + } + + /** + * {@inheritDoc} + */ + @Override + public long getWriteBufferSize() { + return this.writeBufferSize; + } + + /** + * This is used for legacy purposes in {@link HTable#getWriteBuffer()} only. This should not beÓ + * called from production uses. + * @deprecated Going away when we drop public support for {@link HTableInterface}. +Ó */ + @Deprecated + public List getWriteBuffer() { + return this.writeAsyncBuffer; + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java new file mode 100644 index 00000000000..d4cdeadd34c --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java @@ -0,0 +1,110 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client; + +import java.util.concurrent.ExecutorService; + +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * Parameters for instantiating a {@link BufferedMutator}. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class BufferedMutatorParams { + + static final int UNSET = -1; + + private final TableName tableName; + private long writeBufferSize = UNSET; + private int maxKeyValueSize = UNSET; + private ExecutorService pool = null; + private BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() { + @Override + public void onException(RetriesExhaustedWithDetailsException exception, + BufferedMutator bufferedMutator) + throws RetriesExhaustedWithDetailsException { + throw exception; + } + }; + + public BufferedMutatorParams(TableName tableName) { + this.tableName = tableName; + } + + public TableName getTableName() { + return tableName; + } + + public long getWriteBufferSize() { + return writeBufferSize; + } + + /** + * Override the write buffer size specified by the provided {@link Connection}'s + * {@link org.apache.hadoop.conf.Configuration} instance, via the configuration key + * {@code hbase.client.write.buffer}. + */ + public BufferedMutatorParams writeBufferSize(long writeBufferSize) { + this.writeBufferSize = writeBufferSize; + return this; + } + + public int getMaxKeyValueSize() { + return maxKeyValueSize; + } + + /** + * Override the maximum key-value size specified by the provided {@link Connection}'s + * {@link org.apache.hadoop.conf.Configuration} instance, via the configuration key + * {@code hbase.client.keyvalue.maxsize}. + */ + public BufferedMutatorParams maxKeyValueSize(int maxKeyValueSize) { + this.maxKeyValueSize = maxKeyValueSize; + return this; + } + + public ExecutorService getPool() { + return pool; + } + + /** + * Override the default executor pool defined by the {@code hbase.htable.threads.*} + * configuration values. + */ + public BufferedMutatorParams pool(ExecutorService pool) { + this.pool = pool; + return this; + } + + public BufferedMutator.ExceptionListener getListener() { + return listener; + } + + /** + * Override the default error handler. Default handler simply rethrows the exception. + */ + public BufferedMutatorParams listener(BufferedMutator.ExceptionListener listener) { + this.listener = listener; + return this; + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java index 55237be5ba5..72f870fb4b3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java @@ -98,6 +98,37 @@ public interface Connection extends Abortable, Closeable { */ Table getTable(TableName tableName, ExecutorService pool) throws IOException; + /** + *

+ * Retrieve a {@link BufferedMutator} for performing client-side buffering of writes. The + * {@link BufferedMutator} returned by this method is thread-safe. This BufferedMutator will + * use the Connection's ExecutorService. This object can be used for long lived operations. + *

+ *

+ * The caller is responsible for calling {@link BufferedMutator#close()} on + * the returned {@link BufferedMutator} instance. + *

+ *

+ * This accessor will use the connection's ExecutorService and will throw an + * exception in the main thread when an asynchronous exception occurs. + * + * @param tableName the name of the table + * + * @return a {@link BufferedMutator} for the supplied tableName. + */ + public BufferedMutator getBufferedMutator(TableName tableName) throws IOException; + + /** + * Retrieve a {@link BufferedMutator} for performing client-side buffering of writes. The + * {@link BufferedMutator} returned by this method is thread-safe. This object can be used for + * long lived table operations. The caller is responsible for calling + * {@link BufferedMutator#close()} on the returned {@link BufferedMutator} instance. + * + * @param params details on how to instantiate the {@code BufferedMutator}. + * @return a {@link BufferedMutator} for the supplied tableName. + */ + public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException; + /** * Retrieve a RegionLocator implementation to inspect region information on a table. The returned * RegionLocator is not thread-safe, so a new instance should be created for each using thread. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java index 53c1271c265..e1458b83d33 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java @@ -109,6 +109,17 @@ abstract class ConnectionAdapter implements ClusterConnection { return wrappedConnection.getTable(tableName, pool); } + @Override + public BufferedMutator getBufferedMutator(BufferedMutatorParams params) + throws IOException { + return wrappedConnection.getBufferedMutator(params); + } + + @Override + public BufferedMutator getBufferedMutator(TableName tableName) throws IOException { + return wrappedConnection.getBufferedMutator(tableName); + } + @Override public RegionLocator getRegionLocator(TableName tableName) throws IOException { return wrappedConnection.getRegionLocator(tableName); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java index 166bcdd1b49..358ef3e127a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java @@ -69,8 +69,8 @@ import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.exceptions.RegionOpeningException; -import org.apache.hadoop.hbase.ipc.RpcClientFactory; import org.apache.hadoop.hbase.ipc.RpcClient; +import org.apache.hadoop.hbase.ipc.RpcClientFactory; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; @@ -728,6 +728,28 @@ final class ConnectionManager { return new HTable(tableName, this, tableConfig, rpcCallerFactory, rpcControllerFactory, pool); } + @Override + public BufferedMutator getBufferedMutator(BufferedMutatorParams params) { + if (params.getTableName() == null) { + throw new IllegalArgumentException("TableName cannot be null."); + } + if (params.getPool() == null) { + params.pool(HTable.getDefaultExecutor(getConfiguration())); + } + if (params.getWriteBufferSize() == BufferedMutatorParams.UNSET) { + params.writeBufferSize(tableConfig.getWriteBufferSize()); + } + if (params.getMaxKeyValueSize() == BufferedMutatorParams.UNSET) { + params.maxKeyValueSize(tableConfig.getMaxKeyValueSize()); + } + return new BufferedMutatorImpl(this, rpcCallerFactory, rpcControllerFactory, params); + } + + @Override + public BufferedMutator getBufferedMutator(TableName tableName) { + return getBufferedMutator(new BufferedMutatorParams(tableName)); + } + @Override public RegionLocator getRegionLocator(TableName tableName) throws IOException { return new HRegionLocator(tableName, this); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index 2c405d78424..374c06d1bc7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.Collections; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.NavigableMap; @@ -112,10 +111,8 @@ public class HTable implements HTableInterface { private final TableName tableName; private volatile Configuration configuration; private TableConfiguration tableConfiguration; - protected List writeAsyncBuffer = new LinkedList(); - private long writeBufferSize; + protected BufferedMutatorImpl mutator; private boolean autoFlush = true; - protected long currentWriteBufferSize = 0 ; private boolean closed = false; protected int scannerCaching; private ExecutorService pool; // For Multi & Scan @@ -125,8 +122,6 @@ public class HTable implements HTableInterface { private Consistency defaultConsistency = Consistency.STRONG; private HRegionLocator locator; - /** The Async process for puts with autoflush set to false or multiputs */ - protected AsyncProcess ap; /** The Async process for batch */ protected AsyncProcess multiAp; private RpcRetryingCallerFactory rpcCallerFactory; @@ -219,7 +214,7 @@ public class HTable implements HTableInterface { // it also scales when new region servers are added. ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime, TimeUnit.SECONDS, new SynchronousQueue(), Threads.newDaemonThreadFactory("htable")); - ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true); + pool.allowCoreThreadTimeOut(true); return pool; } @@ -323,15 +318,18 @@ public class HTable implements HTableInterface { } /** - * For internal testing. + * For internal testing. Uses Connection provided in {@code params}. * @throws IOException */ @VisibleForTesting - protected HTable() throws IOException { - tableName = null; - tableConfiguration = new TableConfiguration(); + protected HTable(ClusterConnection conn, BufferedMutatorParams params) throws IOException { + connection = conn; + tableName = params.getTableName(); + tableConfiguration = new TableConfiguration(connection.getConfiguration()); cleanupPoolOnClose = false; cleanupConnectionOnClose = false; + // used from tests, don't trust the connection is real + this.mutator = new BufferedMutatorImpl(conn, null, null, params); } /** @@ -351,7 +349,6 @@ public class HTable implements HTableInterface { this.operationTimeout = tableName.isSystemTable() ? tableConfiguration.getMetaOperationTimeout() : tableConfiguration.getOperationTimeout(); - this.writeBufferSize = tableConfiguration.getWriteBufferSize(); this.scannerCaching = tableConfiguration.getScannerCaching(); if (this.rpcCallerFactory == null) { @@ -362,8 +359,6 @@ public class HTable implements HTableInterface { } // puts need to track errors globally due to how the APIs currently work. - ap = new AsyncProcess(connection, configuration, pool, rpcCallerFactory, true, - rpcControllerFactory); multiAp = this.connection.getAsyncProcess(); this.locator = new HRegionLocator(getName(), connection); } @@ -539,7 +534,7 @@ public class HTable implements HTableInterface { */ @Deprecated public List getWriteBuffer() { - return writeAsyncBuffer; + return mutator == null ? null : mutator.getWriteBuffer(); } /** @@ -643,7 +638,7 @@ public class HTable implements HTableInterface { * This is mainly useful for the MapReduce integration. * @return A map of HRegionInfo with it's server address * @throws IOException if a remote or network exception occurs - * + * * @deprecated Use {@link RegionLocator#getAllRegionLocations()} instead; */ @Deprecated @@ -1000,11 +995,11 @@ public class HTable implements HTableInterface { /** * {@inheritDoc} + * @throws IOException */ @Override - public void put(final Put put) - throws InterruptedIOException, RetriesExhaustedWithDetailsException { - doPut(put); + public void put(final Put put) throws IOException { + getBufferedMutator().mutate(put); if (autoFlush) { flushCommits(); } @@ -1012,82 +1007,16 @@ public class HTable implements HTableInterface { /** * {@inheritDoc} + * @throws IOException */ @Override - public void put(final List puts) - throws InterruptedIOException, RetriesExhaustedWithDetailsException { - for (Put put : puts) { - doPut(put); - } + public void put(final List puts) throws IOException { + getBufferedMutator().mutate(puts); if (autoFlush) { flushCommits(); } } - - /** - * Add the put to the buffer. If the buffer is already too large, sends the buffer to the - * cluster. - * @throws RetriesExhaustedWithDetailsException if there is an error on the cluster. - * @throws InterruptedIOException if we were interrupted. - */ - private void doPut(Put put) throws InterruptedIOException, RetriesExhaustedWithDetailsException { - // This behavior is highly non-intuitive... it does not protect us against - // 94-incompatible behavior, which is a timing issue because hasError, the below code - // and setter of hasError are not synchronized. Perhaps it should be removed. - if (ap.hasError()) { - writeAsyncBuffer.add(put); - backgroundFlushCommits(true); - } - - validatePut(put); - - currentWriteBufferSize += put.heapSize(); - writeAsyncBuffer.add(put); - - while (currentWriteBufferSize > writeBufferSize) { - backgroundFlushCommits(false); - } - } - - - /** - * Send the operations in the buffer to the servers. Does not wait for the server's answer. - * If the is an error (max retried reach from a previous flush or bad operation), it tries to - * send all operations in the buffer and sends an exception. - * @param synchronous - if true, sends all the writes and wait for all of them to finish before - * returning. - */ - private void backgroundFlushCommits(boolean synchronous) throws - InterruptedIOException, RetriesExhaustedWithDetailsException { - - try { - if (!synchronous) { - ap.submit(tableName, writeAsyncBuffer, true, null, false); - if (ap.hasError()) { - LOG.debug(tableName + ": One or more of the operations have failed -" + - " waiting for all operation in progress to finish (successfully or not)"); - } - } - if (synchronous || ap.hasError()) { - while (!writeAsyncBuffer.isEmpty()) { - ap.submit(tableName, writeAsyncBuffer, true, null, false); - } - RetriesExhaustedWithDetailsException error = ap.waitForAllPreviousOpsAndReset(null); - if (error != null) { - throw error; - } - } - } finally { - currentWriteBufferSize = 0; - for (Row mut : writeAsyncBuffer) { - if (mut instanceof Mutation) { - currentWriteBufferSize += ((Mutation) mut).heapSize(); - } - } - } - } - /** * {@inheritDoc} */ @@ -1264,7 +1193,7 @@ public class HTable implements HTableInterface { controller.setCallTimeout(callTimeout); try { MutateRequest request = RequestConverter.buildMutateRequest( - getLocation().getRegionInfo().getRegionName(), row, family, qualifier, + getLocation().getRegionInfo().getRegionName(), row, family, qualifier, new BinaryComparator(value), CompareType.EQUAL, put); MutateResponse response = getStub().mutate(controller, request); return Boolean.valueOf(response.getProcessed()); @@ -1452,12 +1381,11 @@ public class HTable implements HTableInterface { /** * {@inheritDoc} + * @throws IOException */ @Override - public void flushCommits() throws InterruptedIOException, RetriesExhaustedWithDetailsException { - // As we can have an operation in progress even if the buffer is empty, we call - // backgroundFlushCommits at least one time. - backgroundFlushCommits(true); + public void flushCommits() throws IOException { + getBufferedMutator().flush(); } /** @@ -1581,7 +1509,11 @@ public class HTable implements HTableInterface { */ @Override public long getWriteBufferSize() { - return writeBufferSize; + if (mutator == null) { + return tableConfiguration.getWriteBufferSize(); + } else { + return mutator.getWriteBufferSize(); + } } /** @@ -1594,10 +1526,8 @@ public class HTable implements HTableInterface { */ @Override public void setWriteBufferSize(long writeBufferSize) throws IOException { - this.writeBufferSize = writeBufferSize; - if(currentWriteBufferSize > writeBufferSize) { - flushCommits(); - } + getBufferedMutator(); + mutator.setWriteBufferSize(writeBufferSize); } /** @@ -1914,4 +1844,17 @@ public class HTable implements HTableInterface { public RegionLocator getRegionLocator() { return this.locator; } + + @VisibleForTesting + BufferedMutator getBufferedMutator() throws IOException { + if (mutator == null) { + this.mutator = (BufferedMutatorImpl) connection.getBufferedMutator( + new BufferedMutatorParams(tableName) + .pool(pool) + .writeBufferSize(tableConfiguration.getWriteBufferSize()) + .maxKeyValueSize(tableConfiguration.getMaxKeyValueSize()) + ); + } + return mutator; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java index 911e034ef0b..1f4d99afd04 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java @@ -67,8 +67,8 @@ public interface HTableInterface extends Table { * Whether or not to enable 'auto-flush'. * @deprecated in 0.96. When called with setAutoFlush(false), this function also * set clearBufferOnFail to true, which is unexpected but kept for historical reasons. - * Replace it with setAutoFlush(false, false) if this is exactly what you want, or by - * {@link #setAutoFlushTo(boolean)} for all other cases. + * Replace it with setAutoFlush(false, false) if this is exactly what you want, though + * this is the method you want for most cases. */ @Deprecated void setAutoFlush(boolean autoFlush); @@ -105,12 +105,68 @@ public interface HTableInterface extends Table { * the value of this parameter is ignored and clearBufferOnFail is set to true. * Setting clearBufferOnFail to false is deprecated since 0.96. * @deprecated in 0.99 since setting clearBufferOnFail is deprecated. Use - * {@link #setAutoFlushTo(boolean)}} instead. - * @see #flushCommits + * {@link #setAutoFlush(boolean)}} instead. + * @see BufferedMutator#flush() */ @Deprecated void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail); + /** + * Set the autoFlush behavior, without changing the value of {@code clearBufferOnFail}. + * @deprecated in 0.99 since setting clearBufferOnFail is deprecated. Use + * {@link #setAutoFlush(boolean)} instead, or better still, move on to {@link BufferedMutator} + */ + @Deprecated + void setAutoFlushTo(boolean autoFlush); + + /** + * Tells whether or not 'auto-flush' is turned on. + * + * @return {@code true} if 'auto-flush' is enabled (default), meaning + * {@link Put} operations don't get buffered/delayed and are immediately + * executed. + * @deprecated as of 1.0.0. Replaced by {@link BufferedMutator} + */ + @Deprecated + boolean isAutoFlush(); + + /** + * Executes all the buffered {@link Put} operations. + *

+ * This method gets called once automatically for every {@link Put} or batch + * of {@link Put}s (when put(List) is used) when + * {@link #isAutoFlush} is {@code true}. + * @throws IOException if a remote or network exception occurs. + * @deprecated as of 1.0.0. Replaced by {@link BufferedMutator#flush()} + */ + @Deprecated + void flushCommits() throws IOException; + + /** + * Returns the maximum size in bytes of the write buffer for this HTable. + *

+ * The default value comes from the configuration parameter + * {@code hbase.client.write.buffer}. + * @return The size of the write buffer in bytes. + * @deprecated as of 1.0.0. Replaced by {@link BufferedMutator#getWriteBufferSize()} + */ + @Deprecated + long getWriteBufferSize(); + + /** + * Sets the size of the buffer in bytes. + *

+ * If the new size is less than the current amount of data in the + * write buffer, the buffer gets flushed. + * @param writeBufferSize The new write buffer size, in bytes. + * @throws IOException if a remote or network exception occurs. + * @deprecated as of 1.0.0. Replaced by {@link BufferedMutator} and + * {@link BufferedMutatorParams#writeBufferSize(long)} + */ + @Deprecated + void setWriteBufferSize(long writeBufferSize) throws IOException; + + /** * Return the row that matches row exactly, * or the one that immediately precedes it. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java index a408b1d35df..55fb1c49d0c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java @@ -219,9 +219,7 @@ public interface Table extends Closeable { /** * Puts some data in the table. - *

- * If {@link #isAutoFlush isAutoFlush} is false, the update is buffered - * until the internal buffer is full. + * * @param put The data to put. * @throws IOException if a remote or network exception occurs. * @since 0.20.0 @@ -231,9 +229,6 @@ public interface Table extends Closeable { /** * Puts some data in the table, in batch. *

- * If {@link #isAutoFlush isAutoFlush} is false, the update is buffered - * until the internal buffer is full. - *

* This can be used for group commit, or for submitting user defined * batches. The writeBuffer will be periodically inspected while the List * is processed, so depending on the List size the writeBuffer may flush @@ -497,30 +492,6 @@ public interface Table extends Closeable { byte[] startKey, byte[] endKey, final Batch.Call callable, final Batch.Callback callback) throws ServiceException, Throwable; - /** - * Tells whether or not 'auto-flush' is turned on. - * - * @return {@code true} if 'auto-flush' is enabled (default), meaning - * {@link Put} operations don't get buffered/delayed and are immediately - * executed. - */ - boolean isAutoFlush(); - - /** - * Executes all the buffered {@link Put} operations. - *

- * This method gets called once automatically for every {@link Put} or batch - * of {@link Put}s (when put(List) is used) when - * {@link #isAutoFlush} is {@code true}. - * @throws IOException if a remote or network exception occurs. - */ - void flushCommits() throws IOException; - - /** - * Set the autoFlush behavior, without changing the value of {@code clearBufferOnFail} - */ - void setAutoFlushTo(boolean autoFlush); - /** * Returns the maximum size in bytes of the write buffer for this HTable. *

@@ -540,7 +511,6 @@ public interface Table extends Closeable { */ void setWriteBufferSize(long writeBufferSize) throws IOException; - /** * Creates an instance of the given {@link com.google.protobuf.Service} subclass for each table * region spanning the range from the {@code startKey} row to {@code endKey} row (inclusive), all diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableConfiguration.java index 6176a0c1e77..70ad179bfaf 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableConfiguration.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableConfiguration.java @@ -28,20 +28,18 @@ import com.google.common.annotations.VisibleForTesting; @InterfaceAudience.Private public class TableConfiguration { + public static final String WRITE_BUFFER_SIZE_KEY = "hbase.client.write.buffer"; + public static final long WRITE_BUFFER_SIZE_DEFAULT = 2097152; + public static final String MAX_KEYVALUE_SIZE_KEY = "hbase.client.keyvalue.maxsize"; + public static final int MAX_KEYVALUE_SIZE_DEFAULT = -1; + private final long writeBufferSize; - private final int metaOperationTimeout; - private final int operationTimeout; - private final int scannerCaching; - private final int primaryCallTimeoutMicroSecond; - private final int replicaCallTimeoutMicroSecondScan; - private final int retries; - private final int maxKeyValueSize; /** @@ -49,7 +47,7 @@ public class TableConfiguration { * @param conf Configuration object */ TableConfiguration(Configuration conf) { - this.writeBufferSize = conf.getLong("hbase.client.write.buffer", 2097152); + this.writeBufferSize = conf.getLong(WRITE_BUFFER_SIZE_KEY, WRITE_BUFFER_SIZE_DEFAULT); this.metaOperationTimeout = conf.getInt( HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, @@ -70,7 +68,7 @@ public class TableConfiguration { this.retries = conf.getInt( HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); - this.maxKeyValueSize = conf.getInt("hbase.client.keyvalue.maxsize", -1); + this.maxKeyValueSize = conf.getInt(MAX_KEYVALUE_SIZE_KEY, MAX_KEYVALUE_SIZE_DEFAULT); } /** @@ -80,14 +78,14 @@ public class TableConfiguration { */ @VisibleForTesting protected TableConfiguration() { - this.writeBufferSize = 2097152; + this.writeBufferSize = WRITE_BUFFER_SIZE_DEFAULT; this.metaOperationTimeout = HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT; this.operationTimeout = HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT; this.scannerCaching = HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING; this.primaryCallTimeoutMicroSecond = 10000; this.replicaCallTimeoutMicroSecondScan = 1000000; this.retries = HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER; - this.maxKeyValueSize = -1; + this.maxKeyValueSize = MAX_KEYVALUE_SIZE_DEFAULT; } public long getWriteBufferSize() { diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index 88a95fbfd50..aa41939ce3c 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -155,8 +155,8 @@ public class TestAsyncProcess { new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf)); } - public MyAsyncProcess( - ClusterConnection hc, Configuration conf, boolean useGlobalErrors, boolean dummy) { + public MyAsyncProcess(ClusterConnection hc, Configuration conf, boolean useGlobalErrors, + @SuppressWarnings("unused") boolean dummy) { super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, new SynchronousQueue(), new CountingThreadFactory(new AtomicInteger())) { @Override @@ -644,26 +644,27 @@ public class TestAsyncProcess { NonceGenerator ng = Mockito.mock(NonceGenerator.class); Mockito.when(ng.getNonceGroup()).thenReturn(HConstants.NO_NONCE); Mockito.when(hc.getNonceGenerator()).thenReturn(ng); + Mockito.when(hc.getConfiguration()).thenReturn(conf); return hc; } @Test public void testHTablePutSuccess() throws Exception { - HTable ht = Mockito.mock(HTable.class); + BufferedMutatorImpl ht = Mockito.mock(BufferedMutatorImpl.class); ht.ap = new MyAsyncProcess(createHConnection(), conf, true); Put put = createPut(1, true); Assert.assertEquals(0, ht.getWriteBufferSize()); - ht.put(put); + ht.mutate(put); Assert.assertEquals(0, ht.getWriteBufferSize()); } private void doHTableFailedPut(boolean bufferOn) throws Exception { - HTable ht = new HTable(); - MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, true); - ht.ap = ap; - ht.setAutoFlushTo(true); + ClusterConnection conn = createHConnection(); + HTable ht = new HTable(conn, new BufferedMutatorParams(DUMMY_TABLE)); + MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true); + ht.mutator.ap = ap; if (bufferOn) { ht.setWriteBufferSize(1024L * 1024L); } else { @@ -672,7 +673,7 @@ public class TestAsyncProcess { Put put = createPut(1, false); - Assert.assertEquals(0L, ht.currentWriteBufferSize); + Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize); try { ht.put(put); if (bufferOn) { @@ -681,7 +682,7 @@ public class TestAsyncProcess { Assert.fail(); } catch (RetriesExhaustedException expected) { } - Assert.assertEquals(0L, ht.currentWriteBufferSize); + Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize); // The table should have sent one request, maybe after multiple attempts AsyncRequestFuture ars = null; for (AsyncRequestFuture someReqs : ap.allReqs) { @@ -708,14 +709,14 @@ public class TestAsyncProcess { @Test public void testHTableFailedPutAndNewPut() throws Exception { - HTable ht = new HTable(); - MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, true); - ht.ap = ap; - ht.setAutoFlushTo(false); - ht.setWriteBufferSize(0); + ClusterConnection conn = createHConnection(); + BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, null, null, + new BufferedMutatorParams(DUMMY_TABLE).writeBufferSize(0)); + MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true); + mutator.ap = ap; Put p = createPut(1, false); - ht.put(p); + mutator.mutate(p); ap.waitUntilDone(); // Let's do all the retries. @@ -725,13 +726,13 @@ public class TestAsyncProcess { // puts, we may raise an exception in the middle of the list. It's then up to the caller to // manage what was inserted, what was tried but failed, and what was not even tried. p = createPut(1, true); - Assert.assertEquals(0, ht.writeAsyncBuffer.size()); + Assert.assertEquals(0, mutator.getWriteBuffer().size()); try { - ht.put(p); + mutator.mutate(p); Assert.fail(); } catch (RetriesExhaustedException expected) { } - Assert.assertEquals("the put should not been inserted.", 0, ht.writeAsyncBuffer.size()); + Assert.assertEquals("the put should not been inserted.", 0, mutator.getWriteBuffer().size()); } @@ -762,9 +763,9 @@ public class TestAsyncProcess { @Test public void testBatch() throws IOException, InterruptedException { - HTable ht = new HTable(); - ht.connection = new MyConnectionImpl(conf); - ht.multiAp = new MyAsyncProcess(ht.connection, conf, false); + ClusterConnection conn = new MyConnectionImpl(conf); + HTable ht = new HTable(conn, new BufferedMutatorParams(DUMMY_TABLE)); + ht.multiAp = new MyAsyncProcess(conn, conf, false); List puts = new ArrayList(); puts.add(createPut(1, true)); @@ -793,26 +794,24 @@ public class TestAsyncProcess { @Test public void testErrorsServers() throws IOException { - HTable ht = new HTable(); Configuration configuration = new Configuration(conf); + ClusterConnection conn = new MyConnectionImpl(configuration); + BufferedMutatorImpl mutator = + new BufferedMutatorImpl(conn, null, null, new BufferedMutatorParams(DUMMY_TABLE)); configuration.setBoolean(ConnectionManager.RETRIES_BY_SERVER_KEY, true); - // set default writeBufferSize - ht.setWriteBufferSize(configuration.getLong("hbase.client.write.buffer", 2097152)); - ht.connection = new MyConnectionImpl(configuration); - MyAsyncProcess ap = new MyAsyncProcess(ht.connection, configuration, true); - ht.ap = ap; + MyAsyncProcess ap = new MyAsyncProcess(conn, configuration, true); + mutator.ap = ap; - Assert.assertNotNull(ht.ap.createServerErrorTracker()); - Assert.assertTrue(ht.ap.serverTrackerTimeout > 200); - ht.ap.serverTrackerTimeout = 1; + Assert.assertNotNull(mutator.ap.createServerErrorTracker()); + Assert.assertTrue(mutator.ap.serverTrackerTimeout > 200); + mutator.ap.serverTrackerTimeout = 1; Put p = createPut(1, false); - ht.setAutoFlushTo(false); - ht.put(p); + mutator.mutate(p); try { - ht.flushCommits(); + mutator.flush(); Assert.fail(); } catch (RetriesExhaustedWithDetailsException expected) { } @@ -822,19 +821,18 @@ public class TestAsyncProcess { @Test public void testGlobalErrors() throws IOException { - HTable ht = new HTable(); - ht.connection = new MyConnectionImpl(conf); - AsyncProcessWithFailure ap = new AsyncProcessWithFailure(ht.connection, conf); - ht.ap = ap; + ClusterConnection conn = new MyConnectionImpl(conf); + BufferedMutatorImpl mutator = (BufferedMutatorImpl) conn.getBufferedMutator(DUMMY_TABLE); + AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, conf); + mutator.ap = ap; - Assert.assertNotNull(ht.ap.createServerErrorTracker()); + Assert.assertNotNull(mutator.ap.createServerErrorTracker()); Put p = createPut(1, true); - ht.setAutoFlushTo(false); - ht.put(p); + mutator.mutate(p); try { - ht.flushCommits(); + mutator.flush(); Assert.fail(); } catch (RetriesExhaustedWithDetailsException expected) { } @@ -861,13 +859,12 @@ public class TestAsyncProcess { gets.add(get); } - HTable ht = new HTable(); MyConnectionImpl2 con = new MyConnectionImpl2(hrls); - ht.connection = con; + HTable ht = new HTable(con, new BufferedMutatorParams(DUMMY_TABLE)); MyAsyncProcess ap = new MyAsyncProcess(con, conf, con.nbThreads); ht.multiAp = ap; - ht.batch(gets); + ht.batch(gets, new Object[gets.size()]); Assert.assertEquals(ap.nbActions.get(), NB_REGS); Assert.assertEquals("1 multi response per server", 2, ap.nbMultiResponse.get()); diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java index 43e0b756a73..d155fd7a0af 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java @@ -711,36 +711,47 @@ public class TestClientNoCluster extends Configured implements Tool { * @throws IOException */ static void cycle(int id, final Configuration c, final Connection sharedConnection) throws IOException { - Table table = sharedConnection.getTable(TableName.valueOf(BIG_USER_TABLE)); - table.setAutoFlushTo(false); long namespaceSpan = c.getLong("hbase.test.namespace.span", 1000000); long startTime = System.currentTimeMillis(); final int printInterval = 100000; Random rd = new Random(id); boolean get = c.getBoolean("hbase.test.do.gets", false); - try { - Stopwatch stopWatch = new Stopwatch(); - stopWatch.start(); - for (int i = 0; i < namespaceSpan; i++) { - byte [] b = format(rd.nextLong()); - if (get){ + TableName tableName = TableName.valueOf(BIG_USER_TABLE); + if (get) { + try (Table table = sharedConnection.getTable(tableName)){ + Stopwatch stopWatch = new Stopwatch(); + stopWatch.start(); + for (int i = 0; i < namespaceSpan; i++) { + byte [] b = format(rd.nextLong()); Get g = new Get(b); table.get(g); - } else { + if (i % printInterval == 0) { + LOG.info("Get " + printInterval + "/" + stopWatch.elapsedMillis()); + stopWatch.reset(); + stopWatch.start(); + } + } + LOG.info("Finished a cycle putting " + namespaceSpan + " in " + + (System.currentTimeMillis() - startTime) + "ms"); + } + } else { + try (BufferedMutator mutator = sharedConnection.getBufferedMutator(tableName)) { + Stopwatch stopWatch = new Stopwatch(); + stopWatch.start(); + for (int i = 0; i < namespaceSpan; i++) { + byte [] b = format(rd.nextLong()); Put p = new Put(b); p.add(HConstants.CATALOG_FAMILY, b, b); - table.put(p); + mutator.mutate(p); + if (i % printInterval == 0) { + LOG.info("Put " + printInterval + "/" + stopWatch.elapsedMillis()); + stopWatch.reset(); + stopWatch.start(); + } } - if (i % printInterval == 0) { - LOG.info("Put " + printInterval + "/" + stopWatch.elapsedMillis()); - stopWatch.reset(); - stopWatch.start(); + LOG.info("Finished a cycle putting " + namespaceSpan + " in " + + (System.currentTimeMillis() - startTime) + "ms"); } - } - LOG.info("Finished a cycle putting " + namespaceSpan + " in " + - (System.currentTimeMillis() - startTime) + "ms"); - } finally { - table.close(); } } diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/BufferedMutatorExample.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/BufferedMutatorExample.java new file mode 100644 index 00000000000..ab96741de51 --- /dev/null +++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/BufferedMutatorExample.java @@ -0,0 +1,119 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.example; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.BufferedMutator; +import org.apache.hadoop.hbase.client.BufferedMutatorParams; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * An example of using the {@link BufferedMutator} interface. + */ +public class BufferedMutatorExample extends Configured implements Tool { + + private static final Log LOG = LogFactory.getLog(BufferedMutatorExample.class); + + private static final int POOL_SIZE = 10; + private static final int TASK_COUNT = 100; + private static final TableName TABLE = TableName.valueOf("foo"); + private static final byte[] FAMILY = Bytes.toBytes("f"); + + @Override + public int run(String[] args) throws InterruptedException, ExecutionException, TimeoutException { + + /** a callback invoked when an asynchronous write fails. */ + final BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() { + @Override + public void onException(RetriesExhaustedWithDetailsException e, BufferedMutator mutator) { + for (int i = 0; i < e.getNumExceptions(); i++) { + LOG.info("Failed to sent put " + e.getRow(i) + "."); + } + } + }; + BufferedMutatorParams params = new BufferedMutatorParams(TABLE) + .listener(listener); + + // + // step 1: create a single Connection and a BufferedMutator, shared by all worker threads. + // + try (final Connection conn = ConnectionFactory.createConnection(getConf()); + final BufferedMutator mutator = conn.getBufferedMutator(params)) { + + /** worker pool that operates on BufferedTable instances */ + final ExecutorService workerPool = Executors.newFixedThreadPool(POOL_SIZE); + List> futures = new ArrayList<>(TASK_COUNT); + + for (int i = 0; i < TASK_COUNT; i++) { + futures.add(workerPool.submit(new Callable() { + @Override + public Void call() throws Exception { + // + // step 2: each worker sends edits to the shared BufferedMutator instance. They all use + // the same backing buffer, call-back "listener", and RPC executor pool. + // + Put p = new Put(Bytes.toBytes("someRow")); + p.add(FAMILY, Bytes.toBytes("someQualifier"), Bytes.toBytes("some value")); + mutator.mutate(p); + // do work... maybe you want to call mutator.flush() after many edits to ensure any of + // this worker's edits are sent before exiting the Callable + return null; + } + })); + } + + // + // step 3: clean up the worker pool, shut down. + // + for (Future f : futures) { + f.get(5, TimeUnit.MINUTES); + } + workerPool.shutdown(); + } catch (IOException e) { + // exception while creating/destroying Connection or BufferedMutator + LOG.info("exception while creating/destroying Connection or BufferedMutator", e); + } // BufferedMutator.close() ensures all work is flushed. Could be the custom listener is + // invoked from here. + return 0; + } + + public static void main(String[] args) throws Exception { + ToolRunner.run(new BufferedMutatorExample(), args); + } +} diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java index 931fba42b23..29bb2bbc4fa 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java @@ -18,7 +18,18 @@ package org.apache.hadoop.hbase.test; -import com.google.common.collect.Sets; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; + import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.HelpFormatter; @@ -40,6 +51,8 @@ import org.apache.hadoop.hbase.IntegrationTestingUtility; import org.apache.hadoop.hbase.MasterNotRunningException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.BufferedMutator; +import org.apache.hadoop.hbase.client.BufferedMutatorParams; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Get; @@ -87,17 +100,7 @@ import org.apache.hadoop.util.ToolRunner; import org.junit.Test; import org.junit.experimental.categories.Category; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; -import java.util.Random; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.atomic.AtomicInteger; +import com.google.common.collect.Sets; /** * This is an integration test borrowed from goraci, written by Keith Turner, @@ -340,7 +343,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { byte[] id; long count = 0; int i; - Table table; + BufferedMutator mutator; Connection connection; long numNodes; long wrap; @@ -363,14 +366,14 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { } protected void instantiateHTable() throws IOException { - table = connection.getTable(getTableName(connection.getConfiguration())); - table.setAutoFlushTo(false); - table.setWriteBufferSize(4 * 1024 * 1024); + mutator = connection.getBufferedMutator( + new BufferedMutatorParams(getTableName(connection.getConfiguration())) + .writeBufferSize(4 * 1024 * 1024)); } @Override protected void cleanup(Context context) throws IOException ,InterruptedException { - table.close(); + mutator.close(); connection.close(); } @@ -421,7 +424,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { if (id != null) { put.add(FAMILY_NAME, COLUMN_CLIENT, id); } - table.put(put); + mutator.mutate(put); if (i % 1000 == 0) { // Tickle progress every so often else maprunner will think us hung @@ -429,7 +432,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { } } - table.flushCommits(); + mutator.flush(); } } diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedListWithVisibility.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedListWithVisibility.java index 50c638a496f..e136ac8b48a 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedListWithVisibility.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedListWithVisibility.java @@ -38,13 +38,11 @@ import org.apache.hadoop.hbase.IntegrationTestingUtility; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.chaos.factories.MonkeyFactory; import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionManager; -import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; @@ -187,7 +185,6 @@ public class IntegrationTestBigLinkedListWithVisibility extends IntegrationTestB protected void instantiateHTable() throws IOException { for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) { Table table = connection.getTable(getTableName(i)); - table.setAutoFlushTo(true); //table.setWriteBufferSize(4 * 1024 * 1024); this.tables[i] = table; } @@ -233,9 +230,6 @@ public class IntegrationTestBigLinkedListWithVisibility extends IntegrationTestB output.progress(); } } - for (int j = 0; j < DEFAULT_TABLES_COUNT; j++) { - tables[j].flushCommits(); - } } } } diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadAndVerify.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadAndVerify.java index 6e10ba9e11f..c92393ff07b 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadAndVerify.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadAndVerify.java @@ -42,12 +42,11 @@ import org.apache.hadoop.hbase.IntegrationTestBase; import org.apache.hadoop.hbase.IntegrationTestingUtility; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.testclassification.IntegrationTests; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.BufferedMutator; +import org.apache.hadoop.hbase.client.BufferedMutatorParams; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; @@ -113,8 +112,6 @@ public class IntegrationTestLoadAndVerify extends IntegrationTestBase { private static final int SCANNER_CACHING = 500; - protected IntegrationTestingUtility util; - private String toRun = null; private enum Counters { @@ -168,7 +165,7 @@ public void cleanUpCluster() throws Exception { { protected long recordsToWrite; protected Connection connection; - protected Table table; + protected BufferedMutator mutator; protected Configuration conf; protected int numBackReferencesPerRow; @@ -184,9 +181,9 @@ public void cleanUpCluster() throws Exception { String tableName = conf.get(TABLE_NAME_KEY, TABLE_NAME_DEFAULT); numBackReferencesPerRow = conf.getInt(NUM_BACKREFS_KEY, NUM_BACKREFS_DEFAULT); this.connection = ConnectionFactory.createConnection(conf); - table = connection.getTable(TableName.valueOf(tableName)); - table.setWriteBufferSize(4*1024*1024); - table.setAutoFlushTo(false); + mutator = connection.getBufferedMutator( + new BufferedMutatorParams(TableName.valueOf(tableName)) + .writeBufferSize(4 * 1024 * 1024)); String taskId = conf.get("mapreduce.task.attempt.id"); Matcher matcher = Pattern.compile(".+_m_(\\d+_\\d+)").matcher(taskId); @@ -201,8 +198,7 @@ public void cleanUpCluster() throws Exception { @Override public void cleanup(Context context) throws IOException { - table.flushCommits(); - table.close(); + mutator.close(); connection.close(); } @@ -235,7 +231,7 @@ public void cleanUpCluster() throws Exception { refsWritten.increment(1); } rowsWritten.increment(1); - table.put(p); + mutator.mutate(p); if (i % 100 == 0) { context.setStatus("Written " + i + "/" + recordsToWrite + " records"); @@ -244,7 +240,7 @@ public void cleanUpCluster() throws Exception { } // End of block, flush all of them before we start writing anything // pointing to these! - table.flushCommits(); + mutator.flush(); } } } @@ -320,7 +316,7 @@ public void cleanUpCluster() throws Exception { NMapInputFormat.setNumMapTasks(conf, conf.getInt(NUM_MAP_TASKS_KEY, NUM_MAP_TASKS_DEFAULT)); conf.set(TABLE_NAME_KEY, htd.getTableName().getNameAsString()); - Job job = new Job(conf); + Job job = Job.getInstance(conf); job.setJobName(TEST_NAME + " Load for " + htd.getTableName()); job.setJarByClass(this.getClass()); setMapperClass(job); @@ -344,7 +340,7 @@ public void cleanUpCluster() throws Exception { protected void doVerify(Configuration conf, HTableDescriptor htd) throws Exception { Path outputDir = getTestDir(TEST_NAME, "verify-output"); - Job job = new Job(conf); + Job job = Job.getInstance(conf); job.setJarByClass(this.getClass()); job.setJobName(TEST_NAME + " Verification for " + htd.getTableName()); setJobScannerConf(job); @@ -398,7 +394,7 @@ public void cleanUpCluster() throws Exception { // Only disable and drop if we succeeded to verify - otherwise it's useful // to leave it around for post-mortem - getTestingUtil(getConf()).deleteTable(htd.getName()); + getTestingUtil(getConf()).deleteTable(htd.getTableName()); } public void usage() { @@ -454,15 +450,17 @@ public void cleanUpCluster() throws Exception { HTableDescriptor htd = new HTableDescriptor(table); htd.addFamily(new HColumnDescriptor(TEST_FAMILY)); - Admin admin = new HBaseAdmin(getConf()); - if (doLoad) { - admin.createTable(htd, Bytes.toBytes(0L), Bytes.toBytes(-1L), numPresplits); - doLoad(getConf(), htd); + try (Connection conn = ConnectionFactory.createConnection(getConf()); + Admin admin = conn.getAdmin()) { + if (doLoad) { + admin.createTable(htd, Bytes.toBytes(0L), Bytes.toBytes(-1L), numPresplits); + doLoad(getConf(), htd); + } } if (doVerify) { doVerify(getConf(), htd); if (doDelete) { - getTestingUtil(getConf()).deleteTable(htd.getName()); + getTestingUtil(getConf()).deleteTable(htd.getTableName()); } } return 0; diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestWithCellVisibilityLoadAndVerify.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestWithCellVisibilityLoadAndVerify.java index 96743c8bc64..05e214b17f9 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestWithCellVisibilityLoadAndVerify.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestWithCellVisibilityLoadAndVerify.java @@ -176,7 +176,7 @@ public class IntegrationTestWithCellVisibilityLoadAndVerify extends IntegrationT p.add(TEST_FAMILY, TEST_QUALIFIER, HConstants.EMPTY_BYTE_ARRAY); p.setCellVisibility(new CellVisibility(exp)); getCounter(expIdx).increment(1); - table.put(p); + mutator.mutate(p); if (i % 100 == 0) { context.setStatus("Written " + i + "/" + recordsToWrite + " records"); @@ -185,7 +185,7 @@ public class IntegrationTestWithCellVisibilityLoadAndVerify extends IntegrationT } // End of block, flush all of them before we start writing anything // pointing to these! - table.flushCommits(); + mutator.flush(); } } diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/trace/IntegrationTestSendTraceRequests.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/trace/IntegrationTestSendTraceRequests.java index 2b0b1d60056..3fa8a9cba0d 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/trace/IntegrationTestSendTraceRequests.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/trace/IntegrationTestSendTraceRequests.java @@ -25,8 +25,8 @@ import org.apache.hadoop.hbase.IntegrationTestingUtility; import org.apache.hadoop.hbase.testclassification.IntegrationTests; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.BufferedMutator; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -234,12 +234,11 @@ public class IntegrationTestSendTraceRequests extends AbstractHBaseTool { private LinkedBlockingQueue insertData() throws IOException, InterruptedException { LinkedBlockingQueue rowKeys = new LinkedBlockingQueue(25000); - Table ht = util.getConnection().getTable(this.tableName); + BufferedMutator ht = util.getConnection().getBufferedMutator(this.tableName); byte[] value = new byte[300]; for (int x = 0; x < 5000; x++) { TraceScope traceScope = Trace.startSpan("insertData", Sampler.ALWAYS); try { - ht.setAutoFlushTo(false); for (int i = 0; i < 5; i++) { long rk = random.nextLong(); rowKeys.add(rk); @@ -248,7 +247,7 @@ public class IntegrationTestSendTraceRequests extends AbstractHBaseTool { random.nextBytes(value); p.add(familyName, Bytes.toBytes(random.nextLong()), value); } - ht.put(p); + ht.mutate(p); } if ((x % 1000) == 0) { admin.flush(tableName); diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/RowResource.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/RowResource.java index 583644200b7..dad5a32d3e3 100644 --- a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/RowResource.java +++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/RowResource.java @@ -227,7 +227,6 @@ public class RowResource extends ResourceBase { } table = servlet.getTable(tableResource.getName()); table.put(puts); - table.flushCommits(); ResponseBuilder response = Response.ok(); servlet.getMetrics().incrementSucessfulPutRequests(1); return response.build(); @@ -489,7 +488,6 @@ public class RowResource extends ResourceBase { .type(MIMETYPE_TEXT).entity("Value not Modified" + CRLF) .build(); } - table.flushCommits(); ResponseBuilder response = Response.ok(); servlet.getMetrics().incrementSucessfulPutRequests(1); return response.build(); @@ -580,7 +578,6 @@ public class RowResource extends ResourceBase { .type(MIMETYPE_TEXT).entity(" Delete check failed." + CRLF) .build(); } - table.flushCommits(); ResponseBuilder response = Response.ok(); servlet.getMetrics().incrementSucessfulDeleteRequests(1); return response.build(); diff --git a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/PerformanceEvaluation.java b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/PerformanceEvaluation.java index b02f069561e..e91f873bf17 100644 --- a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/PerformanceEvaluation.java +++ b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/PerformanceEvaluation.java @@ -49,15 +49,16 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.client.BufferedMutator; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HConnectionManager; -import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.filter.CompareFilter; import org.apache.hadoop.hbase.filter.Filter; @@ -137,7 +138,7 @@ public class PerformanceEvaluation extends Configured implements Tool { private int presplitRegions = 0; private boolean useTags = false; private int noOfTags = 1; - private HConnection connection; + private Connection connection; private static final Path PERF_EVAL_DIR = new Path("performance_evaluation"); /** @@ -501,7 +502,7 @@ public class PerformanceEvaluation extends Configured implements Tool { value.getRows(), value.getTotalRows(), value.isFlushCommits(), value.isWriteToWAL(), value.isUseTags(), value.getNoOfTags(), - HConnectionManager.createConnection(context.getConfiguration()), status); + ConnectionFactory.createConnection(context.getConfiguration()), status); // Collect how much time the thing took. Report as map output and // to the ELAPSED_TIME counter. context.getCounter(Counter.ELAPSED_TIME).increment(elapsedTime); @@ -609,7 +610,7 @@ public class PerformanceEvaluation extends Configured implements Tool { final int preSplitRegions = this.presplitRegions; final boolean useTags = this.useTags; final int numTags = this.noOfTags; - final HConnection connection = HConnectionManager.createConnection(getConf()); + final Connection connection = ConnectionFactory.createConnection(getConf()); for (int i = 0; i < this.N; i++) { final int index = i; Thread t = new Thread ("TestClient-" + i) { @@ -684,7 +685,7 @@ public class PerformanceEvaluation extends Configured implements Tool { Path inputDir = writeInputFile(conf); conf.set(EvaluationMapTask.CMD_KEY, cmd.getName()); conf.set(EvaluationMapTask.PE_KEY, getClass().getName()); - Job job = new Job(conf); + Job job = Job.getInstance(conf); job.setJarByClass(PerformanceEvaluation.class); job.setJobName("HBase Performance Evaluation"); @@ -790,14 +791,14 @@ public class PerformanceEvaluation extends Configured implements Tool { private boolean writeToWAL = true; private boolean useTags = false; private int noOfTags = 0; - private HConnection connection; + private Connection connection; TestOptions() { } TestOptions(int startRow, int perClientRunRows, int totalRows, int numClientThreads, TableName tableName, boolean flushCommits, boolean writeToWAL, boolean useTags, - int noOfTags, HConnection connection) { + int noOfTags, Connection connection) { this.startRow = startRow; this.perClientRunRows = perClientRunRows; this.totalRows = totalRows; @@ -838,7 +839,7 @@ public class PerformanceEvaluation extends Configured implements Tool { return writeToWAL; } - public HConnection getConnection() { + public Connection getConnection() { return connection; } @@ -870,13 +871,11 @@ public class PerformanceEvaluation extends Configured implements Tool { protected final int totalRows; private final Status status; protected TableName tableName; - protected HTableInterface table; protected volatile Configuration conf; - protected boolean flushCommits; protected boolean writeToWAL; protected boolean useTags; protected int noOfTags; - protected HConnection connection; + protected Connection connection; /** * Note that all subclasses of this class must provide a public contructor @@ -889,9 +888,7 @@ public class PerformanceEvaluation extends Configured implements Tool { this.totalRows = options.getTotalRows(); this.status = status; this.tableName = options.getTableName(); - this.table = null; this.conf = conf; - this.flushCommits = options.isFlushCommits(); this.writeToWAL = options.isWriteToWAL(); this.useTags = options.isUseTags(); this.noOfTags = options.getNumTags(); @@ -907,18 +904,7 @@ public class PerformanceEvaluation extends Configured implements Tool { return period == 0? this.perClientRunRows: period; } - void testSetup() throws IOException { - this.table = connection.getTable(tableName); - this.table.setAutoFlushTo(false); - } - - void testTakedown() throws IOException { - if (flushCommits) { - this.table.flushCommits(); - } - table.close(); - } - + abstract void testTakedown() throws IOException; /* * Run test * @return Elapsed time. @@ -936,6 +922,8 @@ public class PerformanceEvaluation extends Configured implements Tool { return (System.nanoTime() - startTime) / 1000000; } + abstract void testSetup() throws IOException; + /** * Provides an extension point for tests that don't want a per row invocation. */ @@ -957,8 +945,45 @@ public class PerformanceEvaluation extends Configured implements Tool { abstract void testRow(final int i) throws IOException; } - @SuppressWarnings("unused") - static class RandomSeekScanTest extends Test { + static abstract class TableTest extends Test { + protected Table table; + + public TableTest(Configuration conf, TestOptions options, Status status) { + super(conf, options, status); + } + + void testSetup() throws IOException { + this.table = connection.getTable(tableName); + } + + @Override + void testTakedown() throws IOException { + table.close(); + } + } + + static abstract class BufferedMutatorTest extends Test { + protected BufferedMutator mutator; + protected boolean flushCommits; + + public BufferedMutatorTest(Configuration conf, TestOptions options, Status status) { + super(conf, options, status); + this.flushCommits = options.isFlushCommits(); + } + + void testSetup() throws IOException { + this.mutator = connection.getBufferedMutator(tableName); + } + + void testTakedown() throws IOException { + if (flushCommits) { + this.mutator.flush(); + } + mutator.close(); + } + } + + static class RandomSeekScanTest extends TableTest { RandomSeekScanTest(Configuration conf, TestOptions options, Status status) { super(conf, options, status); } @@ -981,7 +1006,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } @SuppressWarnings("unused") - static abstract class RandomScanWithRangeTest extends Test { + static abstract class RandomScanWithRangeTest extends TableTest { RandomScanWithRangeTest(Configuration conf, TestOptions options, Status status) { super(conf, options, status); } @@ -1065,7 +1090,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } } - static class RandomReadTest extends Test { + static class RandomReadTest extends TableTest { RandomReadTest(Configuration conf, TestOptions options, Status status) { super(conf, options, status); } @@ -1085,7 +1110,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } - static class RandomWriteTest extends Test { + static class RandomWriteTest extends BufferedMutatorTest { RandomWriteTest(Configuration conf, TestOptions options, Status status) { super(conf, options, status); } @@ -1109,11 +1134,11 @@ public class PerformanceEvaluation extends Configured implements Tool { put.add(FAMILY_NAME, QUALIFIER_NAME, value); } put.setDurability(writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); - table.put(put); + mutator.mutate(put); } } - static class ScanTest extends Test { + static class ScanTest extends TableTest { private ResultScanner testScanner; ScanTest(Configuration conf, TestOptions options, Status status) { @@ -1141,7 +1166,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } - static class SequentialReadTest extends Test { + static class SequentialReadTest extends TableTest { SequentialReadTest(Configuration conf, TestOptions options, Status status) { super(conf, options, status); } @@ -1155,7 +1180,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } - static class SequentialWriteTest extends Test { + static class SequentialWriteTest extends BufferedMutatorTest { SequentialWriteTest(Configuration conf, TestOptions options, Status status) { super(conf, options, status); @@ -1180,11 +1205,11 @@ public class PerformanceEvaluation extends Configured implements Tool { put.add(FAMILY_NAME, QUALIFIER_NAME, value); } put.setDurability(writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); - table.put(put); + mutator.mutate(put); } } - static class FilteredScanTest extends Test { + static class FilteredScanTest extends TableTest { protected static final Log LOG = LogFactory.getLog(FilteredScanTest.class.getName()); FilteredScanTest(Configuration conf, TestOptions options, Status status) { @@ -1268,7 +1293,7 @@ public class PerformanceEvaluation extends Configured implements Tool { long runOneClient(final Class cmd, final int startRow, final int perClientRunRows, final int totalRows, boolean flushCommits, boolean writeToWAL, boolean useTags, int noOfTags, - HConnection connection, final Status status) + Connection connection, final Status status) throws IOException { status.setStatus("Start " + cmd + " at offset " + startRow + " for " + perClientRunRows + " rows"); @@ -1463,7 +1488,7 @@ public class PerformanceEvaluation extends Configured implements Tool { continue; } - this.connection = HConnectionManager.createConnection(getConf()); + this.connection = ConnectionFactory.createConnection(getConf()); final String useTags = "--usetags="; if (cmd.startsWith(useTags)) { diff --git a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/TestRemoteTable.java b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/TestRemoteTable.java index eb1fc98ddd6..297162bfdf9 100644 --- a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/TestRemoteTable.java +++ b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/TestRemoteTable.java @@ -19,10 +19,10 @@ package org.apache.hadoop.hbase.rest.client; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.assertFalse; import java.io.IOException; import java.util.ArrayList; @@ -40,7 +40,6 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -99,9 +98,7 @@ public class TestRemoteTable { htd.addFamily(new HColumnDescriptor(COLUMN_2).setMaxVersions(3)); htd.addFamily(new HColumnDescriptor(COLUMN_3).setMaxVersions(3)); admin.createTable(htd); - Table table = null; - try { - table = TEST_UTIL.getConnection().getTable(TABLE); + try (Table table = TEST_UTIL.getConnection().getTable(TABLE)) { Put put = new Put(ROW_1); put.add(COLUMN_1, QUALIFIER_1, TS_2, VALUE_1); table.put(put); @@ -110,9 +107,6 @@ public class TestRemoteTable { put.add(COLUMN_1, QUALIFIER_1, TS_2, VALUE_2); put.add(COLUMN_2, QUALIFIER_2, TS_2, VALUE_2); table.put(put); - table.flushCommits(); - } finally { - if (null != table) table.close(); } remoteTable = new RemoteHTable( new Client(new Cluster().add("localhost", @@ -349,7 +343,7 @@ public class TestRemoteTable { assertTrue(Bytes.equals(VALUE_2, value2)); Delete delete = new Delete(ROW_3); - delete.deleteColumn(COLUMN_2, QUALIFIER_2); + delete.addColumn(COLUMN_2, QUALIFIER_2); remoteTable.delete(delete); get = new Get(ROW_3); @@ -464,7 +458,7 @@ public class TestRemoteTable { assertTrue(Bytes.equals(VALUE_1, value1)); assertNull(value2); assertTrue(remoteTable.exists(get)); - assertEquals(1, remoteTable.exists(Collections.singletonList(get)).length); + assertEquals(1, remoteTable.existsAll(Collections.singletonList(get)).length); Delete delete = new Delete(ROW_1); remoteTable.checkAndDelete(ROW_1, COLUMN_1, QUALIFIER_1, VALUE_1, delete); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java index 848bd56720a..c16b4c3be73 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java @@ -60,8 +60,7 @@ import com.google.protobuf.ServiceException; @InterfaceStability.Stable public final class HTableWrapper implements HTableInterface { - private TableName tableName; - private final Table table; + private final HTableInterface table; private ClusterConnection connection; private final List openTables; @@ -78,7 +77,6 @@ public final class HTableWrapper implements HTableInterface { private HTableWrapper(List openTables, TableName tableName, ClusterConnection connection, ExecutorService pool) throws IOException { - this.tableName = tableName; this.table = connection.getTable(tableName, pool); this.connection = connection; this.openTables = openTables; @@ -244,7 +242,7 @@ public final class HTableWrapper implements HTableInterface { @Override public byte[] getTableName() { - return tableName.getName(); + return table.getTableName(); } @Override @@ -320,7 +318,7 @@ public final class HTableWrapper implements HTableInterface { @Override public void setAutoFlush(boolean autoFlush) { - table.setAutoFlushTo(autoFlush); + table.setAutoFlush(autoFlush); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java index 563b1f8cf30..6e0d9e70e08 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java @@ -25,10 +25,10 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.BufferedMutator; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.InvalidJobConfException; @@ -52,22 +52,22 @@ public class TableOutputFormat extends FileOutputFormat { - private Table m_table; + private BufferedMutator m_mutator; /** * Instantiate a TableRecordWriter with the HBase HClient for writing. Assumes control over the * lifecycle of {@code conn}. */ - public TableRecordWriter(final Table table) throws IOException { - this.m_table = table; + public TableRecordWriter(final BufferedMutator mutator) throws IOException { + this.m_mutator = mutator; } public void close(Reporter reporter) throws IOException { - this.m_table.close(); + this.m_mutator.close(); } public void write(ImmutableBytesWritable key, Put value) throws IOException { - m_table.put(new Put(value)); + m_mutator.mutate(new Put(value)); } } @@ -77,13 +77,12 @@ public class TableOutputFormat extends FileOutputFormat { private static final Log LOG = LogFactory.getLog(MultiTableRecordWriter.class); Connection connection; - Map tables; + Map mutatorMap = new HashMap<>(); Configuration conf; boolean useWriteAheadLogging; @@ -91,7 +91,6 @@ public class MultiTableOutputFormat extends OutputFormat(); this.conf = conf; this.useWriteAheadLogging = useWriteAheadLogging; } @@ -99,28 +98,28 @@ public class MultiTableOutputFormat extends OutputFormat { private Connection connection; - private Table table; + private BufferedMutator mutator; /** * @throws IOException @@ -94,8 +94,7 @@ implements Configurable { public TableRecordWriter() throws IOException { String tableName = conf.get(OUTPUT_TABLE); this.connection = ConnectionFactory.createConnection(conf); - this.table = connection.getTable(TableName.valueOf(tableName)); - this.table.setAutoFlushTo(false); + this.mutator = connection.getBufferedMutator(TableName.valueOf(tableName)); LOG.info("Created table instance for " + tableName); } /** @@ -103,12 +102,12 @@ implements Configurable { * * @param context The context. * @throws IOException When closing the writer fails. - * @see org.apache.hadoop.mapreduce.RecordWriter#close(org.apache.hadoop.mapreduce.TaskAttemptContext) + * @see RecordWriter#close(TaskAttemptContext) */ @Override public void close(TaskAttemptContext context) throws IOException { - table.close(); + mutator.close(); connection.close(); } @@ -118,14 +117,15 @@ implements Configurable { * @param key The key. * @param value The value. * @throws IOException When writing fails. - * @see org.apache.hadoop.mapreduce.RecordWriter#write(java.lang.Object, java.lang.Object) + * @see RecordWriter#write(Object, Object) */ @Override public void write(KEY key, Mutation value) throws IOException { - if (value instanceof Put) table.put(new Put((Put)value)); - else if (value instanceof Delete) table.delete(new Delete((Delete)value)); - else throw new IOException("Pass a Delete or a Put"); + if (!(value instanceof Put) && !(value instanceof Delete)) { + throw new IOException("Pass a Delete or a Put"); + } + mutator.mutate(value); } } @@ -136,11 +136,9 @@ implements Configurable { * @return The newly created writer instance. * @throws IOException When creating the writer fails. * @throws InterruptedException When the jobs is cancelled. - * @see org.apache.hadoop.mapreduce.lib.output.FileOutputFormat#getRecordWriter(org.apache.hadoop.mapreduce.TaskAttemptContext) */ @Override - public RecordWriter getRecordWriter( - TaskAttemptContext context) + public RecordWriter getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException { return new TableRecordWriter(); } @@ -151,7 +149,7 @@ implements Configurable { * @param context The current context. * @throws IOException When the check fails. * @throws InterruptedException When the job is aborted. - * @see org.apache.hadoop.mapreduce.OutputFormat#checkOutputSpecs(org.apache.hadoop.mapreduce.JobContext) + * @see OutputFormat#checkOutputSpecs(JobContext) */ @Override public void checkOutputSpecs(JobContext context) throws IOException, @@ -167,7 +165,7 @@ implements Configurable { * @return The committer. * @throws IOException When creating the committer fails. * @throws InterruptedException When the job is aborted. - * @see org.apache.hadoop.mapreduce.OutputFormat#getOutputCommitter(org.apache.hadoop.mapreduce.TaskAttemptContext) + * @see OutputFormat#getOutputCommitter(TaskAttemptContext) */ @Override public OutputCommitter getOutputCommitter(TaskAttemptContext context) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java index aadc2f99853..2e7afa52cc8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java @@ -49,13 +49,12 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.BufferedMutator; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Consistency; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -392,6 +391,7 @@ public class PerformanceEvaluation extends Configured implements Tool { throws IOException, InterruptedException { final Class cmd = determineCommandClass(opts.cmdName); assert cmd != null; + @SuppressWarnings("unchecked") Future[] threads = new Future[opts.numClientThreads]; RunResult[] results = new RunResult[opts.numClientThreads]; ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads, @@ -457,7 +457,7 @@ public class PerformanceEvaluation extends Configured implements Tool { Path inputDir = writeInputFile(conf, opts); conf.set(EvaluationMapTask.CMD_KEY, cmd.getName()); conf.set(EvaluationMapTask.PE_KEY, PerformanceEvaluation.class.getName()); - Job job = new Job(conf); + Job job = Job.getInstance(conf); job.setJarByClass(PerformanceEvaluation.class); job.setJobName("HBase Performance Evaluation - " + opts.cmdName); @@ -940,7 +940,7 @@ public class PerformanceEvaluation extends Configured implements Tool { private final Sampler traceSampler; private final SpanReceiverHost receiverHost; protected Connection connection; - protected Table table; +// protected Table table; private String testName; private Histogram latency; @@ -1022,25 +1022,25 @@ public class PerformanceEvaluation extends Configured implements Tool { if (!opts.oneCon) { this.connection = ConnectionFactory.createConnection(conf); } - this.table = connection.getTable(TableName.valueOf(opts.tableName)); - this.table.setAutoFlushTo(opts.autoFlush); + onStartup(); latency = YammerHistogramUtils.newHistogram(new UniformSample(1024 * 500)); valueSize = YammerHistogramUtils.newHistogram(new UniformSample(1024 * 500)); } + abstract void onStartup() throws IOException; + void testTakedown() throws IOException { reportLatency(); reportValueSize(); - if (opts.flushCommits) { - this.table.flushCommits(); - } - table.close(); + onTakedown(); if (!opts.oneCon) { connection.close(); } receiverHost.closeReceivers(); } + abstract void onTakedown() throws IOException; + /* * Run test * @return Elapsed time. @@ -1136,7 +1136,43 @@ public class PerformanceEvaluation extends Configured implements Tool { abstract void testRow(final int i) throws IOException, InterruptedException; } - static class RandomSeekScanTest extends Test { + static abstract class TableTest extends Test { + protected Table table; + + TableTest(Connection con, TestOptions options, Status status) { + super(con, options, status); + } + + @Override + void onStartup() throws IOException { + this.table = connection.getTable(TableName.valueOf(opts.tableName)); + } + + @Override + void onTakedown() throws IOException { + table.close(); + } + } + + static abstract class BufferedMutatorTest extends Test { + protected BufferedMutator mutator; + + BufferedMutatorTest(Connection con, TestOptions options, Status status) { + super(con, options, status); + } + + @Override + void onStartup() throws IOException { + this.mutator = connection.getBufferedMutator(TableName.valueOf(opts.tableName)); + } + + @Override + void onTakedown() throws IOException { + mutator.close(); + } + } + + static class RandomSeekScanTest extends TableTest { RandomSeekScanTest(Connection con, TestOptions options, Status status) { super(con, options, status); } @@ -1166,7 +1202,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } - static abstract class RandomScanWithRangeTest extends Test { + static abstract class RandomScanWithRangeTest extends TableTest { RandomScanWithRangeTest(Connection con, TestOptions options, Status status) { super(con, options, status); } @@ -1254,7 +1290,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } } - static class RandomReadTest extends Test { + static class RandomReadTest extends TableTest { private final Consistency consistency; private ArrayList gets; private Random rd = new Random(); @@ -1308,7 +1344,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } } - static class RandomWriteTest extends Test { + static class RandomWriteTest extends BufferedMutatorTest { RandomWriteTest(Connection con, TestOptions options, Status status) { super(con, options, status); } @@ -1334,11 +1370,11 @@ public class PerformanceEvaluation extends Configured implements Tool { updateValueSize(value.length); } put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); - table.put(put); + mutator.mutate(put); } } - static class ScanTest extends Test { + static class ScanTest extends TableTest { private ResultScanner testScanner; ScanTest(Connection con, TestOptions options, Status status) { @@ -1371,7 +1407,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } - static class SequentialReadTest extends Test { + static class SequentialReadTest extends TableTest { SequentialReadTest(Connection con, TestOptions options, Status status) { super(con, options, status); } @@ -1387,7 +1423,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } } - static class SequentialWriteTest extends Test { + static class SequentialWriteTest extends BufferedMutatorTest { SequentialWriteTest(Connection con, TestOptions options, Status status) { super(con, options, status); } @@ -1413,11 +1449,11 @@ public class PerformanceEvaluation extends Configured implements Tool { updateValueSize(value.length); } put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); - table.put(put); + mutator.mutate(put); } } - static class FilteredScanTest extends Test { + static class FilteredScanTest extends TableTest { protected static final Log LOG = LogFactory.getLog(FilteredScanTest.class.getName()); FilteredScanTest(Connection con, TestOptions options, Status status) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMultiVersions.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMultiVersions.java index 711d5925567..278973efa75 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMultiVersions.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMultiVersions.java @@ -24,19 +24,17 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import java.io.IOException; -import java.util.Map; +import java.util.ArrayList; +import java.util.List; import java.util.NavigableMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseTestCase.FlushCache; import org.apache.hadoop.hbase.HBaseTestCase.HTableIncommon; import org.apache.hadoop.hbase.HBaseTestCase.Incommon; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -46,7 +44,6 @@ import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; -import org.junit.After; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; @@ -220,14 +217,16 @@ public class TestMultiVersions { } } // Insert data + List puts = new ArrayList<>(); for (int i = 0; i < startKeys.length; i++) { for (int j = 0; j < timestamp.length; j++) { Put put = new Put(rows[i], timestamp[j]); put.add(HConstants.CATALOG_FAMILY, null, timestamp[j], Bytes.toBytes(timestamp[j])); - table.put(put); + puts.add(put); } } + table.put(puts); // There are 5 cases we have to test. Each is described below. for (int i = 0; i < rows.length; i++) { for (int j = 0; j < timestamp.length; j++) { @@ -241,7 +240,6 @@ public class TestMultiVersions { } assertTrue(cellCount == 1); } - table.flushCommits(); } // Case 1: scan with LATEST_TIMESTAMP. Should get two rows diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java index 31f390797bf..82f62e44cfb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java @@ -135,7 +135,7 @@ public class TestClientPushback { final CountDownLatch latch = new CountDownLatch(1); final AtomicLong endTime = new AtomicLong(); long startTime = EnvironmentEdgeManager.currentTime(); - table.ap.submit(tablename, ops, true, new Batch.Callback() { + table.mutator.ap.submit(tablename, ops, true, new Batch.Callback() { @Override public void update(byte[] region, byte[] row, Result result) { endTime.set(EnvironmentEdgeManager.currentTime()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCloneSnapshotFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCloneSnapshotFromClient.java index e017bccd33d..2cb2cfc1529 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCloneSnapshotFromClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCloneSnapshotFromClient.java @@ -100,31 +100,30 @@ public class TestCloneSnapshotFromClient { // take an empty snapshot admin.snapshot(emptySnapshot, tableName); - Table table = TEST_UTIL.getConnection().getTable(tableName); - try { - // enable table and insert data - admin.enableTable(tableName); - SnapshotTestingUtils.loadData(TEST_UTIL, table, 500, FAMILY); + // enable table and insert data + admin.enableTable(tableName); + SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, FAMILY); + try (Table table = TEST_UTIL.getConnection().getTable(tableName)){ snapshot0Rows = TEST_UTIL.countRows(table); - admin.disableTable(tableName); - - // take a snapshot - admin.snapshot(snapshotName0, tableName); - - // enable table and insert more data - admin.enableTable(tableName); - SnapshotTestingUtils.loadData(TEST_UTIL, table, 500, FAMILY); - snapshot1Rows = TEST_UTIL.countRows(table); - admin.disableTable(tableName); - - // take a snapshot of the updated table - admin.snapshot(snapshotName1, tableName); - - // re-enable table - admin.enableTable(tableName); - } finally { - table.close(); } + admin.disableTable(tableName); + + // take a snapshot + admin.snapshot(snapshotName0, tableName); + + // enable table and insert more data + admin.enableTable(tableName); + SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, FAMILY); + try (Table table = TEST_UTIL.getConnection().getTable(tableName)){ + snapshot1Rows = TEST_UTIL.countRows(table); + } + admin.disableTable(tableName); + + // take a snapshot of the updated table + admin.snapshot(snapshotName1, tableName); + + // re-enable table + admin.enableTable(tableName); } protected int getNumReplicas() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java index 0ebafaf432b..c77ab2978a8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java @@ -689,15 +689,15 @@ public class TestFromClientSide { public void testMaxKeyValueSize() throws Exception { byte [] TABLE = Bytes.toBytes("testMaxKeyValueSize"); Configuration conf = TEST_UTIL.getConfiguration(); - String oldMaxSize = conf.get("hbase.client.keyvalue.maxsize"); + String oldMaxSize = conf.get(TableConfiguration.MAX_KEYVALUE_SIZE_KEY); Table ht = TEST_UTIL.createTable(TABLE, FAMILY); byte[] value = new byte[4 * 1024 * 1024]; Put put = new Put(ROW); put.add(FAMILY, QUALIFIER, value); ht.put(put); try { - TEST_UTIL.getConfiguration().setInt("hbase.client.keyvalue.maxsize", 2 * 1024 * 1024); - TABLE = Bytes.toBytes("testMaxKeyValueSize2"); + TEST_UTIL.getConfiguration().setInt( + TableConfiguration.MAX_KEYVALUE_SIZE_KEY, 2 * 1024 * 1024); // Create new table so we pick up the change in Configuration. try (Connection connection = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())) { @@ -709,7 +709,7 @@ public class TestFromClientSide { } fail("Inserting a too large KeyValue worked, should throw exception"); } catch(Exception e) {} - conf.set("hbase.client.keyvalue.maxsize", oldMaxSize); + conf.set(TableConfiguration.MAX_KEYVALUE_SIZE_KEY, oldMaxSize); } @Test @@ -3903,7 +3903,7 @@ public class TestFromClientSide { final int NB_BATCH_ROWS = 10; HTable table = TEST_UTIL.createTable(Bytes.toBytes("testRowsPutBufferedOneFlush"), new byte [][] {CONTENTS_FAMILY, SMALL_FAMILY}); - table.setAutoFlushTo(false); + table.setAutoFlush(false); ArrayList rowsUpdate = new ArrayList(); for (int i = 0; i < NB_BATCH_ROWS * 10; i++) { byte[] row = Bytes.toBytes("row" + i); @@ -3934,6 +3934,7 @@ public class TestFromClientSide { Result row : scanner) nbRows++; assertEquals(NB_BATCH_ROWS * 10, nbRows); + table.close(); } @Test @@ -3944,7 +3945,6 @@ public class TestFromClientSide { final int NB_BATCH_ROWS = 10; HTable table = TEST_UTIL.createTable(Bytes.toBytes("testRowsPutBufferedManyManyFlushes"), new byte[][] {CONTENTS_FAMILY, SMALL_FAMILY }); - table.setAutoFlushTo(false); table.setWriteBufferSize(10); ArrayList rowsUpdate = new ArrayList(); for (int i = 0; i < NB_BATCH_ROWS * 10; i++) { @@ -3956,8 +3956,6 @@ public class TestFromClientSide { } table.put(rowsUpdate); - table.flushCommits(); - Scan scan = new Scan(); scan.addFamily(CONTENTS_FAMILY); ResultScanner scanner = table.getScanner(scan); @@ -4146,6 +4144,7 @@ public class TestFromClientSide { HBaseAdmin ha = new HBaseAdmin(t.getConnection()); assertTrue(ha.tableExists(tableName)); assertTrue(t.get(new Get(ROW)).isEmpty()); + ha.close(); } /** @@ -4159,9 +4158,10 @@ public class TestFromClientSide { final TableName tableName = TableName.valueOf("testUnmanagedHConnectionReconnect"); HTable t = createUnmangedHConnectionHTable(tableName); Connection conn = t.getConnection(); - HBaseAdmin ha = new HBaseAdmin(conn); - assertTrue(ha.tableExists(tableName)); - assertTrue(t.get(new Get(ROW)).isEmpty()); + try (HBaseAdmin ha = new HBaseAdmin(conn)) { + assertTrue(ha.tableExists(tableName)); + assertTrue(t.get(new Get(ROW)).isEmpty()); + } // stop the master MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); @@ -4174,9 +4174,10 @@ public class TestFromClientSide { // test that the same unmanaged connection works with a new // HBaseAdmin and can connect to the new master; - HBaseAdmin newAdmin = new HBaseAdmin(conn); - assertTrue(newAdmin.tableExists(tableName)); - assertTrue(newAdmin.getClusterStatus().getServersSize() == SLAVES + 1); + try (HBaseAdmin newAdmin = new HBaseAdmin(conn)) { + assertTrue(newAdmin.tableExists(tableName)); + assertTrue(newAdmin.getClusterStatus().getServersSize() == SLAVES + 1); + } } @Test @@ -4273,7 +4274,6 @@ public class TestFromClientSide { new byte[][] { HConstants.CATALOG_FAMILY, Bytes.toBytes("info2") }, 1, 1024); // set block size to 64 to making 2 kvs into one block, bypassing the walkForwardInSingleRow // in Store.rowAtOrBeforeFromStoreFile - table.setAutoFlushTo(true); String regionName = table.getRegionLocations().firstKey().getEncodedName(); HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableAname).getFromOnlineRegions(regionName); @@ -4348,6 +4348,8 @@ public class TestFromClientSide { assertTrue(result.containsColumn(HConstants.CATALOG_FAMILY, null)); assertTrue(Bytes.equals(result.getRow(), forthRow)); assertTrue(Bytes.equals(result.getValue(HConstants.CATALOG_FAMILY, null), four)); + + table.close(); } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java index bab78ab2052..abea6997e3d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java @@ -149,7 +149,7 @@ public class TestMultiParallel { ThreadPoolExecutor executor = HTable.getDefaultExecutor(UTIL.getConfiguration()); try { try (Table t = connection.getTable(TEST_TABLE, executor)) { - List puts = constructPutRequests(); // creates a Put for every region + List puts = constructPutRequests(); // creates a Put for every region t.batch(puts); HashSet regionservers = new HashSet(); try (RegionLocator locator = connection.getRegionLocator(TEST_TABLE)) { @@ -172,7 +172,7 @@ public class TestMultiParallel { Table table = UTIL.getConnection().getTable(TEST_TABLE); // load test data - List puts = constructPutRequests(); + List puts = constructPutRequests(); table.batch(puts); // create a list of gets and run it @@ -262,16 +262,12 @@ public class TestMultiParallel { // Load the data LOG.info("get new table"); Table table = UTIL.getConnection().getTable(TEST_TABLE); - table.setAutoFlushTo(false); table.setWriteBufferSize(10 * 1024 * 1024); LOG.info("constructPutRequests"); - List puts = constructPutRequests(); - for (Row put : puts) { - table.put((Put) put); - } + List puts = constructPutRequests(); + table.put(puts); LOG.info("puts"); - table.flushCommits(); final int liveRScount = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads() .size(); assert liveRScount > 0; @@ -290,11 +286,7 @@ public class TestMultiParallel { // try putting more keys after the abort. same key/qual... just validating // no exceptions thrown puts = constructPutRequests(); - for (Row put : puts) { - table.put((Put) put); - } - - table.flushCommits(); + table.put(puts); } LOG.info("validating loaded data"); @@ -332,7 +324,7 @@ public class TestMultiParallel { LOG.info("test=testBatchWithPut"); Table table = CONNECTION.getTable(TEST_TABLE); // put multiple rows using a batch - List puts = constructPutRequests(); + List puts = constructPutRequests(); Object[] results = table.batch(puts); validateSizeAndEmpty(results, KEYS.length); @@ -364,7 +356,7 @@ public class TestMultiParallel { Table table = UTIL.getConnection().getTable(TEST_TABLE); // Load some data - List puts = constructPutRequests(); + List puts = constructPutRequests(); Object[] results = table.batch(puts); validateSizeAndEmpty(results, KEYS.length); @@ -372,7 +364,7 @@ public class TestMultiParallel { List deletes = new ArrayList(); for (int i = 0; i < KEYS.length; i++) { Delete delete = new Delete(KEYS[i]); - delete.deleteFamily(BYTES_FAMILY); + delete.addFamily(BYTES_FAMILY); deletes.add(delete); } results = table.batch(deletes); @@ -393,7 +385,7 @@ public class TestMultiParallel { Table table = UTIL.getConnection().getTable(TEST_TABLE); // Load some data - List puts = constructPutRequests(); + List puts = constructPutRequests(); Object[] results = table.batch(puts); validateSizeAndEmpty(results, KEYS.length); @@ -665,8 +657,8 @@ public class TestMultiParallel { } } - private List constructPutRequests() { - List puts = new ArrayList(); + private List constructPutRequests() { + List puts = new ArrayList<>(); for (byte[] k : KEYS) { Put put = new Put(k); put.add(BYTES_FAMILY, QUALIFIER, VALUE); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRestoreSnapshotFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRestoreSnapshotFromClient.java index a5dce028bca..c5e6449945c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRestoreSnapshotFromClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRestoreSnapshotFromClient.java @@ -111,11 +111,12 @@ public class TestRestoreSnapshotFromClient { // take an empty snapshot admin.snapshot(emptySnapshot, tableName); - Table table = TEST_UTIL.getConnection().getTable(tableName); // enable table and insert data admin.enableTable(tableName); - SnapshotTestingUtils.loadData(TEST_UTIL, table, 500, FAMILY); - snapshot0Rows = TEST_UTIL.countRows(table); + SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, FAMILY); + try (Table table = TEST_UTIL.getConnection().getTable(tableName)) { + snapshot0Rows = TEST_UTIL.countRows(table); + } admin.disableTable(tableName); // take a snapshot @@ -123,9 +124,10 @@ public class TestRestoreSnapshotFromClient { // enable table and insert more data admin.enableTable(tableName); - SnapshotTestingUtils.loadData(TEST_UTIL, table, 500, FAMILY); - snapshot1Rows = TEST_UTIL.countRows(table); - table.close(); + SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, FAMILY); + try (Table table = TEST_UTIL.getConnection().getTable(tableName)) { + snapshot1Rows = TEST_UTIL.countRows(table); + } } @After @@ -184,7 +186,7 @@ public class TestRestoreSnapshotFromClient { assertEquals(2, table.getTableDescriptor().getFamilies().size()); HTableDescriptor htd = admin.getTableDescriptor(tableName); assertEquals(2, htd.getFamilies().size()); - SnapshotTestingUtils.loadData(TEST_UTIL, table, 500, TEST_FAMILY2); + SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, TEST_FAMILY2); long snapshot2Rows = snapshot1Rows + 500; assertEquals(snapshot2Rows, TEST_UTIL.countRows(table)); assertEquals(500, TEST_UTIL.countRows(table, TEST_FAMILY2)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java index 018bdc4044d..dcf26f2a1a3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java @@ -133,17 +133,16 @@ public class TestRpcControllerFactory { Connection connection = ConnectionFactory.createConnection(conf); Table table = connection.getTable(name); - table.setAutoFlushTo(false); byte[] row = Bytes.toBytes("row"); Put p = new Put(row); p.add(fam1, fam1, Bytes.toBytes("val0")); table.put(p); - table.flushCommits(); + Integer counter = 1; counter = verifyCount(counter); Delete d = new Delete(row); - d.deleteColumn(fam1, fam1); + d.addColumn(fam1, fam1); table.delete(d); counter = verifyCount(counter); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestHTableWrapper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestHTableWrapper.java index 4649961b176..317707a6861 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestHTableWrapper.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestHTableWrapper.java @@ -176,11 +176,11 @@ public class TestHTableWrapper { private void checkAutoFlush() { boolean initialAutoFlush = hTableInterface.isAutoFlush(); - hTableInterface.setAutoFlushTo(false); + hTableInterface.setAutoFlush(false); assertFalse(hTableInterface.isAutoFlush()); - hTableInterface.setAutoFlushTo(true); + hTableInterface.setAutoFlush(true); assertTrue(hTableInterface.isAutoFlush()); - hTableInterface.setAutoFlushTo(initialAutoFlush); + hTableInterface.setAutoFlush(initialAutoFlush); } private void checkBufferSize() throws IOException { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java index f56811e9841..f2376318fbb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java @@ -937,7 +937,6 @@ public class TestDistributedLogSplitting { if (key == null || key.length == 0) { key = new byte[] { 0, 0, 0, 0, 1 }; } - ht.setAutoFlushTo(true); Put put = new Put(key); put.add(Bytes.toBytes("family"), Bytes.toBytes("c1"), new byte[]{'b'}); ht.put(put); @@ -1629,11 +1628,11 @@ public class TestDistributedLogSplitting { /** * Load table with puts and deletes with expected values so that we can verify later */ - private void prepareData(final HTable t, final byte[] f, final byte[] column) throws IOException { - t.setAutoFlushTo(false); + private void prepareData(final Table t, final byte[] f, final byte[] column) throws IOException { byte[] k = new byte[3]; // add puts + List puts = new ArrayList<>(); for (byte b1 = 'a'; b1 <= 'z'; b1++) { for (byte b2 = 'a'; b2 <= 'z'; b2++) { for (byte b3 = 'a'; b3 <= 'z'; b3++) { @@ -1642,11 +1641,11 @@ public class TestDistributedLogSplitting { k[2] = b3; Put put = new Put(k); put.add(f, column, k); - t.put(put); + puts.add(put); } } } - t.flushCommits(); + t.put(puts); // add deletes for (byte b3 = 'a'; b3 <= 'z'; b3++) { k[0] = 'a'; @@ -1655,7 +1654,6 @@ public class TestDistributedLogSplitting { Delete del = new Delete(k); t.delete(del); } - t.flushCommits(); } private void waitForCounter(AtomicLong ctr, long oldval, long newval, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java index d2f1eab40a3..80287569467 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java @@ -83,11 +83,11 @@ public class TestMaster { MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); HMaster m = cluster.getMaster(); - HTable ht = TEST_UTIL.createTable(TABLENAME, FAMILYNAME); - assertTrue(m.assignmentManager.getTableStateManager().isTableState(TABLENAME, - TableState.State.ENABLED)); - TEST_UTIL.loadTable(ht, FAMILYNAME, false); - ht.close(); + try (HTable ht = TEST_UTIL.createTable(TABLENAME, FAMILYNAME)) { + assertTrue(m.assignmentManager.getTableStateManager().isTableState(TABLENAME, + TableState.State.ENABLED)); + TEST_UTIL.loadTable(ht, FAMILYNAME, false); + } List> tableRegions = MetaTableAccessor.getTableRegionsAndLocations( m.getConnection(), TABLENAME); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java index 4a6ba4141c7..96b43420487 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java @@ -99,9 +99,9 @@ public class TestEndToEndSplitTransaction { TableName tableName = TableName.valueOf("TestSplit"); byte[] familyName = Bytes.toBytes("fam"); - HTable ht = TEST_UTIL.createTable(tableName, familyName); - TEST_UTIL.loadTable(ht, familyName, false); - ht.close(); + try (HTable ht = TEST_UTIL.createTable(tableName, familyName)) { + TEST_UTIL.loadTable(ht, familyName, false); + } HRegionServer server = TEST_UTIL.getHBaseCluster().getRegionServer(0); byte []firstRow = Bytes.toBytes("aaa"); byte []splitRow = Bytes.toBytes("lll"); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java index c9608c9d2ee..b1c3692d6d2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java @@ -202,23 +202,22 @@ public class TestFSErrorsExposed { util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); // Make a new Configuration so it makes a new connection that has the // above configuration on it; else we use the old one w/ 10 as default. - Table table = util.getConnection().getTable(tableName); - - // Load some data - util.loadTable(table, fam, false); - table.flushCommits(); - util.flush(); - util.countRows(table); - - // Kill the DFS cluster - util.getDFSCluster().shutdownDataNodes(); - - try { + try (Table table = util.getConnection().getTable(tableName)) { + // Load some data + util.loadTable(table, fam, false); + util.flush(); util.countRows(table); - fail("Did not fail to count after removing data"); - } catch (Exception e) { - LOG.info("Got expected error", e); - assertTrue(e.getMessage().contains("Could not seek")); + + // Kill the DFS cluster + util.getDFSCluster().shutdownDataNodes(); + + try { + util.countRows(table); + fail("Did not fail to count after removing data"); + } catch (Exception e) { + LOG.info("Got expected error", e); + assertTrue(e.getMessage().contains("Could not seek")); + } } // Restart data nodes so that HBase can shut down cleanly. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionFavoredNodes.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionFavoredNodes.java index 8e0bd219c33..8e7fe0488cf 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionFavoredNodes.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionFavoredNodes.java @@ -78,6 +78,7 @@ public class TestRegionFavoredNodes { @AfterClass public static void tearDownAfterClass() throws Exception { + table.close(); if (createWithFavoredNode == null) { return; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java index b95b20a21c1..aa071ef6ed4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java @@ -31,6 +31,7 @@ import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; + import static org.junit.Assert.*; import java.io.IOException; @@ -109,10 +110,11 @@ public class TestRegionServerMetrics { TEST_UTIL.createTable(tName, cfName); - TEST_UTIL.getConnection().getTable(tName).close(); //wait for the table to come up. + Connection connection = TEST_UTIL.getConnection(); + connection.getTable(tName).close(); //wait for the table to come up. // Do a first put to be sure that the connection is established, meta is there and so on. - HTable table = (HTable) TEST_UTIL.getConnection().getTable(tName); + Table table = connection.getTable(tName); Put p = new Put(row); p.add(cfName, qualifier, initValue); table.put(p); @@ -141,19 +143,21 @@ public class TestRegionServerMetrics { metricsHelper.assertCounter("readRequestCount", readRequests + 10, serverSource); metricsHelper.assertCounter("writeRequestCount", writeRequests + 30, serverSource); - for ( HRegionInfo i:table.getRegionLocations().keySet()) { - MetricsRegionAggregateSource agg = rs.getRegion(i.getRegionName()) - .getMetrics() - .getSource() - .getAggregateSource(); - String prefix = "namespace_"+NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR+ - "_table_"+tableNameString + - "_region_" + i.getEncodedName()+ - "_metric"; - metricsHelper.assertCounter(prefix + "_getNumOps", 10, agg); - metricsHelper.assertCounter(prefix + "_mutateCount", 31, agg); + try (RegionLocator locator = connection.getRegionLocator(tName)) { + for ( HRegionLocation location: locator.getAllRegionLocations()) { + HRegionInfo i = location.getRegionInfo(); + MetricsRegionAggregateSource agg = rs.getRegion(i.getRegionName()) + .getMetrics() + .getSource() + .getAggregateSource(); + String prefix = "namespace_"+NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR+ + "_table_"+tableNameString + + "_region_" + i.getEncodedName()+ + "_metric"; + metricsHelper.assertCounter(prefix + "_getNumOps", 10, agg); + metricsHelper.assertCounter(prefix + "_mutateCount", 31, agg); + } } - List gets = new ArrayList(); for (int i=0; i< 10; i++) { gets.add(new Get(row)); @@ -165,11 +169,11 @@ public class TestRegionServerMetrics { metricsHelper.assertCounter("readRequestCount", readRequests + 20, serverSource); metricsHelper.assertCounter("writeRequestCount", writeRequests + 30, serverSource); - table.setAutoFlushTo(false); + List puts = new ArrayList<>(); for (int i=0; i< 30; i++) { - table.put(p); + puts.add(p); } - table.flushCommits(); + table.put(puts); metricsRegionServer.getRegionServerWrapper().forceRecompute(); metricsHelper.assertCounter("totalRequestCount", requests + 80, serverSource); @@ -325,35 +329,39 @@ public class TestRegionServerMetrics { byte[] val = Bytes.toBytes("One"); - HTable t = TEST_UTIL.createTable(tableName, cf); - t.setAutoFlushTo(false); + List puts = new ArrayList<>(); for (int insertCount =0; insertCount < 100; insertCount++) { Put p = new Put(Bytes.toBytes("" + insertCount + "row")); p.add(cf, qualifier, val); - t.put(p); + puts.add(p); } - t.flushCommits(); + try (HTable t = TEST_UTIL.createTable(tableName, cf)) { + t.put(puts); - Scan s = new Scan(); - s.setBatch(1); - s.setCaching(1); - ResultScanner resultScanners = t.getScanner(s); + Scan s = new Scan(); + s.setBatch(1); + s.setCaching(1); + ResultScanner resultScanners = t.getScanner(s); - for (int nextCount = 0; nextCount < 30; nextCount++) { - Result result = resultScanners.next(); - assertNotNull(result); - assertEquals(1, result.size()); + for (int nextCount = 0; nextCount < 30; nextCount++) { + Result result = resultScanners.next(); + assertNotNull(result); + assertEquals(1, result.size()); + } } - for ( HRegionInfo i:t.getRegionLocations().keySet()) { - MetricsRegionAggregateSource agg = rs.getRegion(i.getRegionName()) - .getMetrics() - .getSource() - .getAggregateSource(); - String prefix = "namespace_"+NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR+ - "_table_"+tableNameString + - "_region_" + i.getEncodedName()+ - "_metric"; - metricsHelper.assertCounter(prefix + "_scanNextNumOps", 30, agg); + try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) { + for ( HRegionLocation location: locator.getAllRegionLocations()) { + HRegionInfo i = location.getRegionInfo(); + MetricsRegionAggregateSource agg = rs.getRegion(i.getRegionName()) + .getMetrics() + .getSource() + .getAggregateSource(); + String prefix = "namespace_"+NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR+ + "_table_"+tableNameString + + "_region_" + i.getEncodedName()+ + "_metric"; + metricsHelper.assertCounter(prefix + "_scanNextNumOps", 30, agg); + } } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java index bd5439af83b..86515a6230c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java @@ -91,7 +91,6 @@ public class TestScannerWithBulkload { put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes .toBytes("version3"))); table.put(put0); - table.flushCommits(); admin.flush(tableName); scanner = table.getScanner(scan); result = scanner.next(); @@ -172,19 +171,16 @@ public class TestScannerWithBulkload { put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes .toBytes("version0"))); table.put(put0); - table.flushCommits(); admin.flush(tableName); Put put1 = new Put(Bytes.toBytes("row2")); put1.add(new KeyValue(Bytes.toBytes("row2"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes .toBytes("version0"))); table.put(put1); - table.flushCommits(); admin.flush(tableName); put0 = new Put(Bytes.toBytes("row1")); put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes .toBytes("version1"))); table.put(put0); - table.flushCommits(); admin.flush(tableName); admin.compact(tableName); @@ -221,7 +217,6 @@ public class TestScannerWithBulkload { put1.add(new KeyValue(Bytes.toBytes("row5"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes.toBytes("version0"))); table.put(put1); - table.flushCommits(); bulkload.doBulkLoad(hfilePath, (HTable) table); latch.countDown(); } catch (TableNotFoundException e) { @@ -263,7 +258,6 @@ public class TestScannerWithBulkload { put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes .toBytes("version3"))); table.put(put0); - table.flushCommits(); admin.flush(tableName); scanner = table.getScanner(scan); result = scanner.next(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java index 1da3662def5..111acf3a27f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java @@ -312,7 +312,7 @@ public class TestLogRolling { admin.createTable(desc); Table table = TEST_UTIL.getConnection().getTable(desc.getTableName()); - assertTrue(table.isAutoFlush()); + assertTrue(((HTable) table).isAutoFlush()); server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName()); final FSHLog log = (FSHLog) server.getWAL(null); @@ -456,8 +456,6 @@ public class TestLogRolling { writeData(table, 1002); - table.setAutoFlushTo(true); - long curTime = System.currentTimeMillis(); LOG.info("log.getCurrentFileName()): " + DefaultWALProvider.getCurrentFileName(log)); long oldFilenum = DefaultWALProvider.extractFileNumFromWAL(log); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java index 7ca9fed3809..4bb18420dc8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java @@ -28,7 +28,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -54,7 +53,6 @@ public class TestReplicationChangingPeerRegionservers extends TestReplicationBas */ @Before public void setUp() throws Exception { - htable1.setAutoFlushTo(false); // Starting and stopping replication can make us miss new logs, // rolling like this makes sure the most recent one gets added to the queue for (JVMClusterUtil.RegionServerThread r : @@ -119,7 +117,10 @@ public class TestReplicationChangingPeerRegionservers extends TestReplicationBas Put put = new Put(row); put.add(famName, row, row); - htable1 = utility1.getConnection().getTable(tableName); + if (htable1 == null) { + htable1 = utility1.getConnection().getTable(tableName); + } + htable1.put(put); Get get = new Get(row); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java index bfb01dbf6e7..f0db865bad2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java @@ -39,7 +39,6 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -70,7 +69,6 @@ public class TestReplicationSmallTests extends TestReplicationBase { */ @Before public void setUp() throws Exception { - htable1.setAutoFlushTo(true); // Starting and stopping replication can make us miss new logs, // rolling like this makes sure the most recent one gets added to the queue for ( JVMClusterUtil.RegionServerThread r : diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithTags.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithTags.java index adf3c0eb366..f1e956ceffa 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithTags.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithTags.java @@ -151,7 +151,6 @@ public class TestReplicationWithTags { admin.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); } htable1 = utility1.getConnection().getTable(TABLE_NAME); - htable1.setWriteBufferSize(1024); htable2 = utility2.getConnection().getTable(TABLE_NAME); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java index 9ea64d107b6..c087f4e6522 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java @@ -399,25 +399,18 @@ public class TestVisibilityLabelsReplication { } static Table writeData(TableName tableName, String... labelExps) throws Exception { - Table table = null; - try { - table = TEST_UTIL.getConnection().getTable(TABLE_NAME); - int i = 1; - List puts = new ArrayList(); - for (String labelExp : labelExps) { - Put put = new Put(Bytes.toBytes("row" + i)); - put.add(fam, qual, HConstants.LATEST_TIMESTAMP, value); - put.setCellVisibility(new CellVisibility(labelExp)); - put.setAttribute(NON_VISIBILITY, Bytes.toBytes(TEMP)); - puts.add(put); - i++; - } - table.put(puts); - } finally { - if (table != null) { - table.flushCommits(); - } + Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME); + int i = 1; + List puts = new ArrayList(); + for (String labelExp : labelExps) { + Put put = new Put(Bytes.toBytes("row" + i)); + put.add(fam, qual, HConstants.LATEST_TIMESTAMP, value); + put.setCellVisibility(new CellVisibility(labelExp)); + put.setAttribute(NON_VISIBILITY, Bytes.toBytes(TEMP)); + puts.add(put); + i++; } + table.put(puts); return table; } // A simple BaseRegionbserver impl that allows to add a non-visibility tag from the diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/SnapshotTestingUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/SnapshotTestingUtils.java index 74f358d255a..a8588ccc5d7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/SnapshotTestingUtils.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/SnapshotTestingUtils.java @@ -25,7 +25,6 @@ import java.io.IOException; import java.util.Arrays; import java.util.ArrayList; import java.util.HashSet; - import java.util.List; import java.util.Map; import java.util.Set; @@ -47,6 +46,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotEnabledException; import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.BufferedMutator; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; @@ -673,20 +673,22 @@ public class SnapshotTestingUtils { public static void loadData(final HBaseTestingUtility util, final TableName tableName, int rows, byte[]... families) throws IOException, InterruptedException { - loadData(util, util.getConnection().getTable(tableName), rows, families); + BufferedMutator mutator = util.getConnection().getBufferedMutator(tableName); + loadData(util, mutator, rows, families); } - public static void loadData(final HBaseTestingUtility util, final Table table, int rows, + public static void loadData(final HBaseTestingUtility util, final BufferedMutator mutator, int rows, byte[]... families) throws IOException, InterruptedException { - table.setAutoFlushTo(false); - // Ensure one row per region assertTrue(rows >= KEYS.length); for (byte k0: KEYS) { byte[] k = new byte[] { k0 }; byte[] value = Bytes.add(Bytes.toBytes(System.currentTimeMillis()), k); byte[] key = Bytes.add(k, Bytes.toBytes(MD5Hash.getMD5AsHex(value))); - putData(table, families, key, value); + final byte[][] families1 = families; + final byte[] key1 = key; + final byte[] value1 = value; + mutator.mutate(createPut(families1, key1, value1)); rows--; } @@ -694,22 +696,24 @@ public class SnapshotTestingUtils { while (rows-- > 0) { byte[] value = Bytes.add(Bytes.toBytes(System.currentTimeMillis()), Bytes.toBytes(rows)); byte[] key = Bytes.toBytes(MD5Hash.getMD5AsHex(value)); - putData(table, families, key, value); + final byte[][] families1 = families; + final byte[] key1 = key; + final byte[] value1 = value; + mutator.mutate(createPut(families1, key1, value1)); } - table.flushCommits(); + mutator.flush(); - waitForTableToBeOnline(util, table.getName()); + waitForTableToBeOnline(util, mutator.getName()); } - private static void putData(final Table table, final byte[][] families, - final byte[] key, final byte[] value) throws IOException { + private static Put createPut(final byte[][] families, final byte[] key, final byte[] value) { byte[] q = Bytes.toBytes("q"); Put put = new Put(key); put.setDurability(Durability.SKIP_WAL); for (byte[] family: families) { put.add(family, q, value); } - table.put(put); + return put; } public static void deleteAllSnapshots(final Admin admin) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestFlushSnapshotFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestFlushSnapshotFromClient.java index 38938714bfc..b45d676249c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestFlushSnapshotFromClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestFlushSnapshotFromClient.java @@ -31,7 +31,6 @@ import java.util.concurrent.CountDownLatch; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.commons.logging.impl.Log4JLogger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -41,11 +40,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.ScannerCallable; import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.ipc.AbstractRpcClient; -import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription; @@ -54,7 +49,6 @@ import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.log4j.Level; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -76,7 +70,6 @@ public class TestFlushSnapshotFromClient { private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); private static final int NUM_RS = 2; private static final byte[] TEST_FAM = Bytes.toBytes("fam"); - private static final byte[] TEST_QUAL = Bytes.toBytes("q"); private static final TableName TABLE_NAME = TableName.valueOf("test"); private final int DEFAULT_NUM_ROWS = 100; @@ -145,8 +138,7 @@ public class TestFlushSnapshotFromClient { SnapshotTestingUtils.assertNoSnapshots(admin); // put some stuff in the table - Table table = UTIL.getConnection().getTable(TABLE_NAME); - SnapshotTestingUtils.loadData(UTIL, table, DEFAULT_NUM_ROWS, TEST_FAM); + SnapshotTestingUtils.loadData(UTIL, TABLE_NAME, DEFAULT_NUM_ROWS, TEST_FAM); LOG.debug("FS state before snapshot:"); FSUtils.logFileSystemState(UTIL.getTestFileSystem(), @@ -228,8 +220,7 @@ public class TestFlushSnapshotFromClient { SnapshotTestingUtils.assertNoSnapshots(admin); // put some stuff in the table - Table table = UTIL.getConnection().getTable(TABLE_NAME); - SnapshotTestingUtils.loadData(UTIL, table, DEFAULT_NUM_ROWS, TEST_FAM); + SnapshotTestingUtils.loadData(UTIL, TABLE_NAME, DEFAULT_NUM_ROWS, TEST_FAM); LOG.debug("FS state before snapshot:"); FSUtils.logFileSystemState(UTIL.getTestFileSystem(), diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestRestoreFlushSnapshotFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestRestoreFlushSnapshotFromClient.java index 4b36c114125..ae1ca136b80 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestRestoreFlushSnapshotFromClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestRestoreFlushSnapshotFromClient.java @@ -25,7 +25,6 @@ import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.master.MasterFileSystem; import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; @@ -103,8 +102,8 @@ public class TestRestoreFlushSnapshotFromClient { // create Table and disable it SnapshotTestingUtils.createTable(UTIL, tableName, FAMILY); + SnapshotTestingUtils.loadData(UTIL, tableName, 500, FAMILY); Table table = UTIL.getConnection().getTable(tableName); - SnapshotTestingUtils.loadData(UTIL, table, 500, FAMILY); snapshot0Rows = UTIL.countRows(table); LOG.info("=== before snapshot with 500 rows"); logFSTree(); @@ -117,7 +116,7 @@ public class TestRestoreFlushSnapshotFromClient { logFSTree(); // insert more data - SnapshotTestingUtils.loadData(UTIL, table, 500, FAMILY); + SnapshotTestingUtils.loadData(UTIL, tableName, 500, FAMILY); snapshot1Rows = UTIL.countRows(table); LOG.info("=== before snapshot with 1000 rows"); logFSTree();