HBASE-16836 Implement increment and append
This commit is contained in:
parent
1b005f30e4
commit
b4f6ebde24
|
@ -20,7 +20,9 @@ package org.apache.hadoop.hbase.client;
|
||||||
import static org.apache.hadoop.hbase.HConstants.CLUSTER_ID_DEFAULT;
|
import static org.apache.hadoop.hbase.HConstants.CLUSTER_ID_DEFAULT;
|
||||||
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
|
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
|
||||||
import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY;
|
import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY;
|
||||||
|
import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR;
|
||||||
import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey;
|
import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey;
|
||||||
|
import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLED_KEY;
|
||||||
|
|
||||||
import io.netty.util.HashedWheelTimer;
|
import io.netty.util.HashedWheelTimer;
|
||||||
|
|
||||||
|
@ -80,8 +82,11 @@ class AsyncConnectionImpl implements AsyncConnection {
|
||||||
|
|
||||||
final AsyncRpcRetryingCallerFactory callerFactory;
|
final AsyncRpcRetryingCallerFactory callerFactory;
|
||||||
|
|
||||||
|
private final NonceGenerator nonceGenerator;
|
||||||
|
|
||||||
private final ConcurrentMap<String, ClientService.Interface> rsStubs = new ConcurrentHashMap<>();
|
private final ConcurrentMap<String, ClientService.Interface> rsStubs = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
@SuppressWarnings("deprecation")
|
||||||
public AsyncConnectionImpl(Configuration conf, User user) throws IOException {
|
public AsyncConnectionImpl(Configuration conf, User user) throws IOException {
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
this.user = user;
|
this.user = user;
|
||||||
|
@ -103,6 +108,11 @@ class AsyncConnectionImpl implements AsyncConnection {
|
||||||
this.hostnameCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true);
|
this.hostnameCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true);
|
||||||
this.rpcTimeout = conf.getInt(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT);
|
this.rpcTimeout = conf.getInt(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT);
|
||||||
this.callerFactory = new AsyncRpcRetryingCallerFactory(this, RETRY_TIMER);
|
this.callerFactory = new AsyncRpcRetryingCallerFactory(this, RETRY_TIMER);
|
||||||
|
if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) {
|
||||||
|
nonceGenerator = PerClientRandomNonceGenerator.get();
|
||||||
|
} else {
|
||||||
|
nonceGenerator = NO_NONCE_GENERATOR;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -127,6 +137,11 @@ class AsyncConnectionImpl implements AsyncConnection {
|
||||||
return locator;
|
return locator;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ditto
|
||||||
|
public NonceGenerator getNonceGenerator() {
|
||||||
|
return nonceGenerator;
|
||||||
|
}
|
||||||
|
|
||||||
private ClientService.Interface createRegionServerStub(ServerName serverName) throws IOException {
|
private ClientService.Interface createRegionServerStub(ServerName serverName) throws IOException {
|
||||||
return ClientService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
|
return ClientService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,6 +17,8 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.client;
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
@ -24,6 +26,8 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.ReflectionUtils;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The asynchronous version of Table. Obtain an instance from a {@link AsyncConnection}.
|
* The asynchronous version of Table. Obtain an instance from a {@link AsyncConnection}.
|
||||||
|
@ -99,28 +103,105 @@ public interface AsyncTable {
|
||||||
* This will return true if the Get matches one or more keys, false if not.
|
* This will return true if the Get matches one or more keys, false if not.
|
||||||
* <p>
|
* <p>
|
||||||
* This is a server-side call so it prevents any data from being transfered to the client.
|
* This is a server-side call so it prevents any data from being transfered to the client.
|
||||||
|
* @return true if the specified Get matches one or more keys, false if not. The return value will
|
||||||
|
* be wrapped by a {@link CompletableFuture}.
|
||||||
*/
|
*/
|
||||||
CompletableFuture<Boolean> exists(Get get);
|
default CompletableFuture<Boolean> exists(Get get) {
|
||||||
|
if (!get.isCheckExistenceOnly()) {
|
||||||
|
get = ReflectionUtils.newInstance(get.getClass(), get);
|
||||||
|
get.setCheckExistenceOnly(true);
|
||||||
|
}
|
||||||
|
return get(get).thenApply(r -> r.getExists());
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Extracts certain cells from a given row.
|
* Extracts certain cells from a given row.
|
||||||
* <p>
|
|
||||||
* Return the data coming from the specified row, if it exists. If the row specified doesn't
|
|
||||||
* exist, the {@link Result} instance returned won't contain any
|
|
||||||
* {@link org.apache.hadoop.hbase.KeyValue}, as indicated by {@link Result#isEmpty()}.
|
|
||||||
* @param get The object that specifies what data to fetch and from which row.
|
* @param get The object that specifies what data to fetch and from which row.
|
||||||
|
* @return The data coming from the specified row, if it exists. If the row specified doesn't
|
||||||
|
* exist, the {@link Result} instance returned won't contain any
|
||||||
|
* {@link org.apache.hadoop.hbase.KeyValue}, as indicated by {@link Result#isEmpty()}. The
|
||||||
|
* return value will be wrapped by a {@link CompletableFuture}.
|
||||||
*/
|
*/
|
||||||
CompletableFuture<Result> get(Get get);
|
CompletableFuture<Result> get(Get get);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Puts some data to the table.
|
* Puts some data to the table.
|
||||||
* @param put The data to put.
|
* @param put The data to put.
|
||||||
|
* @return A {@link CompletableFuture} that always returns null when complete normally.
|
||||||
*/
|
*/
|
||||||
CompletableFuture<Void> put(Put put);
|
CompletableFuture<Void> put(Put put);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Deletes the specified cells/row.
|
* Deletes the specified cells/row.
|
||||||
* @param delete The object that specifies what to delete.
|
* @param delete The object that specifies what to delete.
|
||||||
|
* @return A {@link CompletableFuture} that always returns null when complete normally.
|
||||||
*/
|
*/
|
||||||
CompletableFuture<Void> delete(Delete delete);
|
CompletableFuture<Void> delete(Delete delete);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Appends values to one or more columns within a single row.
|
||||||
|
* <p>
|
||||||
|
* This operation does not appear atomic to readers. Appends are done under a single row lock, so
|
||||||
|
* write operations to a row are synchronized, but readers do not take row locks so get and scan
|
||||||
|
* operations can see this operation partially completed.
|
||||||
|
* @param append object that specifies the columns and amounts to be used for the increment
|
||||||
|
* operations
|
||||||
|
* @return values of columns after the append operation (maybe null). The return value will be
|
||||||
|
* wrapped by a {@link CompletableFuture}.
|
||||||
|
*/
|
||||||
|
CompletableFuture<Result> append(Append append);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Increments one or more columns within a single row.
|
||||||
|
* <p>
|
||||||
|
* This operation does not appear atomic to readers. Increments are done under a single row lock,
|
||||||
|
* so write operations to a row are synchronized, but readers do not take row locks so get and
|
||||||
|
* scan operations can see this operation partially completed.
|
||||||
|
* @param increment object that specifies the columns and amounts to be used for the increment
|
||||||
|
* operations
|
||||||
|
* @return values of columns after the increment. The return value will be wrapped by a
|
||||||
|
* {@link CompletableFuture}.
|
||||||
|
*/
|
||||||
|
CompletableFuture<Result> increment(Increment increment);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* See {@link #incrementColumnValue(byte[], byte[], byte[], long, Durability)}
|
||||||
|
* <p>
|
||||||
|
* The {@link Durability} is defaulted to {@link Durability#SYNC_WAL}.
|
||||||
|
* @param row The row that contains the cell to increment.
|
||||||
|
* @param family The column family of the cell to increment.
|
||||||
|
* @param qualifier The column qualifier of the cell to increment.
|
||||||
|
* @param amount The amount to increment the cell with (or decrement, if the amount is negative).
|
||||||
|
* @return The new value, post increment. The return value will be wrapped by a
|
||||||
|
* {@link CompletableFuture}.
|
||||||
|
*/
|
||||||
|
default CompletableFuture<Long> incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
|
||||||
|
long amount) {
|
||||||
|
return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Atomically increments a column value. If the column value already exists and is not a
|
||||||
|
* big-endian long, this could throw an exception. If the column value does not yet exist it is
|
||||||
|
* initialized to <code>amount</code> and written to the specified column.
|
||||||
|
* <p>
|
||||||
|
* Setting durability to {@link Durability#SKIP_WAL} means that in a fail scenario you will lose
|
||||||
|
* any increments that have not been flushed.
|
||||||
|
* @param row The row that contains the cell to increment.
|
||||||
|
* @param family The column family of the cell to increment.
|
||||||
|
* @param qualifier The column qualifier of the cell to increment.
|
||||||
|
* @param amount The amount to increment the cell with (or decrement, if the amount is negative).
|
||||||
|
* @param durability The persistence guarantee for this increment.
|
||||||
|
* @return The new value, post increment. The return value will be wrapped by a
|
||||||
|
* {@link CompletableFuture}.
|
||||||
|
*/
|
||||||
|
default CompletableFuture<Long> incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
|
||||||
|
long amount, Durability durability) {
|
||||||
|
Preconditions.checkNotNull(row, "row is null");
|
||||||
|
Preconditions.checkNotNull(family, "family is null");
|
||||||
|
Preconditions.checkNotNull(qualifier, "qualifier is null");
|
||||||
|
return increment(
|
||||||
|
new Increment(row).addColumn(family, qualifier, amount).setDurability(durability))
|
||||||
|
.thenApply(r -> Bytes.toLong(r.getValue(family, qualifier)));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,6 +17,8 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.client;
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
@ -35,7 +37,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
|
||||||
import org.apache.hadoop.hbase.util.ReflectionUtils;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The implementation of AsyncTable.
|
* The implementation of AsyncTable.
|
||||||
|
@ -111,21 +112,51 @@ class AsyncTableImpl implements AsyncTable {
|
||||||
return future;
|
return future;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static <REQ, RESP> CompletableFuture<RESP> mutate(HBaseRpcController controller,
|
||||||
|
HRegionLocation loc, ClientService.Interface stub, REQ req,
|
||||||
|
Converter<MutateRequest, byte[], REQ> reqConvert,
|
||||||
|
Converter<RESP, HBaseRpcController, MutateResponse> respConverter) {
|
||||||
|
return call(controller, loc, stub, req, reqConvert, (s, c, r, done) -> s.mutate(c, r, done),
|
||||||
|
respConverter);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static <REQ> CompletableFuture<Void> voidMutate(HBaseRpcController controller,
|
||||||
|
HRegionLocation loc, ClientService.Interface stub, REQ req,
|
||||||
|
Converter<MutateRequest, byte[], REQ> reqConvert) {
|
||||||
|
return mutate(controller, loc, stub, req, reqConvert, (c, resp) -> {
|
||||||
|
return null;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Result toResult(HBaseRpcController controller, MutateResponse resp)
|
||||||
|
throws IOException {
|
||||||
|
if (!resp.hasResult()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return ProtobufUtil.toResult(resp.getResult(), controller.cellScanner());
|
||||||
|
}
|
||||||
|
|
||||||
|
@FunctionalInterface
|
||||||
|
private interface NoncedConverter<D, I, S> {
|
||||||
|
D convert(I info, S src, long nonceGroup, long nonce) throws IOException;
|
||||||
|
}
|
||||||
|
|
||||||
|
private <REQ, RESP> CompletableFuture<RESP> noncedMutate(HBaseRpcController controller,
|
||||||
|
HRegionLocation loc, ClientService.Interface stub, REQ req,
|
||||||
|
NoncedConverter<MutateRequest, byte[], REQ> reqConvert,
|
||||||
|
Converter<RESP, HBaseRpcController, MutateResponse> respConverter) {
|
||||||
|
long nonceGroup = conn.getNonceGenerator().getNonceGroup();
|
||||||
|
long nonce = conn.getNonceGenerator().newNonce();
|
||||||
|
return mutate(controller, loc, stub, req,
|
||||||
|
(info, src) -> reqConvert.convert(info, src, nonceGroup, nonce), respConverter);
|
||||||
|
}
|
||||||
|
|
||||||
private <T> SingleRequestCallerBuilder<T> newCaller(Row row, long rpcTimeoutNs) {
|
private <T> SingleRequestCallerBuilder<T> newCaller(Row row, long rpcTimeoutNs) {
|
||||||
return conn.callerFactory.<T> single().table(tableName).row(row.getRow())
|
return conn.callerFactory.<T> single().table(tableName).row(row.getRow())
|
||||||
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
|
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
|
||||||
.operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS);
|
.operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public CompletableFuture<Boolean> exists(Get get) {
|
|
||||||
if (!get.isCheckExistenceOnly()) {
|
|
||||||
get = ReflectionUtils.newInstance(get.getClass(), get);
|
|
||||||
get.setCheckExistenceOnly(true);
|
|
||||||
}
|
|
||||||
return get(get).thenApply(r -> r.getExists());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public CompletableFuture<Result> get(Get get) {
|
public CompletableFuture<Result> get(Get get) {
|
||||||
return this.<Result> newCaller(get, readRpcTimeoutNs)
|
return this.<Result> newCaller(get, readRpcTimeoutNs)
|
||||||
|
@ -138,25 +169,35 @@ class AsyncTableImpl implements AsyncTable {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public CompletableFuture<Void> put(Put put) {
|
public CompletableFuture<Void> put(Put put) {
|
||||||
return this.<Void> newCaller(put, writeRpcTimeoutNs)
|
return this
|
||||||
.action(
|
.<Void> newCaller(put, writeRpcTimeoutNs).action((controller, loc, stub) -> AsyncTableImpl
|
||||||
(controller, loc, stub) -> AsyncTableImpl.<Put, MutateRequest, MutateResponse, Void> call(
|
.<Put> voidMutate(controller, loc, stub, put, RequestConverter::buildMutateRequest))
|
||||||
controller, loc, stub, put, RequestConverter::buildMutateRequest,
|
|
||||||
(s, c, req, done) -> s.mutate(c, req, done), (c, resp) -> {
|
|
||||||
return null;
|
|
||||||
}))
|
|
||||||
.call();
|
.call();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public CompletableFuture<Void> delete(Delete delete) {
|
public CompletableFuture<Void> delete(Delete delete) {
|
||||||
return this.<Void> newCaller(delete, writeRpcTimeoutNs)
|
return this.<Void> newCaller(delete, writeRpcTimeoutNs)
|
||||||
.action((controller, loc, stub) -> AsyncTableImpl
|
.action((controller, loc, stub) -> AsyncTableImpl.<Delete> voidMutate(controller, loc, stub,
|
||||||
.<Delete, MutateRequest, MutateResponse, Void> call(controller, loc, stub, delete,
|
delete, RequestConverter::buildMutateRequest))
|
||||||
RequestConverter::buildMutateRequest, (s, c, req, done) -> s.mutate(c, req, done),
|
.call();
|
||||||
(c, resp) -> {
|
}
|
||||||
return null;
|
|
||||||
}))
|
@Override
|
||||||
|
public CompletableFuture<Result> append(Append append) {
|
||||||
|
checkHasFamilies(append);
|
||||||
|
return this.<Result> newCaller(append, writeRpcTimeoutNs)
|
||||||
|
.action((controller, loc, stub) -> this.<Append, Result> noncedMutate(controller, loc, stub,
|
||||||
|
append, RequestConverter::buildMutateRequest, AsyncTableImpl::toResult))
|
||||||
|
.call();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CompletableFuture<Result> increment(Increment increment) {
|
||||||
|
checkHasFamilies(increment);
|
||||||
|
return this.<Result> newCaller(increment, writeRpcTimeoutNs)
|
||||||
|
.action((controller, loc, stub) -> this.<Increment, Result> noncedMutate(controller, loc,
|
||||||
|
stub, increment, RequestConverter::buildMutateRequest, AsyncTableImpl::toResult))
|
||||||
.call();
|
.call();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.client;
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR;
|
||||||
import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey;
|
import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey;
|
||||||
import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
|
import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
|
||||||
import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;
|
import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;
|
||||||
|
@ -108,7 +109,7 @@ import edu.umd.cs.findbugs.annotations.Nullable;
|
||||||
class ConnectionImplementation implements ClusterConnection, Closeable {
|
class ConnectionImplementation implements ClusterConnection, Closeable {
|
||||||
public static final String RETRIES_BY_SERVER_KEY = "hbase.client.retries.by.server";
|
public static final String RETRIES_BY_SERVER_KEY = "hbase.client.retries.by.server";
|
||||||
private static final Log LOG = LogFactory.getLog(ConnectionImplementation.class);
|
private static final Log LOG = LogFactory.getLog(ConnectionImplementation.class);
|
||||||
private static final String CLIENT_NONCES_ENABLED_KEY = "hbase.client.nonces.enabled";
|
|
||||||
private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure";
|
private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure";
|
||||||
|
|
||||||
private final boolean hostnamesCanChange;
|
private final boolean hostnamesCanChange;
|
||||||
|
@ -199,14 +200,14 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
|
||||||
this.rpcTimeout = conf.getInt(
|
this.rpcTimeout = conf.getInt(
|
||||||
HConstants.HBASE_RPC_TIMEOUT_KEY,
|
HConstants.HBASE_RPC_TIMEOUT_KEY,
|
||||||
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
|
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
|
||||||
if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) {
|
if (conf.getBoolean(NonceGenerator.CLIENT_NONCES_ENABLED_KEY, true)) {
|
||||||
synchronized (nonceGeneratorCreateLock) {
|
synchronized (nonceGeneratorCreateLock) {
|
||||||
if (nonceGenerator == null) {
|
if (nonceGenerator == null) {
|
||||||
nonceGenerator = new PerClientRandomNonceGenerator();
|
nonceGenerator = PerClientRandomNonceGenerator.get();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
nonceGenerator = new NoNonceGenerator();
|
nonceGenerator = NO_NONCE_GENERATOR;
|
||||||
}
|
}
|
||||||
|
|
||||||
this.stats = ServerStatisticTracker.create(conf);
|
this.stats = ServerStatisticTracker.create(conf);
|
||||||
|
@ -948,18 +949,6 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Dummy nonce generator for disabled nonces. */
|
|
||||||
static class NoNonceGenerator implements NonceGenerator {
|
|
||||||
@Override
|
|
||||||
public long getNonceGroup() {
|
|
||||||
return HConstants.NO_NONCE;
|
|
||||||
}
|
|
||||||
@Override
|
|
||||||
public long newNonce() {
|
|
||||||
return HConstants.NO_NONCE;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The record of errors for servers.
|
* The record of errors for servers.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
package org.apache.hadoop.hbase.client;
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetAddress;
|
import java.net.InetAddress;
|
||||||
|
@ -202,4 +203,23 @@ public final class ConnectionUtils {
|
||||||
}
|
}
|
||||||
return serviceName + "@" + hostname + ":" + port;
|
return serviceName + "@" + hostname + ":" + port;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void checkHasFamilies(Mutation mutation) {
|
||||||
|
Preconditions.checkArgument(mutation.numFamilies() > 0,
|
||||||
|
"Invalid arguments to %s, zero columns specified", mutation.toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Dummy nonce generator for disabled nonces. */
|
||||||
|
static final NonceGenerator NO_NONCE_GENERATOR = new NonceGenerator() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long newNonce() {
|
||||||
|
return HConstants.NO_NONCE;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getNonceGroup() {
|
||||||
|
return HConstants.NO_NONCE;
|
||||||
|
}
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,16 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.client;
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
// DO NOT MAKE USE OF THESE IMPORTS! THEY ARE HERE FOR COPROCESSOR ENDPOINTS ONLY.
|
||||||
|
// Internally, we use shaded protobuf. This below are part of our public API.
|
||||||
|
import com.google.protobuf.Descriptors;
|
||||||
|
import com.google.protobuf.Message;
|
||||||
|
import com.google.protobuf.Service;
|
||||||
|
import com.google.protobuf.ServiceException;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InterruptedIOException;
|
import java.io.InterruptedIOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
@ -49,8 +59,8 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
|
||||||
import org.apache.hadoop.hbase.filter.BinaryComparator;
|
import org.apache.hadoop.hbase.filter.BinaryComparator;
|
||||||
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
|
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
|
||||||
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
|
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
|
||||||
import org.apache.hadoop.hbase.client.RegionCoprocessorRpcChannel;
|
|
||||||
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
||||||
|
//SEE ABOVE NOTE!
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
|
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
|
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
|
||||||
|
@ -65,15 +75,6 @@ import org.apache.hadoop.hbase.util.Pair;
|
||||||
import org.apache.hadoop.hbase.util.ReflectionUtils;
|
import org.apache.hadoop.hbase.util.ReflectionUtils;
|
||||||
import org.apache.hadoop.hbase.util.Threads;
|
import org.apache.hadoop.hbase.util.Threads;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
|
||||||
// DO NOT MAKE USE OF THESE IMPORTS! THEY ARE HERE FOR COPROCESSOR ENDPOINTS ONLY.
|
|
||||||
// Internally, we use shaded protobuf. This below are part of our public API.
|
|
||||||
import com.google.protobuf.Descriptors;
|
|
||||||
import com.google.protobuf.Message;
|
|
||||||
import com.google.protobuf.Service;
|
|
||||||
import com.google.protobuf.ServiceException;
|
|
||||||
// SEE ABOVE NOTE!
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An implementation of {@link Table}. Used to communicate with a single HBase table.
|
* An implementation of {@link Table}. Used to communicate with a single HBase table.
|
||||||
* Lightweight. Get as needed and just close when done.
|
* Lightweight. Get as needed and just close when done.
|
||||||
|
@ -617,12 +618,6 @@ public class HTable implements Table {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void checkHasFamilies(final Mutation mutation) throws IOException {
|
|
||||||
if (mutation.numFamilies() == 0) {
|
|
||||||
throw new IOException("Invalid arguments to " + mutation + ", zero columns specified");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* {@inheritDoc}
|
* {@inheritDoc}
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -29,9 +29,11 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public interface NonceGenerator {
|
public interface NonceGenerator {
|
||||||
|
|
||||||
|
static final String CLIENT_NONCES_ENABLED_KEY = "hbase.client.nonces.enabled";
|
||||||
|
|
||||||
/** @return the nonce group (client ID) of this client manager. */
|
/** @return the nonce group (client ID) of this client manager. */
|
||||||
public long getNonceGroup();
|
long getNonceGroup();
|
||||||
|
|
||||||
/** @return New nonce. */
|
/** @return New nonce. */
|
||||||
public long newNonce();
|
long newNonce();
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,17 +25,20 @@ import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* NonceGenerator implementation that uses client ID hash + random int as nonce group,
|
* NonceGenerator implementation that uses client ID hash + random int as nonce group, and random
|
||||||
* and random numbers as nonces.
|
* numbers as nonces.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class PerClientRandomNonceGenerator implements NonceGenerator {
|
public final class PerClientRandomNonceGenerator implements NonceGenerator {
|
||||||
|
|
||||||
|
private static final PerClientRandomNonceGenerator INST = new PerClientRandomNonceGenerator();
|
||||||
|
|
||||||
private final Random rdm = new Random();
|
private final Random rdm = new Random();
|
||||||
private final long clientId;
|
private final long clientId;
|
||||||
|
|
||||||
public PerClientRandomNonceGenerator() {
|
private PerClientRandomNonceGenerator() {
|
||||||
byte[] clientIdBase = ClientIdGenerator.generateClientId();
|
byte[] clientIdBase = ClientIdGenerator.generateClientId();
|
||||||
this.clientId = (((long)Arrays.hashCode(clientIdBase)) << 32) + rdm.nextInt();
|
this.clientId = (((long) Arrays.hashCode(clientIdBase)) << 32) + rdm.nextInt();
|
||||||
}
|
}
|
||||||
|
|
||||||
public long getNonceGroup() {
|
public long getNonceGroup() {
|
||||||
|
@ -49,4 +52,11 @@ public class PerClientRandomNonceGenerator implements NonceGenerator {
|
||||||
} while (result == HConstants.NO_NONCE);
|
} while (result == HConstants.NO_NONCE);
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the singleton nonce generator.
|
||||||
|
*/
|
||||||
|
public static PerClientRandomNonceGenerator get() {
|
||||||
|
return INST;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,11 +19,11 @@ package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.CoprocessorEnvironment;
|
import org.apache.hadoop.hbase.CoprocessorEnvironment;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
|
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||||
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
||||||
|
@ -40,7 +40,6 @@ import org.apache.hadoop.hbase.security.UserProvider;
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
@InterfaceStability.Evolving
|
@InterfaceStability.Evolving
|
||||||
public class CoprocessorHConnection extends ConnectionImplementation {
|
public class CoprocessorHConnection extends ConnectionImplementation {
|
||||||
private static final NonceGenerator NO_NONCE_GEN = new NoNonceGenerator();
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a {@link ClusterConnection} based on the environment in which we are running the
|
* Create a {@link ClusterConnection} based on the environment in which we are running the
|
||||||
|
@ -101,6 +100,6 @@ public class CoprocessorHConnection extends ConnectionImplementation {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public NonceGenerator getNonceGenerator() {
|
public NonceGenerator getNonceGenerator() {
|
||||||
return NO_NONCE_GEN; // don't use nonces for coprocessor connection
|
return ConnectionUtils.NO_NONCE_GENERATOR; // don't use nonces for coprocessor connection
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,12 +18,17 @@
|
||||||
package org.apache.hadoop.hbase.client;
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
import static org.junit.Assert.assertArrayEquals;
|
import static org.junit.Assert.assertArrayEquals;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.concurrent.ArrayBlockingQueue;
|
import java.util.concurrent.ArrayBlockingQueue;
|
||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
import java.util.stream.IntStream;
|
import java.util.stream.IntStream;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
@ -33,9 +38,12 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.Pair;
|
import org.apache.hadoop.hbase.util.Pair;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.Before;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
import org.junit.rules.TestName;
|
||||||
|
|
||||||
@Category({ MediumTests.class, ClientTests.class })
|
@Category({ MediumTests.class, ClientTests.class })
|
||||||
public class TestAsyncTable {
|
public class TestAsyncTable {
|
||||||
|
@ -48,14 +56,17 @@ public class TestAsyncTable {
|
||||||
|
|
||||||
private static byte[] QUALIFIER = Bytes.toBytes("cq");
|
private static byte[] QUALIFIER = Bytes.toBytes("cq");
|
||||||
|
|
||||||
private static byte[] ROW = Bytes.toBytes("row");
|
|
||||||
|
|
||||||
private static byte[] VALUE = Bytes.toBytes("value");
|
private static byte[] VALUE = Bytes.toBytes("value");
|
||||||
|
|
||||||
private static AsyncConnection ASYNC_CONN;
|
private static AsyncConnection ASYNC_CONN;
|
||||||
|
|
||||||
|
@Rule
|
||||||
|
public TestName testName = new TestName();
|
||||||
|
|
||||||
|
private byte[] row;
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void setUp() throws Exception {
|
public static void setUpBeforeClass() throws Exception {
|
||||||
TEST_UTIL.startMiniCluster(1);
|
TEST_UTIL.startMiniCluster(1);
|
||||||
TEST_UTIL.createTable(TABLE_NAME, FAMILY);
|
TEST_UTIL.createTable(TABLE_NAME, FAMILY);
|
||||||
TEST_UTIL.waitTableAvailable(TABLE_NAME);
|
TEST_UTIL.waitTableAvailable(TABLE_NAME);
|
||||||
|
@ -63,22 +74,27 @@ public class TestAsyncTable {
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterClass
|
@AfterClass
|
||||||
public static void tearDown() throws Exception {
|
public static void tearDownAfterClass() throws Exception {
|
||||||
ASYNC_CONN.close();
|
ASYNC_CONN.close();
|
||||||
TEST_UTIL.shutdownMiniCluster();
|
TEST_UTIL.shutdownMiniCluster();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws IOException, InterruptedException {
|
||||||
|
row = Bytes.toBytes(testName.getMethodName().replaceAll("[^0-9A-Za-z]", "_"));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void test() throws Exception {
|
public void testSimple() throws Exception {
|
||||||
AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
|
AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
|
||||||
table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get();
|
table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).get();
|
||||||
assertTrue(table.exists(new Get(ROW).addColumn(FAMILY, QUALIFIER)).get());
|
assertTrue(table.exists(new Get(row).addColumn(FAMILY, QUALIFIER)).get());
|
||||||
Result result = table.get(new Get(ROW).addColumn(FAMILY, QUALIFIER)).get();
|
Result result = table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get();
|
||||||
assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
|
assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
|
||||||
table.delete(new Delete(ROW)).get();
|
table.delete(new Delete(row)).get();
|
||||||
result = table.get(new Get(ROW).addColumn(FAMILY, QUALIFIER)).get();
|
result = table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get();
|
||||||
assertTrue(result.isEmpty());
|
assertTrue(result.isEmpty());
|
||||||
assertFalse(table.exists(new Get(ROW).addColumn(FAMILY, QUALIFIER)).get());
|
assertFalse(table.exists(new Get(row).addColumn(FAMILY, QUALIFIER)).get());
|
||||||
}
|
}
|
||||||
|
|
||||||
private byte[] concat(byte[] base, int index) {
|
private byte[] concat(byte[] base, int index) {
|
||||||
|
@ -86,24 +102,24 @@ public class TestAsyncTable {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMultiple() throws Exception {
|
public void testSimpleMultiple() throws Exception {
|
||||||
AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
|
AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
|
||||||
int count = 100;
|
int count = 100;
|
||||||
CountDownLatch putLatch = new CountDownLatch(count);
|
CountDownLatch putLatch = new CountDownLatch(count);
|
||||||
IntStream.range(0, count).forEach(
|
IntStream.range(0, count).forEach(
|
||||||
i -> table.put(new Put(concat(ROW, i)).addColumn(FAMILY, QUALIFIER, concat(VALUE, i)))
|
i -> table.put(new Put(concat(row, i)).addColumn(FAMILY, QUALIFIER, concat(VALUE, i)))
|
||||||
.thenAccept(x -> putLatch.countDown()));
|
.thenAccept(x -> putLatch.countDown()));
|
||||||
putLatch.await();
|
putLatch.await();
|
||||||
BlockingQueue<Boolean> existsResp = new ArrayBlockingQueue<>(count);
|
BlockingQueue<Boolean> existsResp = new ArrayBlockingQueue<>(count);
|
||||||
IntStream.range(0, count)
|
IntStream.range(0, count)
|
||||||
.forEach(i -> table.exists(new Get(concat(ROW, i)).addColumn(FAMILY, QUALIFIER))
|
.forEach(i -> table.exists(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
|
||||||
.thenAccept(x -> existsResp.add(x)));
|
.thenAccept(x -> existsResp.add(x)));
|
||||||
for (int i = 0; i < count; i++) {
|
for (int i = 0; i < count; i++) {
|
||||||
assertTrue(existsResp.take());
|
assertTrue(existsResp.take());
|
||||||
}
|
}
|
||||||
BlockingQueue<Pair<Integer, Result>> getResp = new ArrayBlockingQueue<>(count);
|
BlockingQueue<Pair<Integer, Result>> getResp = new ArrayBlockingQueue<>(count);
|
||||||
IntStream.range(0, count)
|
IntStream.range(0, count)
|
||||||
.forEach(i -> table.get(new Get(concat(ROW, i)).addColumn(FAMILY, QUALIFIER))
|
.forEach(i -> table.get(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
|
||||||
.thenAccept(x -> getResp.add(Pair.newPair(i, x))));
|
.thenAccept(x -> getResp.add(Pair.newPair(i, x))));
|
||||||
for (int i = 0; i < count; i++) {
|
for (int i = 0; i < count; i++) {
|
||||||
Pair<Integer, Result> pair = getResp.take();
|
Pair<Integer, Result> pair = getResp.take();
|
||||||
|
@ -112,20 +128,60 @@ public class TestAsyncTable {
|
||||||
}
|
}
|
||||||
CountDownLatch deleteLatch = new CountDownLatch(count);
|
CountDownLatch deleteLatch = new CountDownLatch(count);
|
||||||
IntStream.range(0, count).forEach(
|
IntStream.range(0, count).forEach(
|
||||||
i -> table.delete(new Delete(concat(ROW, i))).thenAccept(x -> deleteLatch.countDown()));
|
i -> table.delete(new Delete(concat(row, i))).thenAccept(x -> deleteLatch.countDown()));
|
||||||
deleteLatch.await();
|
deleteLatch.await();
|
||||||
IntStream.range(0, count)
|
IntStream.range(0, count)
|
||||||
.forEach(i -> table.exists(new Get(concat(ROW, i)).addColumn(FAMILY, QUALIFIER))
|
.forEach(i -> table.exists(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
|
||||||
.thenAccept(x -> existsResp.add(x)));
|
.thenAccept(x -> existsResp.add(x)));
|
||||||
for (int i = 0; i < count; i++) {
|
for (int i = 0; i < count; i++) {
|
||||||
assertFalse(existsResp.take());
|
assertFalse(existsResp.take());
|
||||||
}
|
}
|
||||||
IntStream.range(0, count)
|
IntStream.range(0, count)
|
||||||
.forEach(i -> table.get(new Get(concat(ROW, i)).addColumn(FAMILY, QUALIFIER))
|
.forEach(i -> table.get(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
|
||||||
.thenAccept(x -> getResp.add(Pair.newPair(i, x))));
|
.thenAccept(x -> getResp.add(Pair.newPair(i, x))));
|
||||||
for (int i = 0; i < count; i++) {
|
for (int i = 0; i < count; i++) {
|
||||||
Pair<Integer, Result> pair = getResp.take();
|
Pair<Integer, Result> pair = getResp.take();
|
||||||
assertTrue(pair.getSecond().isEmpty());
|
assertTrue(pair.getSecond().isEmpty());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testIncrement() throws InterruptedException, ExecutionException {
|
||||||
|
AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
|
||||||
|
int count = 100;
|
||||||
|
CountDownLatch latch = new CountDownLatch(count);
|
||||||
|
AtomicLong sum = new AtomicLong(0L);
|
||||||
|
IntStream.range(0, count)
|
||||||
|
.forEach(i -> table.incrementColumnValue(row, FAMILY, QUALIFIER, 1).thenAccept(x -> {
|
||||||
|
sum.addAndGet(x);
|
||||||
|
latch.countDown();
|
||||||
|
}));
|
||||||
|
latch.await();
|
||||||
|
assertEquals(count, Bytes.toLong(
|
||||||
|
table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get().getValue(FAMILY, QUALIFIER)));
|
||||||
|
assertEquals((1 + count) * count / 2, sum.get());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAppend() throws InterruptedException, ExecutionException {
|
||||||
|
AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
|
||||||
|
int count = 10;
|
||||||
|
CountDownLatch latch = new CountDownLatch(count);
|
||||||
|
char suffix = ':';
|
||||||
|
AtomicLong suffixCount = new AtomicLong(0L);
|
||||||
|
IntStream.range(0, count).forEachOrdered(
|
||||||
|
i -> table.append(new Append(row).add(FAMILY, QUALIFIER, Bytes.toBytes("" + i + suffix)))
|
||||||
|
.thenAccept(r -> {
|
||||||
|
suffixCount.addAndGet(Bytes.toString(r.getValue(FAMILY, QUALIFIER)).chars()
|
||||||
|
.filter(x -> x == suffix).count());
|
||||||
|
latch.countDown();
|
||||||
|
}));
|
||||||
|
latch.await();
|
||||||
|
assertEquals((1 + count) * count / 2, suffixCount.get());
|
||||||
|
String value = Bytes.toString(
|
||||||
|
table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get().getValue(FAMILY, QUALIFIER));
|
||||||
|
int[] actual = Arrays.asList(value.split("" + suffix)).stream().mapToInt(Integer::parseInt)
|
||||||
|
.sorted().toArray();
|
||||||
|
assertArrayEquals(IntStream.range(0, count).toArray(), actual);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,121 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.security.User;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Rule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
import org.junit.rules.TestName;
|
||||||
|
|
||||||
|
@Category({ MediumTests.class, ClientTests.class })
|
||||||
|
public class TestAsyncTableNoncedRetry {
|
||||||
|
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||||
|
|
||||||
|
private static TableName TABLE_NAME = TableName.valueOf("async");
|
||||||
|
|
||||||
|
private static byte[] FAMILY = Bytes.toBytes("cf");
|
||||||
|
|
||||||
|
private static byte[] QUALIFIER = Bytes.toBytes("cq");
|
||||||
|
|
||||||
|
private static byte[] VALUE = Bytes.toBytes("value");
|
||||||
|
|
||||||
|
private static AsyncConnection ASYNC_CONN;
|
||||||
|
|
||||||
|
private static long NONCE = 1L;
|
||||||
|
|
||||||
|
private static NonceGenerator NONCE_GENERATOR = new NonceGenerator() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long newNonce() {
|
||||||
|
return NONCE;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getNonceGroup() {
|
||||||
|
return 1L;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
@Rule
|
||||||
|
public TestName testName = new TestName();
|
||||||
|
|
||||||
|
private byte[] row;
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUpBeforeClass() throws Exception {
|
||||||
|
TEST_UTIL.startMiniCluster(1);
|
||||||
|
TEST_UTIL.createTable(TABLE_NAME, FAMILY);
|
||||||
|
TEST_UTIL.waitTableAvailable(TABLE_NAME);
|
||||||
|
ASYNC_CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), User.getCurrent()) {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public NonceGenerator getNonceGenerator() {
|
||||||
|
return NONCE_GENERATOR;
|
||||||
|
}
|
||||||
|
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDownAfterClass() throws Exception {
|
||||||
|
ASYNC_CONN.close();
|
||||||
|
TEST_UTIL.shutdownMiniCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws IOException, InterruptedException {
|
||||||
|
row = Bytes.toBytes(testName.getMethodName().replaceAll("[^0-9A-Za-z]", "_"));
|
||||||
|
NONCE++;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAppend() throws InterruptedException, ExecutionException {
|
||||||
|
AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
|
||||||
|
Result result = table.append(new Append(row).add(FAMILY, QUALIFIER, VALUE)).get();
|
||||||
|
assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
|
||||||
|
result = table.append(new Append(row).add(FAMILY, QUALIFIER, VALUE)).get();
|
||||||
|
// the second call should have no effect as we always generate the same nonce.
|
||||||
|
assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
|
||||||
|
result = table.get(new Get(row)).get();
|
||||||
|
assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testIncrement() throws InterruptedException, ExecutionException {
|
||||||
|
AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
|
||||||
|
assertEquals(1L, table.incrementColumnValue(row, FAMILY, QUALIFIER, 1L).get().longValue());
|
||||||
|
// the second call should have no effect as we always generate the same nonce.
|
||||||
|
assertEquals(1L, table.incrementColumnValue(row, FAMILY, QUALIFIER, 1L).get().longValue());
|
||||||
|
Result result = table.get(new Get(row)).get();
|
||||||
|
assertEquals(1L, Bytes.toLong(result.getValue(FAMILY, QUALIFIER)));
|
||||||
|
}
|
||||||
|
}
|
|
@ -500,19 +500,28 @@ public class TestMultiParallel {
|
||||||
put.addColumn(BYTES_FAMILY, QUALIFIER, Bytes.toBytes(0L));
|
put.addColumn(BYTES_FAMILY, QUALIFIER, Bytes.toBytes(0L));
|
||||||
|
|
||||||
// Replace nonce manager with the one that returns each nonce twice.
|
// Replace nonce manager with the one that returns each nonce twice.
|
||||||
NonceGenerator cnm = new PerClientRandomNonceGenerator() {
|
NonceGenerator cnm = new NonceGenerator() {
|
||||||
long lastNonce = -1;
|
|
||||||
|
private final PerClientRandomNonceGenerator delegate = PerClientRandomNonceGenerator.get();
|
||||||
|
|
||||||
|
private long lastNonce = -1;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized long newNonce() {
|
public synchronized long newNonce() {
|
||||||
long nonce = 0;
|
long nonce = 0;
|
||||||
if (lastNonce == -1) {
|
if (lastNonce == -1) {
|
||||||
lastNonce = nonce = super.newNonce();
|
lastNonce = nonce = delegate.newNonce();
|
||||||
} else {
|
} else {
|
||||||
nonce = lastNonce;
|
nonce = lastNonce;
|
||||||
lastNonce = -1L;
|
lastNonce = -1L;
|
||||||
}
|
}
|
||||||
return nonce;
|
return nonce;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getNonceGroup() {
|
||||||
|
return delegate.getNonceGroup();
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
NonceGenerator oldCnm =
|
NonceGenerator oldCnm =
|
||||||
|
|
|
@ -321,7 +321,9 @@ public class TestDistributedLogSplitting {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class NonceGeneratorWithDups extends PerClientRandomNonceGenerator {
|
private static class NonceGeneratorWithDups implements NonceGenerator {
|
||||||
|
|
||||||
|
private final PerClientRandomNonceGenerator delegate = PerClientRandomNonceGenerator.get();
|
||||||
private boolean isDups = false;
|
private boolean isDups = false;
|
||||||
private LinkedList<Long> nonces = new LinkedList<Long>();
|
private LinkedList<Long> nonces = new LinkedList<Long>();
|
||||||
|
|
||||||
|
@ -331,12 +333,17 @@ public class TestDistributedLogSplitting {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long newNonce() {
|
public long newNonce() {
|
||||||
long nonce = isDups ? nonces.removeFirst() : super.newNonce();
|
long nonce = isDups ? nonces.removeFirst() : delegate.newNonce();
|
||||||
if (!isDups) {
|
if (!isDups) {
|
||||||
nonces.add(nonce);
|
nonces.add(nonce);
|
||||||
}
|
}
|
||||||
return nonce;
|
return nonce;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getNonceGroup() {
|
||||||
|
return delegate.getNonceGroup();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Ignore("DLR is broken by HBASE-12751") @Test(timeout = 300000)
|
@Ignore("DLR is broken by HBASE-12751") @Test(timeout = 300000)
|
||||||
|
|
Loading…
Reference in New Issue