HBASE-15921 Add first AsyncTable impl and create TableImpl based on it

This commit is contained in:
zhangduo 2016-10-14 18:38:02 +08:00
parent 4127fd2a7c
commit 3fe7508295
20 changed files with 1712 additions and 44 deletions

View File

@ -0,0 +1,62 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.io.Closeable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
/**
* The asynchronous version of Connection.
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public interface AsyncConnection extends Closeable {
/**
* Returns the {@link org.apache.hadoop.conf.Configuration} object used by this instance.
* <p>
* The reference returned is not a copy, so any change made to it will affect this instance.
*/
Configuration getConfiguration();
/**
* Retrieve a AsyncRegionLocator implementation to inspect region information on a table. The
* returned AsyncRegionLocator is not thread-safe, so a new instance should be created for each
* using thread. This is a lightweight operation. Pooling or caching of the returned
* AsyncRegionLocator is neither required nor desired.
* @param tableName Name of the table who's region is to be examined
* @return An AsyncRegionLocator instance
*/
AsyncTableRegionLocator getRegionLocator(TableName tableName);
/**
* Retrieve an AsyncTable implementation for accessing a table. The returned Table is not thread
* safe, a new instance should be created for each using thread. This is a lightweight operation,
* pooling or caching of the returned AsyncTable is neither required nor desired.
* <p>
* This method no longer checks table existence. An exception will be thrown if the table does not
* exist only when the first operation is attempted.
* @param tableName the name of the table
* @return an AsyncTable to use for interactions with this table
*/
AsyncTable getTable(TableName tableName);
}

View File

@ -0,0 +1,106 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_PAUSE;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_OPERATION_TIMEOUT;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_PAUSE;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER;
import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_READ_TIMEOUT_KEY;
import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY;
import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY;
import static org.apache.hadoop.hbase.client.AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT;
import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* Timeout configs.
*/
@InterfaceAudience.Private
class AsyncConnectionConfiguration {
private final long metaOperationTimeoutNs;
private final long operationTimeoutNs;
// timeout for each read rpc request
private final long readRpcTimeoutNs;
// timeout for each write rpc request
private final long writeRpcTimeoutNs;
private final long pauseNs;
private final int maxRetries;
/** How many retries are allowed before we start to log */
private final int startLogErrorsCnt;
AsyncConnectionConfiguration(Configuration conf) {
this.metaOperationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
conf.getLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
this.operationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
conf.getLong(HBASE_CLIENT_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
this.readRpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_READ_TIMEOUT_KEY,
conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT)));
this.writeRpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY,
conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT)));
this.pauseNs = TimeUnit.MILLISECONDS
.toNanos(conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE));
this.maxRetries = conf.getInt(HBASE_CLIENT_RETRIES_NUMBER, DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
this.startLogErrorsCnt = conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY,
DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
}
long getMetaOperationTimeoutNs() {
return metaOperationTimeoutNs;
}
long getOperationTimeoutNs() {
return operationTimeoutNs;
}
long getReadRpcTimeoutNs() {
return readRpcTimeoutNs;
}
long getWriteRpcTimeoutNs() {
return writeRpcTimeoutNs;
}
long getPauseNs() {
return pauseNs;
}
int getMaxRetries() {
return maxRetries;
}
int getStartLogErrorsCnt() {
return startLogErrorsCnt;
}
}

View File

@ -0,0 +1,144 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.HConstants.CLUSTER_ID_DEFAULT;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY;
import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey;
import io.netty.util.HashedWheelTimer;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.util.CollectionUtils;
import org.apache.hadoop.hbase.util.Threads;
/**
* The implementation of AsyncConnection.
*/
@InterfaceAudience.Private
class AsyncConnectionImpl implements AsyncConnection {
private static final Log LOG = LogFactory.getLog(AsyncConnectionImpl.class);
private static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer(
Threads.newDaemonThreadFactory("Async-Client-Retry-Timer"), 10, TimeUnit.MILLISECONDS);
private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure";
private final Configuration conf;
final AsyncConnectionConfiguration connConf;
private final User user;
private final ClusterRegistry registry;
private final String clusterId;
private final int rpcTimeout;
private final RpcClient rpcClient;
final RpcControllerFactory rpcControllerFactory;
private final boolean hostnameCanChange;
private final AsyncRegionLocator locator;
final AsyncRpcRetryingCallerFactory callerFactory;
private final ConcurrentMap<String, ClientService.Interface> rsStubs = new ConcurrentHashMap<>();
public AsyncConnectionImpl(Configuration conf, User user) throws IOException {
this.conf = conf;
this.user = user;
this.connConf = new AsyncConnectionConfiguration(conf);
this.locator = new AsyncRegionLocator(conf);
// action below will not throw exception so no need to catch and close.
this.registry = ClusterRegistryFactory.getRegistry(conf);
this.clusterId = Optional.ofNullable(registry.getClusterId()).orElseGet(() -> {
if (LOG.isDebugEnabled()) {
LOG.debug("cluster id came back null, using default " + CLUSTER_ID_DEFAULT);
}
return CLUSTER_ID_DEFAULT;
});
this.rpcClient = RpcClientFactory.createClient(conf, clusterId);
this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
this.hostnameCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true);
this.rpcTimeout = conf.getInt(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT);
this.callerFactory = new AsyncRpcRetryingCallerFactory(this, RETRY_TIMER);
}
@Override
public Configuration getConfiguration() {
return conf;
}
@Override
public void close() {
IOUtils.closeQuietly(locator);
IOUtils.closeQuietly(rpcClient);
IOUtils.closeQuietly(registry);
}
@Override
public AsyncTableRegionLocator getRegionLocator(TableName tableName) {
return new AsyncTableRegionLocatorImpl(tableName, locator);
}
// we will override this method for testing retry caller, so do not remove this method.
AsyncRegionLocator getLocator() {
return locator;
}
private ClientService.Interface createRegionServerStub(ServerName serverName) throws IOException {
return ClientService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
}
ClientService.Interface getRegionServerStub(ServerName serverName) throws IOException {
return CollectionUtils.computeIfAbsentEx(rsStubs,
getStubKey(ClientService.Interface.class.getSimpleName(), serverName, hostnameCanChange),
() -> createRegionServerStub(serverName));
}
@Override
public AsyncTable getTable(TableName tableName) {
return new AsyncTableImpl(this, tableName);
}
}

View File

@ -0,0 +1,64 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* TODO: reimplement using aync connection when the scan logic is ready. The current implementation
* is based on the blocking client.
*/
@InterfaceAudience.Private
class AsyncRegionLocator implements Closeable {
private final ConnectionImplementation conn;
AsyncRegionLocator(Configuration conf) throws IOException {
conn = (ConnectionImplementation) ConnectionFactory.createConnection(conf);
}
CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
boolean reload) {
CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
try {
future.complete(conn.getRegionLocation(tableName, row, reload));
} catch (IOException e) {
future.completeExceptionally(e);
}
return future;
}
void updateCachedLocations(TableName tableName, byte[] regionName, byte[] row, Object exception,
ServerName source) {
conn.updateCachedLocations(tableName, regionName, row, exception, source);
}
@Override
public void close() {
IOUtils.closeQuietly(conn);
}
}

View File

@ -0,0 +1,106 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import com.google.common.base.Preconditions;
import io.netty.util.HashedWheelTimer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* Factory to create an AsyncRpcRetryCaller.
*/
@InterfaceAudience.Private
class AsyncRpcRetryingCallerFactory {
private final AsyncConnectionImpl conn;
private final HashedWheelTimer retryTimer;
public AsyncRpcRetryingCallerFactory(AsyncConnectionImpl conn, HashedWheelTimer retryTimer) {
this.conn = conn;
this.retryTimer = retryTimer;
}
public class SingleRequestCallerBuilder<T> {
private TableName tableName;
private byte[] row;
private AsyncSingleRequestRpcRetryingCaller.Callable<T> callable;
private long operationTimeoutNs = -1L;
private long rpcTimeoutNs = -1L;
public SingleRequestCallerBuilder<T> table(TableName tableName) {
this.tableName = tableName;
return this;
}
public SingleRequestCallerBuilder<T> row(byte[] row) {
this.row = row;
return this;
}
public SingleRequestCallerBuilder<T> action(
AsyncSingleRequestRpcRetryingCaller.Callable<T> callable) {
this.callable = callable;
return this;
}
public SingleRequestCallerBuilder<T> operationTimeout(long operationTimeout, TimeUnit unit) {
this.operationTimeoutNs = unit.toNanos(operationTimeout);
return this;
}
public SingleRequestCallerBuilder<T> rpcTimeout(long rpcTimeout, TimeUnit unit) {
this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
return this;
}
public AsyncSingleRequestRpcRetryingCaller<T> build() {
return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn,
Preconditions.checkNotNull(tableName, "tableName is null"),
Preconditions.checkNotNull(row, "row is null"),
Preconditions.checkNotNull(callable, "action is null"), conn.connConf.getPauseNs(),
conn.connConf.getMaxRetries(), operationTimeoutNs, rpcTimeoutNs,
conn.connConf.getStartLogErrorsCnt());
}
/**
* Shortcut for {@code build().call()}
*/
public CompletableFuture<T> call() {
return build().call();
}
}
/**
* Create retry caller for single action, such as get, put, delete, etc.
*/
public <T> SingleRequestCallerBuilder<T> single() {
return new SingleRequestCallerBuilder<>();
}
}

View File

@ -0,0 +1,229 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.ipc.RemoteException;
/**
* Retry caller for a single request, such as get, put, delete, etc.
*/
@InterfaceAudience.Private
class AsyncSingleRequestRpcRetryingCaller<T> {
private static final Log LOG = LogFactory.getLog(AsyncSingleRequestRpcRetryingCaller.class);
@FunctionalInterface
public interface Callable<T> {
CompletableFuture<T> call(HBaseRpcController controller, HRegionLocation loc,
ClientService.Interface stub);
}
private final HashedWheelTimer retryTimer;
private final AsyncConnectionImpl conn;
private final TableName tableName;
private final byte[] row;
private final Callable<T> callable;
private final long pauseNs;
private final int maxAttempts;
private final long operationTimeoutNs;
private final long rpcTimeoutNs;
private final int startLogErrorsCnt;
private final CompletableFuture<T> future;
private final HBaseRpcController controller;
private final List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions;
private final long startNs;
public AsyncSingleRequestRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
TableName tableName, byte[] row, Callable<T> callable, long pauseNs, int maxRetries,
long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
this.retryTimer = retryTimer;
this.conn = conn;
this.tableName = tableName;
this.row = row;
this.callable = callable;
this.pauseNs = pauseNs;
this.maxAttempts = retries2Attempts(maxRetries);
this.operationTimeoutNs = operationTimeoutNs;
this.rpcTimeoutNs = rpcTimeoutNs;
this.startLogErrorsCnt = startLogErrorsCnt;
this.future = new CompletableFuture<>();
this.controller = conn.rpcControllerFactory.newController();
this.exceptions = new ArrayList<>();
this.startNs = System.nanoTime();
}
private int tries = 1;
private long elapsedMs() {
return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs);
}
private static Throwable translateException(Throwable t) {
if (t instanceof UndeclaredThrowableException && t.getCause() != null) {
t = t.getCause();
}
if (t instanceof RemoteException) {
t = ((RemoteException) t).unwrapRemoteException();
}
if (t instanceof ServiceException && t.getCause() != null) {
t = translateException(t.getCause());
}
return t;
}
private void completeExceptionally() {
future.completeExceptionally(new RetriesExhaustedException(tries, exceptions));
}
private void onError(Throwable error, Supplier<String> errMsg,
Consumer<Throwable> updateCachedLocation) {
error = translateException(error);
if (tries > startLogErrorsCnt) {
LOG.warn(errMsg.get(), error);
}
RetriesExhaustedException.ThrowableWithExtraContext qt = new RetriesExhaustedException.ThrowableWithExtraContext(
error, EnvironmentEdgeManager.currentTime(), "");
exceptions.add(qt);
if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
completeExceptionally();
return;
}
long delayNs;
if (operationTimeoutNs > 0) {
long maxDelayNs = operationTimeoutNs - (System.nanoTime() - startNs);
if (maxDelayNs <= 0) {
completeExceptionally();
return;
}
delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1));
} else {
delayNs = getPauseTime(pauseNs, tries - 1);
}
updateCachedLocation.accept(error);
tries++;
retryTimer.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
// always restart from beginning.
locateThenCall();
}
}, delayNs, TimeUnit.NANOSECONDS);
}
private void resetController() {
controller.reset();
if (rpcTimeoutNs >= 0) {
controller.setCallTimeout(
(int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(rpcTimeoutNs)));
}
}
private void call(HRegionLocation loc) {
ClientService.Interface stub;
try {
stub = conn.getRegionServerStub(loc.getServerName());
} catch (IOException e) {
onError(e,
() -> "Get async stub to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row)
+ "' in " + loc.getRegionInfo().getEncodedName() + " of " + tableName
+ " failed, tries = " + tries + ", maxAttempts = " + maxAttempts + ", timeout = "
+ TimeUnit.NANOSECONDS.toMillis(operationTimeoutNs) + " ms, time elapsed = "
+ elapsedMs() + " ms",
err -> conn.getLocator().updateCachedLocations(tableName,
loc.getRegionInfo().getRegionName(), row, err, loc.getServerName()));
return;
}
resetController();
callable.call(controller, loc, stub).whenComplete((result, error) -> {
if (error != null) {
onError(error,
() -> "Call to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row) + "' in "
+ loc.getRegionInfo().getEncodedName() + " of " + tableName + " failed, tries = "
+ tries + ", maxAttempts = " + maxAttempts + ", timeout = "
+ TimeUnit.NANOSECONDS.toMillis(operationTimeoutNs) + " ms, time elapsed = "
+ elapsedMs() + " ms",
err -> conn.getLocator().updateCachedLocations(tableName,
loc.getRegionInfo().getRegionName(), row, err, loc.getServerName()));
return;
}
future.complete(result);
});
}
private void locateThenCall() {
conn.getLocator().getRegionLocation(tableName, row, tries > 1).whenComplete((loc, error) -> {
if (error != null) {
onError(error,
() -> "Locate '" + Bytes.toStringBinary(row) + "' in " + tableName + " failed, tries = "
+ tries + ", maxAttempts = " + maxAttempts + ", timeout = "
+ TimeUnit.NANOSECONDS.toMillis(operationTimeoutNs) + " ms, time elapsed = "
+ elapsedMs() + " ms",
err -> {
});
return;
}
call(loc);
});
}
public CompletableFuture<T> call() {
locateThenCall();
return future;
}
}

View File

@ -0,0 +1,126 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
/**
* The asynchronous version of Table. Obtain an instance from a {@link AsyncConnection}.
* <p>
* The implementation is NOT required to be thread safe. Do NOT access it from multiple threads
* concurrently.
* <p>
* Usually the implementations will not throw any exception directly, you need to get the exception
* from the returned {@link CompletableFuture}.
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public interface AsyncTable {
/**
* Gets the fully qualified table name instance of this table.
*/
TableName getName();
/**
* Returns the {@link org.apache.hadoop.conf.Configuration} object used by this instance.
* <p>
* The reference returned is not a copy, so any change made to it will affect this instance.
*/
Configuration getConfiguration();
/**
* Set timeout of each rpc read request in operations of this Table instance, will override the
* value of {@code hbase.rpc.read.timeout} in configuration. If a rpc read request waiting too
* long, it will stop waiting and send a new request to retry until retries exhausted or operation
* timeout reached.
*/
void setReadRpcTimeout(long timeout, TimeUnit unit);
/**
* Get timeout of each rpc read request in this Table instance.
*/
long getReadRpcTimeout(TimeUnit unit);
/**
* Set timeout of each rpc write request in operations of this Table instance, will override the
* value of {@code hbase.rpc.write.timeout} in configuration. If a rpc write request waiting too
* long, it will stop waiting and send a new request to retry until retries exhausted or operation
* timeout reached.
*/
void setWriteRpcTimeout(long timeout, TimeUnit unit);
/**
* Get timeout of each rpc write request in this Table instance.
*/
long getWriteRpcTimeout(TimeUnit unit);
/**
* Set timeout of each operation in this Table instance, will override the value of
* {@code hbase.client.operation.timeout} in configuration.
* <p>
* Operation timeout is a top-level restriction that makes sure an operation will not be blocked
* more than this. In each operation, if rpc request fails because of timeout or other reason, it
* will retry until success or throw a RetriesExhaustedException. But if the total time elapsed
* reach the operation timeout before retries exhausted, it will break early and throw
* SocketTimeoutException.
*/
void setOperationTimeout(long timeout, TimeUnit unit);
/**
* Get timeout of each operation in Table instance.
*/
long getOperationTimeout(TimeUnit unit);
/**
* Test for the existence of columns in the table, as specified by the Get.
* <p>
* This will return true if the Get matches one or more keys, false if not.
* <p>
* This is a server-side call so it prevents any data from being transfered to the client.
*/
CompletableFuture<Boolean> exists(Get get);
/**
* Extracts certain cells from a given row.
* <p>
* Return the data coming from the specified row, if it exists. If the row specified doesn't
* exist, the {@link Result} instance returned won't contain any
* {@link org.apache.hadoop.hbase.KeyValue}, as indicated by {@link Result#isEmpty()}.
* @param get The object that specifies what data to fetch and from which row.
*/
CompletableFuture<Result> get(Get get);
/**
* Puts some data to the table.
* @param put The data to put.
*/
CompletableFuture<Void> put(Put put);
/**
* Deletes the specified cells/row.
* @param delete The object that specifies what to delete.
*/
CompletableFuture<Void> delete(Delete delete);
}

View File

@ -0,0 +1,192 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
import org.apache.hadoop.hbase.util.ReflectionUtils;
/**
* The implementation of AsyncTable.
*/
@InterfaceAudience.Private
class AsyncTableImpl implements AsyncTable {
private final AsyncConnectionImpl conn;
private final TableName tableName;
private long readRpcTimeoutNs;
private long writeRpcTimeoutNs;
private long operationTimeoutNs;
public AsyncTableImpl(AsyncConnectionImpl conn, TableName tableName) {
this.conn = conn;
this.tableName = tableName;
this.readRpcTimeoutNs = conn.connConf.getReadRpcTimeoutNs();
this.writeRpcTimeoutNs = conn.connConf.getWriteRpcTimeoutNs();
this.operationTimeoutNs = tableName.isSystemTable() ? conn.connConf.getMetaOperationTimeoutNs()
: conn.connConf.getOperationTimeoutNs();
}
@Override
public TableName getName() {
return tableName;
}
@Override
public Configuration getConfiguration() {
return conn.getConfiguration();
}
@FunctionalInterface
private interface Converter<D, I, S> {
D convert(I info, S src) throws IOException;
}
@FunctionalInterface
private interface RpcCall<RESP, REQ> {
void call(ClientService.Interface stub, HBaseRpcController controller, REQ req,
RpcCallback<RESP> done);
}
private static <REQ, PREQ, PRESP, RESP> CompletableFuture<RESP> call(
HBaseRpcController controller, HRegionLocation loc, ClientService.Interface stub, REQ req,
Converter<PREQ, byte[], REQ> reqConvert, RpcCall<PRESP, PREQ> rpcCall,
Converter<RESP, HBaseRpcController, PRESP> respConverter) {
CompletableFuture<RESP> future = new CompletableFuture<>();
try {
rpcCall.call(stub, controller, reqConvert.convert(loc.getRegionInfo().getRegionName(), req),
new RpcCallback<PRESP>() {
@Override
public void run(PRESP resp) {
if (controller.failed()) {
future.completeExceptionally(controller.getFailed());
} else {
try {
future.complete(respConverter.convert(controller, resp));
} catch (IOException e) {
future.completeExceptionally(e);
}
}
}
});
} catch (IOException e) {
future.completeExceptionally(e);
}
return future;
}
private <T> SingleRequestCallerBuilder<T> newCaller(Row row, long rpcTimeoutNs) {
return conn.callerFactory.<T> single().table(tableName).row(row.getRow())
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS);
}
@Override
public CompletableFuture<Boolean> exists(Get get) {
if (!get.isCheckExistenceOnly()) {
get = ReflectionUtils.newInstance(get.getClass(), get);
get.setCheckExistenceOnly(true);
}
return get(get).thenApply(r -> r.getExists());
}
@Override
public CompletableFuture<Result> get(Get get) {
return this.<Result> newCaller(get, readRpcTimeoutNs)
.action((controller, loc, stub) -> AsyncTableImpl
.<Get, GetRequest, GetResponse, Result> call(controller, loc, stub, get,
RequestConverter::buildGetRequest, (s, c, req, done) -> s.get(c, req, done),
(c, resp) -> ProtobufUtil.toResult(resp.getResult(), c.cellScanner())))
.call();
}
@Override
public CompletableFuture<Void> put(Put put) {
return this.<Void> newCaller(put, writeRpcTimeoutNs)
.action(
(controller, loc, stub) -> AsyncTableImpl.<Put, MutateRequest, MutateResponse, Void> call(
controller, loc, stub, put, RequestConverter::buildMutateRequest,
(s, c, req, done) -> s.mutate(c, req, done), (c, resp) -> {
return null;
}))
.call();
}
@Override
public CompletableFuture<Void> delete(Delete delete) {
return this.<Void> newCaller(delete, writeRpcTimeoutNs)
.action((controller, loc, stub) -> AsyncTableImpl
.<Delete, MutateRequest, MutateResponse, Void> call(controller, loc, stub, delete,
RequestConverter::buildMutateRequest, (s, c, req, done) -> s.mutate(c, req, done),
(c, resp) -> {
return null;
}))
.call();
}
@Override
public void setReadRpcTimeout(long timeout, TimeUnit unit) {
this.readRpcTimeoutNs = unit.toNanos(timeout);
}
@Override
public long getReadRpcTimeout(TimeUnit unit) {
return unit.convert(readRpcTimeoutNs, TimeUnit.NANOSECONDS);
}
@Override
public void setWriteRpcTimeout(long timeout, TimeUnit unit) {
this.writeRpcTimeoutNs = unit.toNanos(timeout);
}
@Override
public long getWriteRpcTimeout(TimeUnit unit) {
return unit.convert(writeRpcTimeoutNs, TimeUnit.NANOSECONDS);
}
@Override
public void setOperationTimeout(long timeout, TimeUnit unit) {
this.operationTimeoutNs = unit.toNanos(timeout);
}
@Override
public long getOperationTimeout(TimeUnit unit) {
return unit.convert(operationTimeoutNs, TimeUnit.NANOSECONDS);
}
}

View File

@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
/**
* The asynchronous version of RegionLocator.
* <p>
* Usually the implementations will not throw any exception directly, you need to get the exception
* from the returned {@link CompletableFuture}.
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public interface AsyncTableRegionLocator {
/**
* Gets the fully qualified table name instance of the table whose region we want to locate.
*/
TableName getName();
/**
* Finds the region on which the given row is being served. Does not reload the cache.
* <p>
* Returns the location of the region to which the row belongs.
* @param row Row to find.
*/
default CompletableFuture<HRegionLocation> getRegionLocation(byte[] row) {
return getRegionLocation(row, false);
}
/**
* Finds the region on which the given row is being served.
* <p>
* Returns the location of the region to which the row belongs.
* @param row Row to find.
* @param reload true to reload information or false to use cached information
*/
CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, boolean reload);
}

View File

@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* The implementation of AsyncRegionLocator.
*/
@InterfaceAudience.Private
class AsyncTableRegionLocatorImpl implements AsyncTableRegionLocator {
private final TableName tableName;
private final AsyncRegionLocator locator;
public AsyncTableRegionLocatorImpl(TableName tableName, AsyncRegionLocator locator) {
this.tableName = tableName;
this.locator = locator;
}
@Override
public TableName getName() {
return tableName;
}
@Override
public CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, boolean reload) {
return locator.getRegionLocation(tableName, row, reload);
}
}

View File

@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.io.Closeable;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* Implementations hold cluster information such as this cluster's id.
* <p>
* Internal use only.
*/
@InterfaceAudience.Private
interface ClusterRegistry extends Closeable {
/**
* Should only be called once.
* <p>
* The upper layer should store this value somewhere as it will not be change any more.
*/
String getClusterId();
@Override
void close();
}

View File

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.ReflectionUtils;
/**
* Get instance of configured Registry.
*/
@InterfaceAudience.Private
final class ClusterRegistryFactory {
static final String REGISTRY_IMPL_CONF_KEY = "hbase.client.registry.impl";
private ClusterRegistryFactory() {
}
/**
* @return The cluster registry implementation to use.
*/
static ClusterRegistry getRegistry(Configuration conf) {
Class<? extends ClusterRegistry> clazz =
conf.getClass(REGISTRY_IMPL_CONF_KEY, ZKClusterRegistry.class, ClusterRegistry.class);
return ReflectionUtils.newInstance(clazz, conf);
}
}

View File

@ -18,14 +18,16 @@
*/ */
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey;
import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY; import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;
import com.google.common.annotations.VisibleForTesting;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.lang.reflect.UndeclaredThrowableException; import java.lang.reflect.UndeclaredThrowableException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
@ -38,8 +40,6 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import edu.umd.cs.findbugs.annotations.Nullable;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -63,6 +63,11 @@ import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.ipc.RpcClient; import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory; import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingRpcChannel;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
@ -79,8 +84,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCa
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ExceptionUtil; import org.apache.hadoop.hbase.util.ExceptionUtil;
@ -92,11 +95,7 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import com.google.common.annotations.VisibleForTesting; import edu.umd.cs.findbugs.annotations.Nullable;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingRpcChannel;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
/** /**
* Main implementation of {@link Connection} and {@link ClusterConnection} interfaces. * Main implementation of {@link Connection} and {@link ClusterConnection} interfaces.
@ -196,7 +195,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
this.useMetaReplicas = conf.getBoolean(HConstants.USE_META_REPLICAS, this.useMetaReplicas = conf.getBoolean(HConstants.USE_META_REPLICAS,
HConstants.DEFAULT_USE_META_REPLICAS); HConstants.DEFAULT_USE_META_REPLICAS);
// how many times to try, one more than max *retry* time // how many times to try, one more than max *retry* time
this.numTries = connectionConfig.getRetriesNumber() + 1; this.numTries = retries2Attempts(connectionConfig.getRetriesNumber());
this.rpcTimeout = conf.getInt( this.rpcTimeout = conf.getInt(
HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT); HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
@ -1094,8 +1093,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
throw new MasterNotRunningException(sn + " is dead."); throw new MasterNotRunningException(sn + " is dead.");
} }
// Use the security info interface name as our stub key // Use the security info interface name as our stub key
String key = getStubKey(getServiceName(), String key = getStubKey(getServiceName(), sn, hostnamesCanChange);
sn.getHostname(), sn.getPort(), hostnamesCanChange);
connectionLock.putIfAbsent(key, key); connectionLock.putIfAbsent(key, key);
Object stub = null; Object stub = null;
synchronized (connectionLock.get(key)) { synchronized (connectionLock.get(key)) {
@ -1176,8 +1174,8 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
if (isDeadServer(serverName)) { if (isDeadServer(serverName)) {
throw new RegionServerStoppedException(serverName + " is dead."); throw new RegionServerStoppedException(serverName + " is dead.");
} }
String key = getStubKey(AdminProtos.AdminService.BlockingInterface.class.getName(), String key = getStubKey(AdminProtos.AdminService.BlockingInterface.class.getName(), serverName,
serverName.getHostname(), serverName.getPort(), this.hostnamesCanChange); this.hostnamesCanChange);
this.connectionLock.putIfAbsent(key, key); this.connectionLock.putIfAbsent(key, key);
AdminProtos.AdminService.BlockingInterface stub; AdminProtos.AdminService.BlockingInterface stub;
synchronized (this.connectionLock.get(key)) { synchronized (this.connectionLock.get(key)) {
@ -1198,8 +1196,8 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
if (isDeadServer(sn)) { if (isDeadServer(sn)) {
throw new RegionServerStoppedException(sn + " is dead."); throw new RegionServerStoppedException(sn + " is dead.");
} }
String key = getStubKey(ClientProtos.ClientService.BlockingInterface.class.getName(), sn.getHostname(), String key = getStubKey(ClientProtos.ClientService.BlockingInterface.class.getName(), sn,
sn.getPort(), this.hostnamesCanChange); this.hostnamesCanChange);
this.connectionLock.putIfAbsent(key, key); this.connectionLock.putIfAbsent(key, key);
ClientProtos.ClientService.BlockingInterface stub = null; ClientProtos.ClientService.BlockingInterface stub = null;
synchronized (this.connectionLock.get(key)) { synchronized (this.connectionLock.get(key)) {
@ -1215,25 +1213,6 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
return stub; return stub;
} }
static String getStubKey(final String serviceName,
final String rsHostname,
int port,
boolean resolveHostnames) {
// Sometimes, servers go down and they come back up with the same hostname but a different
// IP address. Force a resolution of the rsHostname by trying to instantiate an
// InetSocketAddress, and this way we will rightfully get a new stubKey.
// Also, include the hostname in the key so as to take care of those cases where the
// DNS name is different but IP address remains the same.
String address = rsHostname;
if (resolveHostnames) {
InetAddress i = new InetSocketAddress(rsHostname, port).getAddress();
if (i != null) {
address = i.getHostAddress() + "-" + rsHostname;
}
}
return serviceName + "@" + address + ":" + port;
}
private ZooKeeperKeepAliveConnection keepAliveZookeeper; private ZooKeeperKeepAliveConnection keepAliveZookeeper;
private AtomicInteger keepAliveZookeeperUserCount = new AtomicInteger(0); private AtomicInteger keepAliveZookeeperUserCount = new AtomicInteger(0);

View File

@ -20,10 +20,13 @@ package org.apache.hadoop.hbase.client;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
@ -40,6 +43,8 @@ import org.apache.hadoop.hbase.security.UserProvider;
@InterfaceAudience.Private @InterfaceAudience.Private
public final class ConnectionUtils { public final class ConnectionUtils {
private static final Log LOG = LogFactory.getLog(ConnectionUtils.class);
private ConnectionUtils() {} private ConnectionUtils() {}
/** /**
@ -167,4 +172,34 @@ public final class ConnectionUtils {
return false; return false;
} }
} }
/**
* Return retires + 1. The returned value will be in range [1, Integer.MAX_VALUE].
*/
static int retries2Attempts(int retries) {
return Math.max(1, retries == Integer.MAX_VALUE ? Integer.MAX_VALUE : retries + 1);
}
/**
* Get a unique key for the rpc stub to the given server.
*/
static String getStubKey(String serviceName, ServerName serverName,
boolean hostnameCanChange) {
// Sometimes, servers go down and they come back up with the same hostname but a different
// IP address. Force a resolution of the rsHostname by trying to instantiate an
// InetSocketAddress, and this way we will rightfully get a new stubKey.
// Also, include the hostname in the key so as to take care of those cases where the
// DNS name is different but IP address remains the same.
String hostname = serverName.getHostname();
int port = serverName.getPort();
if (hostnameCanChange) {
try {
InetAddress ip = InetAddress.getByName(hostname);
return serviceName + "@" + hostname + "-" + ip.getHostAddress() + ":" + port;
} catch (UnknownHostException e) {
LOG.warn("Can not resolve " + hostname + ", please check your network", e);
}
}
return serviceName + "@" + hostname + ":" + port;
}
} }

View File

@ -19,6 +19,8 @@
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.lang.reflect.UndeclaredThrowableException; import java.lang.reflect.UndeclaredThrowableException;
@ -32,12 +34,11 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException; import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ExceptionUtil; import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
/** /**
* Runs an rpc'ing {@link RetryingCallable}. Sets into rpc client * Runs an rpc'ing {@link RetryingCallable}. Sets into rpc client
* threadlocal outstanding timeouts as so we don't persist too much. * threadlocal outstanding timeouts as so we don't persist too much.
@ -70,7 +71,7 @@ public class RpcRetryingCallerImpl<T> implements RpcRetryingCaller<T> {
public RpcRetryingCallerImpl(long pause, int retries, public RpcRetryingCallerImpl(long pause, int retries,
RetryingCallerInterceptor interceptor, int startLogErrorsCnt, int rpcTimeout) { RetryingCallerInterceptor interceptor, int startLogErrorsCnt, int rpcTimeout) {
this.pause = pause; this.pause = pause;
this.maxAttempts = retries + 1; this.maxAttempts = retries2Attempts(retries);
this.interceptor = interceptor; this.interceptor = interceptor;
context = interceptor.createEmptyContext(); context = interceptor.createEmptyContext();
this.startLogErrorsCnt = startLogErrorsCnt; this.startLogErrorsCnt = startLogErrorsCnt;

View File

@ -0,0 +1,78 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZK_SESSION_TIMEOUT;
import static org.apache.hadoop.hbase.HConstants.ZK_SESSION_TIMEOUT;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
/**
* Cache the cluster registry data in memory and use zk watcher to update. The only exception is
* {@link #getClusterId()}, it will fetch the data from zk directly.
*/
@InterfaceAudience.Private
class ZKClusterRegistry implements ClusterRegistry {
private static final Log LOG = LogFactory.getLog(ZKClusterRegistry.class);
private final RecoverableZooKeeper zk;
private final ZNodePaths znodePaths;
ZKClusterRegistry(Configuration conf) throws IOException {
this.znodePaths = new ZNodePaths(conf);
int zkSessionTimeout = conf.getInt(ZK_SESSION_TIMEOUT, DEFAULT_ZK_SESSION_TIMEOUT);
int zkRetry = conf.getInt("zookeeper.recovery.retry", 3);
int zkRetryIntervalMs = conf.getInt("zookeeper.recovery.retry.intervalmill", 1000);
this.zk = new RecoverableZooKeeper(ZKConfig.getZKQuorumServersString(conf), zkSessionTimeout,
null, zkRetry, zkRetryIntervalMs);
}
@Override
public String getClusterId() {
try {
byte[] data = zk.getData(znodePaths.clusterIdZNode, false, null);
if (data == null || data.length == 0) {
return null;
}
return ClusterId.parseFrom(data).toString();
} catch (Exception e) {
LOG.warn("failed to get cluster id", e);
return null;
}
}
@Override
public void close() {
try {
zk.close();
} catch (InterruptedException e) {
LOG.warn("close zookeeper failed", e);
}
}
}

View File

@ -18,10 +18,12 @@
package org.apache.hadoop.hbase.util; package org.apache.hadoop.hbase.util;
import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
@ -104,4 +106,26 @@ public class CollectionUtils {
} }
return list.get(list.size() - 1); return list.get(list.size() - 1);
} }
/**
* A supplier that throws IOException when get.
*/
@FunctionalInterface
public interface IOExceptionSupplier<V> {
V get() throws IOException;
}
/**
* In HBASE-16648 we found that ConcurrentHashMap.get is much faster than computeIfAbsent if the
* value already exists. So here we copy the implementation of
* {@link ConcurrentMap#computeIfAbsent(Object, java.util.function.Function)}. It uses get and
* putIfAbsent to implement computeIfAbsent. And notice that the implementation does not guarantee
* that the supplier will only be executed once.
*/
public static <K, V> V computeIfAbsentEx(ConcurrentMap<K, V> map, K key,
IOExceptionSupplier<V> supplier) throws IOException {
V v, newValue;
return ((v = map.get(key)) == null && (newValue = supplier.get()) != null
&& (v = map.putIfAbsent(key, newValue)) == null) ? newValue : v;
}
} }

View File

@ -52,6 +52,7 @@ public class ReflectionUtils {
private static <T> T instantiate(final String className, Constructor<T> ctor, Object[] ctorArgs) { private static <T> T instantiate(final String className, Constructor<T> ctor, Object[] ctorArgs) {
try { try {
ctor.setAccessible(true);
return ctor.newInstance(ctorArgs); return ctor.newInstance(ctorArgs);
} catch (IllegalAccessException e) { } catch (IllegalAccessException e) {
throw new UnsupportedOperationException( throw new UnsupportedOperationException(
@ -65,14 +66,13 @@ public class ReflectionUtils {
} }
} }
@SuppressWarnings("unchecked")
public static <T> T newInstance(Class<T> type, Object... params) { public static <T> T newInstance(Class<T> type, Object... params) {
return instantiate(type.getName(), findConstructor(type, params), params); return instantiate(type.getName(), findConstructor(type, params), params);
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public static <T> Constructor<T> findConstructor(Class<T> type, Object... paramTypes) { public static <T> Constructor<T> findConstructor(Class<T> type, Object... paramTypes) {
Constructor<T>[] constructors = (Constructor<T>[])type.getConstructors(); Constructor<T>[] constructors = (Constructor<T>[]) type.getDeclaredConstructors();
for (Constructor<T> ctor : constructors) { for (Constructor<T> ctor : constructors) {
Class<?>[] ctorParamTypes = ctor.getParameterTypes(); Class<?>[] ctorParamTypes = ctor.getParameterTypes();
if (ctorParamTypes.length != paramTypes.length) { if (ctorParamTypes.length != paramTypes.length) {

View File

@ -0,0 +1,196 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ MediumTests.class, ClientTests.class })
public class TestAsyncSingleRequestRpcRetryingCaller {
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static TableName TABLE_NAME = TableName.valueOf("async");
private static byte[] FAMILY = Bytes.toBytes("cf");
private static byte[] QUALIFIER = Bytes.toBytes("cq");
private static byte[] ROW = Bytes.toBytes("row");
private static byte[] VALUE = Bytes.toBytes("value");
private AsyncConnectionImpl asyncConn;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.startMiniCluster(2);
TEST_UTIL.getAdmin().setBalancerRunning(false, true);
TEST_UTIL.createTable(TABLE_NAME, FAMILY);
TEST_UTIL.waitTableAvailable(TABLE_NAME);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
@After
public void tearDown() {
if (asyncConn != null) {
asyncConn.close();
asyncConn = null;
}
}
private void initConn(int startLogErrorsCnt, long pauseMs, int maxRetires) throws IOException {
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
conf.setInt(AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY, startLogErrorsCnt);
conf.setLong(HConstants.HBASE_CLIENT_PAUSE, pauseMs);
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, maxRetires);
asyncConn = new AsyncConnectionImpl(conf, User.getCurrent());
}
@Test
public void testRegionMove() throws InterruptedException, ExecutionException, IOException {
initConn(0, 100, 30);
// This will leave a cached entry in location cache
HRegionLocation loc = asyncConn.getRegionLocator(TABLE_NAME).getRegionLocation(ROW).get();
int index = TEST_UTIL.getHBaseCluster().getServerWith(loc.getRegionInfo().getRegionName());
TEST_UTIL.getAdmin().move(loc.getRegionInfo().getEncodedNameAsBytes(), Bytes.toBytes(
TEST_UTIL.getHBaseCluster().getRegionServer(1 - index).getServerName().getServerName()));
AsyncTable table = asyncConn.getTable(TABLE_NAME);
table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get();
// move back
TEST_UTIL.getAdmin().move(loc.getRegionInfo().getEncodedNameAsBytes(),
Bytes.toBytes(loc.getServerName().getServerName()));
Result result = table.get(new Get(ROW).addColumn(FAMILY, QUALIFIER)).get();
assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
}
private <T> CompletableFuture<T> failedFuture() {
CompletableFuture<T> future = new CompletableFuture<>();
future.completeExceptionally(new RuntimeException("Inject error!"));
return future;
}
@Test
public void testMaxRetries() throws IOException, InterruptedException {
initConn(0, 10, 2);
try {
asyncConn.callerFactory.single().table(TABLE_NAME).row(ROW).operationTimeout(1, TimeUnit.DAYS)
.action((controller, loc, stub) -> failedFuture()).call().get();
fail();
} catch (ExecutionException e) {
assertThat(e.getCause(), instanceOf(RetriesExhaustedException.class));
}
}
@Test
public void testOperationTimeout() throws IOException, InterruptedException {
initConn(0, 100, Integer.MAX_VALUE);
long startNs = System.nanoTime();
try {
asyncConn.callerFactory.single().table(TABLE_NAME).row(ROW)
.operationTimeout(1, TimeUnit.SECONDS).action((controller, loc, stub) -> failedFuture())
.call().get();
fail();
} catch (ExecutionException e) {
assertThat(e.getCause(), instanceOf(RetriesExhaustedException.class));
}
long costNs = System.nanoTime() - startNs;
assertTrue(costNs >= TimeUnit.SECONDS.toNanos(1));
assertTrue(costNs < TimeUnit.SECONDS.toNanos(2));
}
@Test
public void testLocateError() throws IOException, InterruptedException, ExecutionException {
initConn(0, 100, 5);
AtomicBoolean errorTriggered = new AtomicBoolean(false);
AtomicInteger count = new AtomicInteger(0);
HRegionLocation loc = asyncConn.getRegionLocator(TABLE_NAME).getRegionLocation(ROW).get();
try (AsyncRegionLocator mockedLocator = new AsyncRegionLocator(asyncConn.getConfiguration()) {
@Override
CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
boolean reload) {
if (tableName.equals(TABLE_NAME)) {
CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
if (count.getAndIncrement() == 0) {
errorTriggered.set(true);
future.completeExceptionally(new RuntimeException("Inject error!"));
} else {
future.complete(loc);
}
return future;
} else {
return super.getRegionLocation(tableName, row, reload);
}
}
@Override
void updateCachedLocations(TableName tableName, byte[] regionName, byte[] row,
Object exception, ServerName source) {
}
};
AsyncConnectionImpl mockedConn = new AsyncConnectionImpl(asyncConn.getConfiguration(),
User.getCurrent()) {
@Override
AsyncRegionLocator getLocator() {
return mockedLocator;
}
}) {
AsyncTable table = new AsyncTableImpl(mockedConn, TABLE_NAME);
table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get();
assertTrue(errorTriggered.get());
errorTriggered.set(false);
count.set(0);
Result result = table.get(new Get(ROW).addColumn(FAMILY, QUALIFIER)).get();
assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
assertTrue(errorTriggered.get());
}
}
}

View File

@ -0,0 +1,132 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.stream.IntStream;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ MediumTests.class, ClientTests.class })
public class TestAsyncTable {
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static TableName TABLE_NAME = TableName.valueOf("async");
private static byte[] FAMILY = Bytes.toBytes("cf");
private static byte[] QUALIFIER = Bytes.toBytes("cq");
private static byte[] ROW = Bytes.toBytes("row");
private static byte[] VALUE = Bytes.toBytes("value");
private static AsyncConnection ASYNC_CONN;
@BeforeClass
public static void setUp() throws Exception {
TEST_UTIL.startMiniCluster(1);
TEST_UTIL.createTable(TABLE_NAME, FAMILY);
TEST_UTIL.waitTableAvailable(TABLE_NAME);
ASYNC_CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), User.getCurrent());
}
@AfterClass
public static void tearDown() throws Exception {
ASYNC_CONN.close();
TEST_UTIL.shutdownMiniCluster();
}
@Test
public void test() throws Exception {
AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get();
assertTrue(table.exists(new Get(ROW).addColumn(FAMILY, QUALIFIER)).get());
Result result = table.get(new Get(ROW).addColumn(FAMILY, QUALIFIER)).get();
assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
table.delete(new Delete(ROW)).get();
result = table.get(new Get(ROW).addColumn(FAMILY, QUALIFIER)).get();
assertTrue(result.isEmpty());
assertFalse(table.exists(new Get(ROW).addColumn(FAMILY, QUALIFIER)).get());
}
private byte[] concat(byte[] base, int index) {
return Bytes.toBytes(Bytes.toString(base) + "-" + index);
}
@Test
public void testMultiple() throws Exception {
AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
int count = 100;
CountDownLatch putLatch = new CountDownLatch(count);
IntStream.range(0, count).forEach(
i -> table.put(new Put(concat(ROW, i)).addColumn(FAMILY, QUALIFIER, concat(VALUE, i)))
.thenAccept(x -> putLatch.countDown()));
putLatch.await();
BlockingQueue<Boolean> existsResp = new ArrayBlockingQueue<>(count);
IntStream.range(0, count)
.forEach(i -> table.exists(new Get(concat(ROW, i)).addColumn(FAMILY, QUALIFIER))
.thenAccept(x -> existsResp.add(x)));
for (int i = 0; i < count; i++) {
assertTrue(existsResp.take());
}
BlockingQueue<Pair<Integer, Result>> getResp = new ArrayBlockingQueue<>(count);
IntStream.range(0, count)
.forEach(i -> table.get(new Get(concat(ROW, i)).addColumn(FAMILY, QUALIFIER))
.thenAccept(x -> getResp.add(Pair.newPair(i, x))));
for (int i = 0; i < count; i++) {
Pair<Integer, Result> pair = getResp.take();
assertArrayEquals(concat(VALUE, pair.getFirst()),
pair.getSecond().getValue(FAMILY, QUALIFIER));
}
CountDownLatch deleteLatch = new CountDownLatch(count);
IntStream.range(0, count).forEach(
i -> table.delete(new Delete(concat(ROW, i))).thenAccept(x -> deleteLatch.countDown()));
deleteLatch.await();
IntStream.range(0, count)
.forEach(i -> table.exists(new Get(concat(ROW, i)).addColumn(FAMILY, QUALIFIER))
.thenAccept(x -> existsResp.add(x)));
for (int i = 0; i < count; i++) {
assertFalse(existsResp.take());
}
IntStream.range(0, count)
.forEach(i -> table.get(new Get(concat(ROW, i)).addColumn(FAMILY, QUALIFIER))
.thenAccept(x -> getResp.add(Pair.newPair(i, x))));
for (int i = 0; i < count; i++) {
Pair<Integer, Result> pair = getResp.take();
assertTrue(pair.getSecond().isEmpty());
}
}
}