HBASE-18347 Implement a BufferedMutator for async client
parent 25ee5f7f84
commit d12eb7a4aa
@@ -0,0 +1,84 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import java.io.Closeable;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;

/**
 * Used to communicate with a single HBase table in batches. Obtain an instance from an
 * {@link AsyncConnection} and call {@link #close()} afterwards.
 * <p>
 * The implementation is required to be thread safe.
 */
@InterfaceAudience.Public
public interface AsyncBufferedMutator extends Closeable {

  /**
   * Gets the fully qualified table name instance of the table that this
   * {@code AsyncBufferedMutator} writes to.
   */
  TableName getName();

  /**
   * Returns the {@link org.apache.hadoop.conf.Configuration} object used by this instance.
   * <p>
   * The reference returned is not a copy, so any change made to it will affect this instance.
   */
  Configuration getConfiguration();

  /**
   * Sends a {@link Mutation} to the table. The mutation will be buffered and sent over the wire
   * as part of a batch. Currently only {@link Put} and {@link Delete} mutations are supported.
   * @param mutation The data to send.
   */
  CompletableFuture<Void> mutate(Mutation mutation);

  /**
   * Sends some {@link Mutation}s to the table. The mutations will be buffered and sent over the
   * wire as part of a batch. There is no guarantee that the entire content of {@code mutations}
   * is sent in a single batch; implementations are free to break it up according to the write
   * buffer capacity.
   * @param mutations The data to send.
   */
  List<CompletableFuture<Void>> mutate(List<? extends Mutation> mutations);

  /**
   * Executes all the buffered, asynchronous operations.
   */
  void flush();

  /**
   * Performs a {@link #flush()} and releases any resources held.
   */
  @Override
  void close();

  /**
   * Returns the maximum size in bytes of the write buffer.
   * <p>
   * The default value comes from the configuration parameter {@code hbase.client.write.buffer}.
   * @return The size of the write buffer in bytes.
   */
  long getWriteBufferSize();
}
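Usage sketch (illustrative, not part of the patch): writing through the new interface, assuming an already-created AsyncConnection named conn and a table "test" with family "cf"; the row, qualifier, and value are arbitrary.

try (AsyncBufferedMutator mutator = conn.getBufferedMutator(TableName.valueOf("test"))) {
  CompletableFuture<Void> future = mutator.mutate(
    new Put(Bytes.toBytes("row")).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"),
      Bytes.toBytes("value")));
  mutator.flush(); // push the buffered mutation out now instead of waiting for the size threshold
  future.join();   // wait for the write to complete
} // close() flushes anything still buffered and releases resources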
@@ -0,0 +1,85 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;

import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hbase.classification.InterfaceAudience;

/**
 * For creating {@link AsyncBufferedMutator}.
 */
@InterfaceAudience.Public
public interface AsyncBufferedMutatorBuilder {

  /**
   * Set timeout for the background flush operation.
   */
  AsyncBufferedMutatorBuilder setOperationTimeout(long timeout, TimeUnit unit);

  /**
   * Set timeout for each rpc request when doing background flush.
   */
  AsyncBufferedMutatorBuilder setRpcTimeout(long timeout, TimeUnit unit);

  /**
   * Set the base pause time for retrying. We use an exponential policy to generate sleep time when
   * retrying.
   */
  AsyncBufferedMutatorBuilder setRetryPause(long pause, TimeUnit unit);

  /**
   * Set the max retry times for an operation. Usually it is the max attempt times minus 1.
   * <p>
   * Operation timeout and max attempt times (or max retry times) are both limits on retrying;
   * we will stop retrying once either of them is reached.
   * @see #setMaxAttempts(int)
   * @see #setOperationTimeout(long, TimeUnit)
   */
  default AsyncBufferedMutatorBuilder setMaxRetries(int maxRetries) {
    return setMaxAttempts(retries2Attempts(maxRetries));
  }

  /**
   * Set the max attempt times for an operation. Usually it is the max retry times plus 1.
   * Operation timeout and max attempt times (or max retry times) are both limits on retrying;
   * we will stop retrying once either of them is reached.
   * @see #setMaxRetries(int)
   * @see #setOperationTimeout(long, TimeUnit)
   */
  AsyncBufferedMutatorBuilder setMaxAttempts(int maxAttempts);

  /**
   * Set the number of retries that are allowed before we start to log.
   */
  AsyncBufferedMutatorBuilder setStartLogErrorsCnt(int startLogErrorsCnt);

  /**
   * Override the write buffer size specified by the provided {@link AsyncConnection}'s
   * {@link org.apache.hadoop.conf.Configuration} instance, via the configuration key
   * {@code hbase.client.write.buffer}.
   */
  AsyncBufferedMutatorBuilder setWriteBufferSize(long writeBufferSize);

  /**
   * Create the {@link AsyncBufferedMutator} instance.
   */
  AsyncBufferedMutator build();
}
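Builder sketch (illustrative, not part of the patch): customizing the write buffer size and retry behavior before building, again assuming an existing AsyncConnection named conn; the specific values are arbitrary.

AsyncBufferedMutator mutator = conn.getBufferedMutatorBuilder(TableName.valueOf("test"))
    .setWriteBufferSize(4 * 1024 * 1024)        // flush once ~4 MB of mutations are buffered
    .setRpcTimeout(10, TimeUnit.SECONDS)        // per-request timeout for background flushes
    .setOperationTimeout(30, TimeUnit.SECONDS)  // overall timeout for a background flush
    .setMaxRetries(5)                           // i.e. at most 6 attempts per operation
    .build();
// remember to close() the mutator when done so buffered mutations are flushed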
@@ -0,0 +1,85 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import com.google.common.base.Preconditions;

import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hbase.classification.InterfaceAudience;

/**
 * The implementation of {@link AsyncBufferedMutatorBuilder}.
 */
@InterfaceAudience.Private
class AsyncBufferedMutatorBuilderImpl implements AsyncBufferedMutatorBuilder {

  private final AsyncTableBuilder<? extends AsyncTableBase> tableBuilder;

  private long writeBufferSize;

  public AsyncBufferedMutatorBuilderImpl(AsyncConnectionConfiguration connConf,
      AsyncTableBuilder<? extends AsyncTableBase> tableBuilder) {
    this.tableBuilder = tableBuilder;
    this.writeBufferSize = connConf.getWriteBufferSize();
  }

  @Override
  public AsyncBufferedMutatorBuilder setOperationTimeout(long timeout, TimeUnit unit) {
    tableBuilder.setOperationTimeout(timeout, unit);
    return this;
  }

  @Override
  public AsyncBufferedMutatorBuilder setRpcTimeout(long timeout, TimeUnit unit) {
    tableBuilder.setRpcTimeout(timeout, unit);
    return this;
  }

  @Override
  public AsyncBufferedMutatorBuilder setRetryPause(long pause, TimeUnit unit) {
    tableBuilder.setRetryPause(pause, unit);
    return this;
  }

  @Override
  public AsyncBufferedMutatorBuilder setMaxAttempts(int maxAttempts) {
    tableBuilder.setMaxAttempts(maxAttempts);
    return this;
  }

  @Override
  public AsyncBufferedMutatorBuilder setStartLogErrorsCnt(int startLogErrorsCnt) {
    tableBuilder.setStartLogErrorsCnt(startLogErrorsCnt);
    return this;
  }

  @Override
  public AsyncBufferedMutatorBuilder setWriteBufferSize(long writeBufferSize) {
    Preconditions.checkArgument(writeBufferSize > 0, "writeBufferSize %s must be > 0",
      writeBufferSize);
    this.writeBufferSize = writeBufferSize;
    return this;
  }

  @Override
  public AsyncBufferedMutator build() {
    return new AsyncBufferedMutatorImpl(tableBuilder.build(), writeBufferSize);
  }

}
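For reference, the default setMaxRetries above delegates to ConnectionUtils.retries2Attempts. A rough, hedged sketch of the retries/attempts relationship implied by the Javadoc ("max attempt times ... is the max retry times plus 1"); this is not the actual ConnectionUtils implementation, which may treat edge cases differently.

// Hedged sketch only: attempts are assumed to be retries + 1, guarding "retry forever" settings.
static int retries2Attempts(int retries) {
  return retries >= Integer.MAX_VALUE - 1 ? Integer.MAX_VALUE : retries + 1;
}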
@@ -0,0 +1,144 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;

/**
 * The implementation of {@link AsyncBufferedMutator}. Simply wraps an {@link AsyncTableBase}.
 */
@InterfaceAudience.Private
class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {

  private final AsyncTableBase table;

  private final long writeBufferSize;

  private List<Mutation> mutations = new ArrayList<>();

  private List<CompletableFuture<Void>> futures = new ArrayList<>();

  private long bufferedSize;

  private boolean closed;

  AsyncBufferedMutatorImpl(AsyncTableBase table, long writeBufferSize) {
    this.table = table;
    this.writeBufferSize = writeBufferSize;
  }

  @Override
  public TableName getName() {
    return table.getName();
  }

  @Override
  public Configuration getConfiguration() {
    return table.getConfiguration();
  }

  private void internalFlush() {
    List<Mutation> toSend = this.mutations;
    if (toSend.isEmpty()) {
      return;
    }
    List<CompletableFuture<Void>> toComplete = this.futures;
    assert toSend.size() == toComplete.size();
    this.mutations = new ArrayList<>();
    this.futures = new ArrayList<>();
    bufferedSize = 0L;
    Iterator<CompletableFuture<Void>> toCompleteIter = toComplete.iterator();
    for (CompletableFuture<?> future : table.batch(toSend)) {
      // pair each batched future with its caller-visible future by position, before the callback
      // fires, so out-of-order completions cannot be delivered to the wrong future
      CompletableFuture<Void> f = toCompleteIter.next();
      future.whenComplete((r, e) -> {
        if (e != null) {
          f.completeExceptionally(e);
        } else {
          f.complete(null);
        }
      });
    }
  }

  @Override
  public CompletableFuture<Void> mutate(Mutation mutation) {
    CompletableFuture<Void> future = new CompletableFuture<Void>();
    long heapSize = mutation.heapSize();
    synchronized (this) {
      if (closed) {
        future.completeExceptionally(new IOException("Already closed"));
        return future;
      }
      mutations.add(mutation);
      futures.add(future);
      bufferedSize += heapSize;
      if (bufferedSize >= writeBufferSize) {
        internalFlush();
      }
    }
    return future;
  }

  @Override
  public List<CompletableFuture<Void>> mutate(List<? extends Mutation> mutations) {
    List<CompletableFuture<Void>> futures =
        Stream.<CompletableFuture<Void>> generate(CompletableFuture::new).limit(mutations.size())
            .collect(Collectors.toList());
    long heapSize = mutations.stream().mapToLong(m -> m.heapSize()).sum();
    synchronized (this) {
      if (closed) {
        IOException ioe = new IOException("Already closed");
        futures.forEach(f -> f.completeExceptionally(ioe));
        return futures;
      }
      this.mutations.addAll(mutations);
      this.futures.addAll(futures);
      bufferedSize += heapSize;
      if (bufferedSize >= writeBufferSize) {
        internalFlush();
      }
    }
    return futures;
  }

  @Override
  public synchronized void flush() {
    internalFlush();
  }

  @Override
  public synchronized void close() {
    internalFlush();
    closed = true;
  }

  @Override
  public long getWriteBufferSize() {
    return writeBufferSize;
  }
}
@@ -136,4 +136,43 @@ public interface AsyncConnection extends Closeable {
   * @param pool the thread pool to use for executing callback
   */
  AsyncAdminBuilder getAdminBuilder(ExecutorService pool);

  /**
   * Retrieve an {@link AsyncBufferedMutator} for performing client-side buffering of writes.
   * <p>
   * The returned instance will use default configs. Use
   * {@link #getBufferedMutatorBuilder(TableName)} if you want to customize some configs.
   * @param tableName the name of the table
   * @return an {@link AsyncBufferedMutator} for the supplied tableName.
   */
  default AsyncBufferedMutator getBufferedMutator(TableName tableName) {
    return getBufferedMutatorBuilder(tableName).build();
  }

  /**
   * Returns an {@link AsyncBufferedMutatorBuilder} for creating {@link AsyncBufferedMutator}.
   * @param tableName the name of the table
   */
  AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName);

  /**
   * Retrieve an {@link AsyncBufferedMutator} for performing client-side buffering of writes.
   * <p>
   * The returned instance will use default configs. Use
   * {@link #getBufferedMutatorBuilder(TableName, ExecutorService)} if you want to customize some
   * configs.
   * @param tableName the name of the table
   * @param pool the thread pool to use for executing callback
   * @return an {@link AsyncBufferedMutator} for the supplied tableName.
   */
  default AsyncBufferedMutator getBufferedMutator(TableName tableName, ExecutorService pool) {
    return getBufferedMutatorBuilder(tableName, pool).build();
  }

  /**
   * Returns an {@link AsyncBufferedMutatorBuilder} for creating {@link AsyncBufferedMutator}.
   * @param tableName the name of the table
   * @param pool the thread pool to use for executing callback
   */
  AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName, ExecutorService pool);
}
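Sketch of the pool-accepting variants above (illustrative, not part of the patch): the executor, table name, and column names are assumptions; callbacks on the returned futures run on the supplied pool.

ExecutorService pool = Executors.newFixedThreadPool(4);
try (AsyncBufferedMutator mutator = conn.getBufferedMutator(TableName.valueOf("test"), pool)) {
  CompletableFuture<Void> future = mutator.mutate(
    new Put(Bytes.toBytes("row")).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"),
      Bytes.toBytes("value")));
  mutator.flush(); // send now rather than waiting for the write buffer to fill
  future.join();
} finally {
  pool.shutdown();
}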
@@ -39,6 +39,8 @@ import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY;
import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY;
import static org.apache.hadoop.hbase.client.AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT;
import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_SIZE_DEFAULT;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_SIZE_KEY;

import java.util.concurrent.TimeUnit;

@@ -87,6 +89,8 @@ class AsyncConnectionConfiguration {

  private final long scannerMaxResultSize;

  private final long writeBufferSize;

  @SuppressWarnings("deprecation")
  AsyncConnectionConfiguration(Configuration conf) {
    this.metaOperationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
@@ -112,6 +116,7 @@ class AsyncConnectionConfiguration {
    this.metaScannerCaching = conf.getInt(HBASE_META_SCANNER_CACHING, DEFAULT_HBASE_META_SCANNER_CACHING);
    this.scannerMaxResultSize = conf.getLong(HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
      DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
    this.writeBufferSize = conf.getLong(WRITE_BUFFER_SIZE_KEY, WRITE_BUFFER_SIZE_DEFAULT);
  }

  long getMetaOperationTimeoutNs() {
@@ -161,4 +166,8 @@ class AsyncConnectionConfiguration {
  long getScannerMaxResultSize() {
    return scannerMaxResultSize;
  }

  long getWriteBufferSize() {
    return writeBufferSize;
  }
}
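To show where the new writeBufferSize default comes from, a sketch of setting hbase.client.write.buffer on the client Configuration before opening the async connection (the 8 MB value is an arbitrary assumption):

Configuration conf = HBaseConfiguration.create();
conf.setLong("hbase.client.write.buffer", 8 * 1024 * 1024); // 8 MB client-side write buffer
AsyncConnection conn = ConnectionFactory.createAsyncConnection(conf).get();
// AsyncBufferedMutators created from this connection now default to an 8 MB buffer,
// unless overridden via AsyncBufferedMutatorBuilder.setWriteBufferSize.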
@@ -297,4 +297,15 @@ class AsyncConnectionImpl implements AsyncConnection {
      }
    };
  }

  @Override
  public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName) {
    return new AsyncBufferedMutatorBuilderImpl(connConf, getRawTableBuilder(tableName));
  }

  @Override
  public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName,
      ExecutorService pool) {
    return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName, pool));
  }
}
@@ -0,0 +1,128 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({ MediumTests.class, ClientTests.class })
public class TestAsyncBufferMutator {

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  private static TableName TABLE_NAME = TableName.valueOf("async");

  private static byte[] CF = Bytes.toBytes("cf");

  private static byte[] CQ = Bytes.toBytes("cq");

  private static int COUNT = 100;

  private static byte[] VALUE = new byte[1024];

  private static AsyncConnection CONN;

  @BeforeClass
  public static void setUp() throws Exception {
    TEST_UTIL.startMiniCluster(1);
    TEST_UTIL.createTable(TABLE_NAME, CF);
    CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
    ThreadLocalRandom.current().nextBytes(VALUE);
  }

  @AfterClass
  public static void tearDown() throws Exception {
    CONN.close();
    TEST_UTIL.shutdownMiniCluster();
  }

  @Test
  public void test() throws InterruptedException {
    List<CompletableFuture<Void>> futures = new ArrayList<>();
    try (AsyncBufferedMutator mutator =
        CONN.getBufferedMutatorBuilder(TABLE_NAME).setWriteBufferSize(16 * 1024).build()) {
      List<CompletableFuture<Void>> fs = mutator.mutate(IntStream.range(0, COUNT / 2)
          .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE))
          .collect(Collectors.toList()));
      // the write buffer size is exceeded, so a flush is triggered directly
      fs.forEach(f -> f.join());
      IntStream.range(COUNT / 2, COUNT).forEach(i -> {
        futures.add(mutator.mutate(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE)));
      });
      // the first future should have been sent out.
      futures.get(0).join();
      Thread.sleep(2000);
      // the last one should still be in the write buffer
      assertFalse(futures.get(futures.size() - 1).isDone());
    }
    // mutator.close will call mutator.flush automatically so all tasks should have been done.
    futures.forEach(f -> f.join());
    RawAsyncTable table = CONN.getRawTable(TABLE_NAME);
    IntStream.range(0, COUNT).mapToObj(i -> new Get(Bytes.toBytes(i))).map(g -> table.get(g).join())
        .forEach(r -> {
          assertArrayEquals(VALUE, r.getValue(CF, CQ));
        });
  }

  @Test
  public void testClosedMutate() throws InterruptedException {
    AsyncBufferedMutator mutator = CONN.getBufferedMutator(TABLE_NAME);
    mutator.close();
    Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
    try {
      mutator.mutate(put).get();
      fail("Close check failed");
    } catch (ExecutionException e) {
      assertThat(e.getCause(), instanceOf(IOException.class));
      assertTrue(e.getCause().getMessage().startsWith("Already closed"));
    }
    for (CompletableFuture<Void> f : mutator.mutate(Arrays.asList(put))) {
      try {
        f.get();
        fail("Close check failed");
      } catch (ExecutionException e) {
        assertThat(e.getCause(), instanceOf(IOException.class));
        assertTrue(e.getCause().getMessage().startsWith("Already closed"));
      }
    }
  }
}