HBASE-18347 Implement a BufferedMutator for async client

This commit is contained in:
zhangduo 2017-08-21 18:37:26 +08:00
parent 45b20da23b
commit 1ae9a39011
8 changed files with 585 additions and 0 deletions

View File

@ -0,0 +1,84 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.io.Closeable;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* Used to communicate with a single HBase table in batches. Obtain an instance from a
* {@link AsyncConnection} and call {@link #close()} afterwards.
* <p>
* The implementation is required to be thread safe.
*/
@InterfaceAudience.Public
public interface AsyncBufferedMutator extends Closeable {
/**
* Gets the fully qualified table name instance of the table that this
* {@code AsyncBufferedMutator} writes to.
*/
TableName getName();
/**
* Returns the {@link org.apache.hadoop.conf.Configuration} object used by this instance.
* <p>
* The reference returned is not a copy, so any change made to it will affect this instance.
*/
Configuration getConfiguration();
/**
* Sends a {@link Mutation} to the table. The mutations will be buffered and sent over the wire as
* part of a batch. Currently only supports {@link Put} and {@link Delete} mutations.
* @param mutation The data to send.
*/
CompletableFuture<Void> mutate(Mutation mutation);
/**
* Send some {@link Mutation}s to the table. The mutations will be buffered and sent over the wire
* as part of a batch. There is no guarantee of sending entire content of {@code mutations} in a
* single batch, the implementations are free to break it up according to the write buffer
* capacity.
* @param mutations The data to send.
*/
List<CompletableFuture<Void>> mutate(List<? extends Mutation> mutations);
/**
* Executes all the buffered, asynchronous operations.
*/
void flush();
/**
* Performs a {@link #flush()} and releases any resources held.
*/
@Override
void close();
/**
* Returns the maximum size in bytes of the write buffer.
* <p>
* The default value comes from the configuration parameter {@code hbase.client.write.buffer}.
* @return The size of the write buffer in bytes.
*/
long getWriteBufferSize();
}

View File

@ -0,0 +1,85 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* For creating {@link AsyncBufferedMutator}.
*/
@InterfaceAudience.Public
public interface AsyncBufferedMutatorBuilder {
/**
* Set timeout for the background flush operation.
*/
AsyncBufferedMutatorBuilder setOperationTimeout(long timeout, TimeUnit unit);
/**
* Set timeout for each rpc request when doing background flush.
*/
AsyncBufferedMutatorBuilder setRpcTimeout(long timeout, TimeUnit unit);
/**
* Set the base pause time for retrying. We use an exponential policy to generate sleep time when
* retrying.
*/
AsyncBufferedMutatorBuilder setRetryPause(long pause, TimeUnit unit);
/**
* Set the max retry times for an operation. Usually it is the max attempt times minus 1.
* <p>
* Operation timeout and max attempt times(or max retry times) are both limitations for retrying,
* we will stop retrying when we reach any of the limitations.
* @see #setMaxAttempts(int)
* @see #setOperationTimeout(long, TimeUnit)
*/
default AsyncBufferedMutatorBuilder setMaxRetries(int maxRetries) {
return setMaxAttempts(retries2Attempts(maxRetries));
}
/**
* Set the max attempt times for an operation. Usually it is the max retry times plus 1. Operation
* timeout and max attempt times(or max retry times) are both limitations for retrying, we will
* stop retrying when we reach any of the limitations.
* @see #setMaxRetries(int)
* @see #setOperationTimeout(long, TimeUnit)
*/
AsyncBufferedMutatorBuilder setMaxAttempts(int maxAttempts);
/**
* Set the number of retries that are allowed before we start to log.
*/
AsyncBufferedMutatorBuilder setStartLogErrorsCnt(int startLogErrorsCnt);
/**
* Override the write buffer size specified by the provided {@link AsyncConnection}'s
* {@link org.apache.hadoop.conf.Configuration} instance, via the configuration key
* {@code hbase.client.write.buffer}.
*/
AsyncBufferedMutatorBuilder setWriteBufferSize(long writeBufferSize);
/**
* Create the {@link AsyncBufferedMutator} instance.
*/
AsyncBufferedMutator build();
}

View File

@ -0,0 +1,85 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import com.google.common.base.Preconditions;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* The implementation of {@link AsyncBufferedMutatorBuilder}.
*/
@InterfaceAudience.Private
class AsyncBufferedMutatorBuilderImpl implements AsyncBufferedMutatorBuilder {
private final AsyncTableBuilder<? extends AsyncTableBase> tableBuilder;
private long writeBufferSize;
public AsyncBufferedMutatorBuilderImpl(AsyncConnectionConfiguration connConf,
AsyncTableBuilder<? extends AsyncTableBase> tableBuilder) {
this.tableBuilder = tableBuilder;
this.writeBufferSize = connConf.getWriteBufferSize();
}
@Override
public AsyncBufferedMutatorBuilder setOperationTimeout(long timeout, TimeUnit unit) {
tableBuilder.setOperationTimeout(timeout, unit);
return this;
}
@Override
public AsyncBufferedMutatorBuilder setRpcTimeout(long timeout, TimeUnit unit) {
tableBuilder.setRpcTimeout(timeout, unit);
return this;
}
@Override
public AsyncBufferedMutatorBuilder setRetryPause(long pause, TimeUnit unit) {
tableBuilder.setRetryPause(pause, unit);
return this;
}
@Override
public AsyncBufferedMutatorBuilder setMaxAttempts(int maxAttempts) {
tableBuilder.setMaxAttempts(maxAttempts);
return this;
}
@Override
public AsyncBufferedMutatorBuilder setStartLogErrorsCnt(int startLogErrorsCnt) {
tableBuilder.setStartLogErrorsCnt(startLogErrorsCnt);
return this;
}
@Override
public AsyncBufferedMutatorBuilder setWriteBufferSize(long writeBufferSize) {
Preconditions.checkArgument(writeBufferSize > 0, "writeBufferSize %d must be >= 0",
writeBufferSize);
this.writeBufferSize = writeBufferSize;
return this;
}
@Override
public AsyncBufferedMutator build() {
return new AsyncBufferedMutatorImpl(tableBuilder.build(), writeBufferSize);
}
}

View File

@ -0,0 +1,144 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* The implementation of {@link AsyncBufferedMutator}. Simply wrap an {@link AsyncTableBase}.
*/
@InterfaceAudience.Private
class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
private final AsyncTableBase table;
private final long writeBufferSize;
private List<Mutation> mutations = new ArrayList<>();
private List<CompletableFuture<Void>> futures = new ArrayList<>();
private long bufferedSize;
private boolean closed;
AsyncBufferedMutatorImpl(AsyncTableBase table, long writeBufferSize) {
this.table = table;
this.writeBufferSize = writeBufferSize;
}
@Override
public TableName getName() {
return table.getName();
}
@Override
public Configuration getConfiguration() {
return table.getConfiguration();
}
private void internalFlush() {
List<Mutation> toSend = this.mutations;
if (toSend.isEmpty()) {
return;
}
List<CompletableFuture<Void>> toComplete = this.futures;
assert toSend.size() == toComplete.size();
this.mutations = new ArrayList<>();
this.futures = new ArrayList<>();
bufferedSize = 0L;
Iterator<CompletableFuture<Void>> toCompleteIter = toComplete.iterator();
for (CompletableFuture<?> future : table.batch(toSend)) {
future.whenComplete((r, e) -> {
CompletableFuture<Void> f = toCompleteIter.next();
if (e != null) {
f.completeExceptionally(e);
} else {
f.complete(null);
}
});
}
}
@Override
public CompletableFuture<Void> mutate(Mutation mutation) {
CompletableFuture<Void> future = new CompletableFuture<Void>();
long heapSize = mutation.heapSize();
synchronized (this) {
if (closed) {
future.completeExceptionally(new IOException("Already closed"));
return future;
}
mutations.add(mutation);
futures.add(future);
bufferedSize += heapSize;
if (bufferedSize >= writeBufferSize) {
internalFlush();
}
}
return future;
}
@Override
public List<CompletableFuture<Void>> mutate(List<? extends Mutation> mutations) {
List<CompletableFuture<Void>> futures =
Stream.<CompletableFuture<Void>> generate(CompletableFuture::new).limit(mutations.size())
.collect(Collectors.toList());
long heapSize = mutations.stream().mapToLong(m -> m.heapSize()).sum();
synchronized (this) {
if (closed) {
IOException ioe = new IOException("Already closed");
futures.forEach(f -> f.completeExceptionally(ioe));
return futures;
}
this.mutations.addAll(mutations);
this.futures.addAll(futures);
bufferedSize += heapSize;
if (bufferedSize >= writeBufferSize) {
internalFlush();
}
}
return futures;
}
@Override
public synchronized void flush() {
internalFlush();
}
@Override
public synchronized void close() {
internalFlush();
closed = true;
}
@Override
public long getWriteBufferSize() {
return writeBufferSize;
}
}

View File

@ -136,4 +136,43 @@ public interface AsyncConnection extends Closeable {
* @param pool the thread pool to use for executing callback
*/
AsyncAdminBuilder getAdminBuilder(ExecutorService pool);
/**
* Retrieve an {@link AsyncBufferedMutator} for performing client-side buffering of writes.
* <p>
* The returned instance will use default configs. Use
* {@link #getBufferedMutatorBuilder(TableName)} if you want to customize some configs.
* @param tableName the name of the table
* @return an {@link AsyncBufferedMutator} for the supplied tableName.
*/
default AsyncBufferedMutator getBufferedMutator(TableName tableName) {
return getBufferedMutatorBuilder(tableName).build();
}
/**
* Returns an {@link AsyncBufferedMutatorBuilder} for creating {@link AsyncBufferedMutator}.
* @param tableName the name of the table
*/
AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName);
/**
* Retrieve an {@link AsyncBufferedMutator} for performing client-side buffering of writes.
* <p>
* The returned instance will use default configs. Use
* {@link #getBufferedMutatorBuilder(TableName, ExecutorService)} if you want to customize some
* configs.
* @param tableName the name of the table
* @param pool the thread pool to use for executing callback
* @return an {@link AsyncBufferedMutator} for the supplied tableName.
*/
default AsyncBufferedMutator getBufferedMutator(TableName tableName, ExecutorService pool) {
return getBufferedMutatorBuilder(tableName, pool).build();
}
/**
* Returns an {@link AsyncBufferedMutatorBuilder} for creating {@link AsyncBufferedMutator}.
* @param tableName the name of the table
* @param pool the thread pool to use for executing callback
*/
AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName, ExecutorService pool);
}

View File

@ -39,6 +39,8 @@ import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY;
import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY;
import static org.apache.hadoop.hbase.client.AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT;
import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_SIZE_DEFAULT;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_SIZE_KEY;
import java.util.concurrent.TimeUnit;
@ -87,6 +89,8 @@ class AsyncConnectionConfiguration {
private final long scannerMaxResultSize;
private final long writeBufferSize;
@SuppressWarnings("deprecation")
AsyncConnectionConfiguration(Configuration conf) {
this.metaOperationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
@ -112,6 +116,7 @@ class AsyncConnectionConfiguration {
this.metaScannerCaching = conf.getInt(HBASE_META_SCANNER_CACHING, DEFAULT_HBASE_META_SCANNER_CACHING);
this.scannerMaxResultSize = conf.getLong(HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
this.writeBufferSize = conf.getLong(WRITE_BUFFER_SIZE_KEY, WRITE_BUFFER_SIZE_DEFAULT);
}
long getMetaOperationTimeoutNs() {
@ -161,4 +166,8 @@ class AsyncConnectionConfiguration {
long getScannerMaxResultSize() {
return scannerMaxResultSize;
}
long getWriteBufferSize() {
return writeBufferSize;
}
}

View File

@ -297,4 +297,15 @@ class AsyncConnectionImpl implements AsyncConnection {
}
};
}
@Override
public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName) {
return new AsyncBufferedMutatorBuilderImpl(connConf, getRawTableBuilder(tableName));
}
@Override
public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName,
ExecutorService pool) {
return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName, pool));
}
}

View File

@ -0,0 +1,128 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ MediumTests.class, ClientTests.class })
public class TestAsyncBufferMutator {
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static TableName TABLE_NAME = TableName.valueOf("async");
private static byte[] CF = Bytes.toBytes("cf");
private static byte[] CQ = Bytes.toBytes("cq");
private static int COUNT = 100;
private static byte[] VALUE = new byte[1024];
private static AsyncConnection CONN;
@BeforeClass
public static void setUp() throws Exception {
TEST_UTIL.startMiniCluster(1);
TEST_UTIL.createTable(TABLE_NAME, CF);
CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
ThreadLocalRandom.current().nextBytes(VALUE);
}
@AfterClass
public static void tearDown() throws Exception {
CONN.close();
TEST_UTIL.shutdownMiniCluster();
}
@Test
public void test() throws InterruptedException {
List<CompletableFuture<Void>> futures = new ArrayList<>();
try (AsyncBufferedMutator mutator =
CONN.getBufferedMutatorBuilder(TABLE_NAME).setWriteBufferSize(16 * 1024).build()) {
List<CompletableFuture<Void>> fs = mutator.mutate(IntStream.range(0, COUNT / 2)
.mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE))
.collect(Collectors.toList()));
// exceeded the write buffer size, a flush will be called directly
fs.forEach(f -> f.join());
IntStream.range(COUNT / 2, COUNT).forEach(i -> {
futures.add(mutator.mutate(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE)));
});
// the first future should have been sent out.
futures.get(0).join();
Thread.sleep(2000);
// the last one should still be in write buffer
assertFalse(futures.get(futures.size() - 1).isDone());
}
// mutator.close will call mutator.flush automatically so all tasks should have been done.
futures.forEach(f -> f.join());
RawAsyncTable table = CONN.getRawTable(TABLE_NAME);
IntStream.range(0, COUNT).mapToObj(i -> new Get(Bytes.toBytes(i))).map(g -> table.get(g).join())
.forEach(r -> {
assertArrayEquals(VALUE, r.getValue(CF, CQ));
});
}
@Test
public void testClosedMutate() throws InterruptedException {
AsyncBufferedMutator mutator = CONN.getBufferedMutator(TABLE_NAME);
mutator.close();
Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
try {
mutator.mutate(put).get();
fail("Close check failed");
} catch (ExecutionException e) {
assertThat(e.getCause(), instanceOf(IOException.class));
assertTrue(e.getCause().getMessage().startsWith("Already closed"));
}
for (CompletableFuture<Void> f : mutator.mutate(Arrays.asList(put))) {
try {
f.get();
fail("Close check failed");
} catch (ExecutionException e) {
assertThat(e.getCause(), instanceOf(IOException.class));
assertTrue(e.getCause().getMessage().startsWith("Already closed"));
}
}
}
}