HBASE-13093 Local mode HBase instance doesn't shut down.

This commit is contained in:
Andrey Stepachev 2015-03-17 14:47:11 +00:00
parent e1f972170d
commit d0af30ea3c
2 changed files with 13 additions and 6 deletions

View File

@ -262,7 +262,7 @@ public class AsyncRpcChannel {
handleSaslConnectionFailure(retryCount, cause, realTicket);
// Try to reconnect
AsyncRpcClient.WHEEL_TIMER.newTimeout(new TimerTask() {
client.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
connect(bootstrap);
@ -289,7 +289,7 @@ public class AsyncRpcChannel {
*/
private void retryOrClose(final Bootstrap bootstrap, int connectCounter, Throwable e) {
if (connectCounter < client.maxRetries) {
AsyncRpcClient.WHEEL_TIMER.newTimeout(new TimerTask() {
client.newTimeout(new TimerTask() {
@Override public void run(Timeout timeout) throws Exception {
connect(bootstrap);
}
@ -339,7 +339,7 @@ public class AsyncRpcChannel {
// Add timeout for cleanup if none is present
if (cleanupTimer == null && call.getRpcTimeout() > 0) {
cleanupTimer =
AsyncRpcClient.WHEEL_TIMER.newTimeout(timeoutTask, call.getRpcTimeout(),
client.newTimeout(timeoutTask, call.getRpcTimeout(),
TimeUnit.MILLISECONDS);
}
if (!connected) {
@ -601,7 +601,7 @@ public class AsyncRpcChannel {
}
if (nextCleanupTaskDelay > 0) {
cleanupTimer =
AsyncRpcClient.WHEEL_TIMER.newTimeout(timeoutTask, nextCleanupTaskDelay,
client.newTimeout(timeoutTask, nextCleanupTaskDelay,
TimeUnit.MILLISECONDS);
} else {
cleanupTimer = null;

View File

@ -28,6 +28,8 @@ import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;
@ -70,8 +72,9 @@ public class AsyncRpcClient extends AbstractRpcClient {
public static final String USE_NATIVE_TRANSPORT = "hbase.rpc.client.nativetransport";
public static final String USE_GLOBAL_EVENT_LOOP_GROUP = "hbase.rpc.client.globaleventloopgroup";
public static final HashedWheelTimer WHEEL_TIMER =
new HashedWheelTimer(100, TimeUnit.MILLISECONDS);
private static final HashedWheelTimer WHEEL_TIMER =
new HashedWheelTimer(Threads.newDaemonThreadFactory("AsyncRpcChannel-timer"),
100, TimeUnit.MILLISECONDS);
private static final ChannelInitializer<SocketChannel> DEFAULT_CHANNEL_INITIALIZER =
new ChannelInitializer<SocketChannel>() {
@ -458,4 +461,8 @@ public class AsyncRpcClient extends AbstractRpcClient {
this.rpcClient.callMethod(md, pcrc, param, returnType, this.ticket, this.isa, done);
}
}
Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
return WHEEL_TIMER.newTimeout(task, delay, unit);
}
}