HBASE-27222 Purge FutureReturnValueIgnored warnings from error prone (#4634)

Signed-off-by: Andrew Purtell <apurtell@apache.org>
Author: Duo Zhang, 2022-07-26 23:42:37 +08:00, committed by GitHub
Parent: ebd189157e
Commit: 8b091c4061
22 changed files with 212 additions and 82 deletions

FanOutOneBlockAsyncDFSOutput.java

@@ -22,23 +22,26 @@ import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHel
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.completeFile;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.endFileLease;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.getStatus;
+import static org.apache.hadoop.hbase.util.NettyFutureUtils.consume;
+import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeWrite;
+import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeWriteAndFlush;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.WRITER_IDLE;
import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
-import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
@@ -48,6 +51,8 @@ import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.Can
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.hadoop.hbase.util.NettyFutureUtils;
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -63,14 +68,13 @@ import org.apache.hadoop.util.DataChecksum;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
-import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler.Sharable;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelId;
-import org.apache.hbase.thirdparty.io.netty.channel.ChannelOutboundInvoker;
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
@ -252,7 +256,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
// disable further write, and fail all pending ack. // disable further write, and fail all pending ack.
state = State.BROKEN; state = State.BROKEN;
failWaitingAckQueue(channel, errorSupplier); failWaitingAckQueue(channel, errorSupplier);
datanodeInfoMap.keySet().forEach(ChannelOutboundInvoker::close); datanodeInfoMap.keySet().forEach(NettyFutureUtils::safeClose);
} }
private void failWaitingAckQueue(Channel channel, Supplier<Throwable> errorSupplier) { private void failWaitingAckQueue(Channel channel, Supplier<Throwable> errorSupplier) {
@@ -329,7 +333,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
ByteBuf buf = alloc.buffer(len);
heartbeat.putInBuffer(buf.nioBuffer(0, len));
buf.writerIndex(len);
-ctx.channel().writeAndFlush(buf);
+safeWriteAndFlush(ctx.channel(), buf);
}
return;
}
@@ -440,9 +444,9 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
// TODO: we should perhaps measure time taken per DN here;
// we could collect statistics per DN, and/or exclude bad nodes in createOutput.
datanodeInfoMap.keySet().forEach(ch -> {
-ch.write(headerBuf.retainedDuplicate());
-ch.write(checksumBuf.retainedDuplicate());
-ch.writeAndFlush(dataBuf.retainedDuplicate());
+safeWrite(ch, headerBuf.retainedDuplicate());
+safeWrite(ch, checksumBuf.retainedDuplicate());
+safeWriteAndFlush(ch, dataBuf.retainedDuplicate());
});
checksumBuf.release();
headerBuf.release();
@@ -562,16 +566,18 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
headerBuf.writerIndex(headerLen);
CompletableFuture<Long> future = new CompletableFuture<>();
waitingAckQueue.add(new Callback(future, finalizedLength, datanodeInfoMap.keySet(), 0));
-datanodeInfoMap.keySet().forEach(ch -> ch.writeAndFlush(headerBuf.retainedDuplicate()));
+datanodeInfoMap.keySet().forEach(ch -> safeWriteAndFlush(ch, headerBuf.retainedDuplicate()));
headerBuf.release();
-try {
-future.get();
-} catch (InterruptedException e) {
-throw (IOException) new InterruptedIOException().initCause(e);
-} catch (ExecutionException e) {
-Throwable cause = e.getCause();
-Throwables.propagateIfPossible(cause, IOException.class);
-throw new IOException(cause);
+FutureUtils.get(future);
+}
+
+private void closeDataNodeChannelsAndAwait() {
+List<ChannelFuture> futures = new ArrayList<>();
+for (Channel ch : datanodeInfoMap.keySet()) {
+futures.add(ch.close());
+}
+for (ChannelFuture future : futures) {
+consume(future.awaitUninterruptibly());
}
}
@@ -579,14 +585,12 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
* The close method when error occurred. Now we just call recoverFileLease.
*/
@Override
-@SuppressWarnings("FutureReturnValueIgnored")
public void recoverAndClose(CancelableProgressable reporter) throws IOException {
if (buf != null) {
buf.release();
buf = null;
}
-datanodeInfoMap.keySet().forEach(ChannelOutboundInvoker::close);
-datanodeInfoMap.keySet().forEach(ch -> ch.closeFuture().awaitUninterruptibly());
+closeDataNodeChannelsAndAwait();
endFileLease(client, fileId);
RecoverLeaseFSUtils.recoverFileLease(dfs, new Path(src), conf,
reporter == null ? new CancelOnClose(client) : reporter);
@@ -597,12 +601,10 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
* {@link #recoverAndClose(CancelableProgressable)} if this method throws an exception.
*/
@Override
-@SuppressWarnings("FutureReturnValueIgnored")
public void close() throws IOException {
endBlock();
state = State.CLOSED;
-datanodeInfoMap.keySet().forEach(ChannelOutboundInvoker::close);
-datanodeInfoMap.keySet().forEach(ch -> ch.closeFuture().awaitUninterruptibly());
+closeDataNodeChannelsAndAwait();
block.setNumBytes(ackedBlockLength);
completeFile(client, namenode, src, clientName, block, fileId);
}
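Side note on the endBlock() change above: FutureUtils.get(future) replaces the hand-rolled try/catch around future.get(). As illustration only, a hypothetical standalone helper (not the actual org.apache.hadoop.hbase.util.FutureUtils implementation) that centralizes the same unwrapping the removed code did looks like this:

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public final class GetHelperSketch {
  // Wait on the future and rethrow failures as IOException, mirroring the removed try/catch.
  public static <T> T get(CompletableFuture<T> future) throws IOException {
    try {
      return future.get();
    } catch (InterruptedException e) {
      throw (IOException) new InterruptedIOException().initCause(e);
    } catch (ExecutionException e) {
      Throwable cause = e.getCause();
      if (cause instanceof IOException) {
        throw (IOException) cause; // same effect as Throwables.propagateIfPossible(cause, IOException.class)
      }
      throw new IOException(cause);
    }
  }

  public static void main(String[] args) throws IOException {
    System.out.println(get(CompletableFuture.completedFuture(42L))); // prints 42
  }
}

Centralizing this conversion is what lets each caller shrink to a single line while still surfacing failures as IOException.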

FanOutOneBlockAsyncDFSOutputHelper.java

@@ -19,6 +19,9 @@ package org.apache.hadoop.hbase.io.asyncfs;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.createEncryptor;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate;
+import static org.apache.hadoop.hbase.util.NettyFutureUtils.addListener;
+import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeClose;
+import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeWriteAndFlush;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
@@ -351,7 +354,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
buffer.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
buffer.writeByte(Op.WRITE_BLOCK.code);
proto.writeDelimitedTo(new ByteBufOutputStream(buffer));
-channel.writeAndFlush(buffer);
+safeWriteAndFlush(channel, buffer);
}
private static void initialize(Configuration conf, Channel channel, DatanodeInfo dnInfo,
@@ -360,7 +363,7 @@
throws IOException {
Promise<Void> saslPromise = channel.eventLoop().newPromise();
trySaslNegotiate(conf, channel, dnInfo, timeoutMs, client, accessToken, saslPromise);
-saslPromise.addListener(new FutureListener<Void>() {
+addListener(saslPromise, new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
@@ -404,7 +407,7 @@
Promise<Channel> promise = eventLoopGroup.next().newPromise();
futureList.add(promise);
String dnAddr = dnInfo.getXferAddr(connectToDnViaHostname);
-new Bootstrap().group(eventLoopGroup).channel(channelClass)
+addListener(new Bootstrap().group(eventLoopGroup).channel(channelClass)
.option(CONNECT_TIMEOUT_MILLIS, timeoutMs).handler(new ChannelInitializer<Channel>() {
@Override
@@ -413,7 +416,7 @@
// channel connected. Leave an empty implementation here because netty does not allow
// a null handler.
}
-}).connect(NetUtils.createSocketAddr(dnAddr)).addListener(new ChannelFutureListener() {
+}).connect(NetUtils.createSocketAddr(dnAddr)), new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
@@ -533,12 +536,12 @@
if (!succ) {
if (futureList != null) {
for (Future<Channel> f : futureList) {
-f.addListener(new FutureListener<Channel>() {
+addListener(f, new FutureListener<Channel>() {
@Override
public void operationComplete(Future<Channel> future) throws Exception {
if (future.isSuccess()) {
-future.getNow().close();
+safeClose(future.getNow());
}
}
});

FanOutOneBlockAsyncDFSOutputSaslHelper.java

@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.io.asyncfs;
+import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeWrite;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
@@ -448,12 +449,12 @@
size += CodedOutputStream.computeRawVarint32Size(size);
ByteBuf buf = ctx.alloc().buffer(size);
proto.writeDelimitedTo(new ByteBufOutputStream(buf));
-ctx.write(buf);
+safeWrite(ctx, buf);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
-ctx.write(ctx.alloc().buffer(4).writeInt(SASL_TRANSFER_MAGIC_NUMBER));
+safeWrite(ctx, ctx.alloc().buffer(4).writeInt(SASL_TRANSFER_MAGIC_NUMBER));
sendSaslMessage(ctx, new byte[0]);
ctx.flush();
step++;
@@ -642,7 +643,7 @@
cBuf.addComponent(buf);
cBuf.writerIndex(cBuf.writerIndex() + buf.readableBytes());
} else {
-ctx.write(msg);
+safeWrite(ctx, msg);
}
}
@@ -656,7 +657,7 @@
ByteBuf buf = ctx.alloc().ioBuffer(4 + wrapped.length);
buf.writeInt(wrapped.length);
buf.writeBytes(wrapped);
-ctx.write(buf);
+safeWrite(ctx, buf);
}
ctx.flush();
}

TestFanOutOneBlockAsyncDFSOutput.java

@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.io.asyncfs;
+import static org.apache.hadoop.hbase.util.FutureUtils.consume;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -93,9 +94,9 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
}
@AfterClass
-public static void tearDown() throws IOException, InterruptedException {
+public static void tearDown() throws Exception {
if (EVENT_LOOP_GROUP != null) {
-EVENT_LOOP_GROUP.shutdownGracefully().sync();
+EVENT_LOOP_GROUP.shutdownGracefully().get();
}
shutdownMiniDFSCluster();
}
@@ -262,7 +263,7 @@
byte[] b = new byte[50 * 1024 * 1024];
Bytes.random(b);
out.write(b);
-out.flush(false);
+consume(out.flush(false));
assertEquals(b.length, out.flush(false).get().longValue());
out.close();
assertEquals(b.length, FS.getFileStatus(f).getLen());

TestFanOutOneBlockAsyncDFSOutputHang.java

@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.io.asyncfs;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -103,12 +102,12 @@
}
@AfterClass
-public static void tearDown() throws IOException, InterruptedException {
+public static void tearDown() throws Exception {
if (OUT != null) {
OUT.recoverAndClose(null);
}
if (EVENT_LOOP_GROUP != null) {
-EVENT_LOOP_GROUP.shutdownGracefully().sync();
+EVENT_LOOP_GROUP.shutdownGracefully().get();
}
shutdownMiniDFSCluster();
}

TestLocalAsyncOutput.java

@@ -53,9 +53,9 @@ public class TestLocalAsyncOutput {
private static StreamSlowMonitor MONITOR;
@AfterClass
-public static void tearDownAfterClass() throws IOException {
+public static void tearDownAfterClass() throws Exception {
TEST_UTIL.cleanupTestDir();
-GROUP.shutdownGracefully();
+GROUP.shutdownGracefully().get();
MONITOR = StreamSlowMonitor.create(TEST_UTIL.getConfiguration(), "testMonitor");
}

TestSaslFanOutOneBlockAsyncDFSOutput.java

@@ -193,9 +193,9 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
}
@AfterClass
-public static void tearDownAfterClass() throws IOException, InterruptedException {
+public static void tearDownAfterClass() throws Exception {
if (EVENT_LOOP_GROUP != null) {
-EVENT_LOOP_GROUP.shutdownGracefully().sync();
+EVENT_LOOP_GROUP.shutdownGracefully().get();
}
if (KDC != null) {
KDC.stop();

ClusterStatusListener.java

@@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hbase.client;
+import static org.apache.hadoop.hbase.util.NettyFutureUtils.consume;
+import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeClose;
import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.Constructor;
@@ -183,7 +186,6 @@ class ClusterStatusListener implements Closeable {
@Override
public void connect(Configuration conf) throws IOException {
String mcAddress =
conf.get(HConstants.STATUS_MULTICAST_ADDRESS, HConstants.DEFAULT_STATUS_MULTICAST_ADDRESS);
String bindAddress = conf.get(HConstants.STATUS_MULTICAST_BIND_ADDRESS,
@@ -218,16 +220,21 @@
}
LOG.debug("Channel bindAddress={}, networkInterface={}, INA={}", bindAddress, ni, ina);
-channel.joinGroup(ina, ni, null, channel.newPromise());
+try {
+consume(channel.joinGroup(ina, ni, null).sync());
+} catch (InterruptedException e) {
+close();
+throw ExceptionUtil.asInterrupt(e);
+}
}
@Override
public void close() {
if (channel != null) {
-channel.close();
+safeClose(channel);
channel = null;
}
-group.shutdownGracefully();
+consume(group.shutdownGracefully());
}
/**

TestIPCUtil.java

@@ -121,7 +121,7 @@ public class TestIPCUtil {
}
@Test
-public void testExecute() throws IOException {
+public void testExecute() throws Exception {
EventLoop eventLoop = new DefaultEventLoop();
MutableInt executed = new MutableInt(0);
MutableInt numStackTraceElements = new MutableInt(0);
@@ -156,7 +156,7 @@
});
FutureUtils.get(future);
} finally {
-eventLoop.shutdownGracefully();
+eventLoop.shutdownGracefully().get();
}
}
}

FutureUtils.java

@@ -93,6 +93,17 @@ public final class FutureUtils {
}, executor);
}
+/**
+ * Log the error if the future indicates any failure.
+ */
+public static void consume(CompletableFuture<?> future) {
+addListener(future, (r, e) -> {
+if (e != null) {
+LOG.warn("Async operation fails", e);
+}
+});
+}
/**
* Return a {@link CompletableFuture} which is same with the given {@code future}, but execute all
* the callbacks in the given {@code executor}.
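For illustration, here is a self-contained sketch of how a consume-style helper behaves on a plain CompletableFuture. This is hypothetical standalone code using whenComplete and stderr; the real helper above delegates to FutureUtils.addListener and an SLF4J logger.

import java.util.concurrent.CompletableFuture;

public final class ConsumeSketch {
  // Attach a log-only callback so the future's outcome is observed without blocking the caller.
  static void consume(CompletableFuture<?> future) {
    future.whenComplete((r, e) -> {
      if (e != null) {
        System.err.println("Async operation fails: " + e);
      }
    });
  }

  public static void main(String[] args) {
    consume(CompletableFuture.completedFuture("ok")); // nothing logged
    CompletableFuture<Void> failed = new CompletableFuture<>();
    failed.completeExceptionally(new RuntimeException("boom"));
    consume(failed); // failure is logged instead of being silently dropped
  }
}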

NettyFutureUtils.java (new file)

@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelOutboundInvoker;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.Future;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.GenericFutureListener;
/**
* Helper class for processing netty futures.
*/
@InterfaceAudience.Private
public final class NettyFutureUtils {
private static final Logger LOG = LoggerFactory.getLogger(NettyFutureUtils.class);
private NettyFutureUtils() {
}
/**
* This method is used when you just want to add a listener to the given netty future. Ignoring
* the return value of a Future is considered a bad practice as it may suppress exceptions
* thrown from the code that completes the future, and this method will catch all the exceptions
* thrown from the {@code listener} to surface possible code bugs.
* <p/>
* And the error-prone check will always report FutureReturnValueIgnored because every method in
* the {@link Future} class will return a new {@link Future}, so you always have one future that
* has not been checked. So we introduce this method and add a suppress warnings annotation here.
*/
@SuppressWarnings({ "FutureReturnValueIgnored", "rawtypes", "unchecked" })
public static <V> void addListener(Future<V> future,
GenericFutureListener<? extends Future<? super V>> listener) {
future.addListener(f -> {
try {
// the ? operator in template makes it really hard to pass compile, so here we just cast the
// listener to raw type.
((GenericFutureListener) listener).operationComplete(f);
} catch (Throwable t) {
LOG.error("Unexpected error caught when processing netty", t);
}
});
}
private static void loggingWhenError(Future<?> future) {
if (!future.isSuccess()) {
LOG.warn("IO operation failed", future.cause());
}
}
/**
* Log the error if the future indicates any failure.
*/
@SuppressWarnings("FutureReturnValueIgnored")
public static void consume(Future<?> future) {
future.addListener(NettyFutureUtils::loggingWhenError);
}
/**
* Close the channel and eat the returned future by logging the error when the future is completed
* with error.
*/
public static void safeClose(ChannelOutboundInvoker channel) {
consume(channel.close());
}
/**
* Call write on the channel and eat the returned future by logging the error when the future is
* completed with error.
*/
public static void safeWrite(ChannelOutboundInvoker channel, Object msg) {
consume(channel.write(msg));
}
/**
* Call writeAndFlush on the channel and eat the returned future by logging the error when the
* future is completed with error.
*/
public static void safeWriteAndFlush(ChannelOutboundInvoker channel, Object msg) {
consume(channel.writeAndFlush(msg));
}
}
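A hypothetical call-site sketch of the new helpers, assuming the NettyFutureUtils class above is on the classpath and using Netty's EmbeddedChannel (from the shaded hbase-thirdparty netty) as a stand-in for a real datanode connection:

import org.apache.hadoop.hbase.util.NettyFutureUtils;
import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled;
import org.apache.hbase.thirdparty.io.netty.channel.embedded.EmbeddedChannel;

public final class NettyFutureUtilsUsageSketch {
  public static void main(String[] args) {
    EmbeddedChannel ch = new EmbeddedChannel();
    // Before this patch a call site would do ch.writeAndFlush(...) and drop the ChannelFuture,
    // which error-prone flags as FutureReturnValueIgnored. The helpers consume the future and
    // log a failure instead of silently ignoring it.
    NettyFutureUtils.safeWriteAndFlush(ch, Unpooled.copiedBuffer(new byte[] { 1, 2, 3 }));
    NettyFutureUtils.safeClose(ch);
  }
}

Because the returned futures are handed straight to consume(), the call sites above no longer produce an ignored return value for error-prone to report.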

HttpProxyExample.java

@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.client.example;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
+import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeWriteAndFlush;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -71,6 +72,11 @@ import org.apache.hbase.thirdparty.io.netty.util.concurrent.GlobalEventExecutor;
*
* Use HTTP GET to fetch data, and use HTTP PUT to put data. Encode the value as the request content
* when doing PUT.
+* <p>
+* Notice that Future class methods will all return a new Future, so you always have one future
+* that will not be checked, so we need to suppress error-prone "FutureReturnValueIgnored"
+* warnings on the methods such as join and stop. In your real production code, you should use your
+* own convenient way to address the warning.
*/
@InterfaceAudience.Private
public class HttpProxyExample {
@@ -148,7 +154,7 @@ public class HttpProxyExample {
resp = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status);
}
resp.headers().set(HttpHeaderNames.CONTENT_TYPE, "text-plain; charset=UTF-8");
-ctx.writeAndFlush(resp);
+safeWriteAndFlush(ctx, resp);
}
private Params parse(FullHttpRequest req) {
@@ -239,6 +245,7 @@
}).bind(port).syncUninterruptibly().channel();
}
+@SuppressWarnings("FutureReturnValueIgnored")
public void join() {
serverChannel.closeFuture().awaitUninterruptibly();
}
@@ -251,6 +258,7 @@
}
}
+@SuppressWarnings("FutureReturnValueIgnored")
public void stop() throws IOException {
serverChannel.close().syncUninterruptibly();
serverChannel = null;

MemcachedBlockCache.java

@@ -80,23 +80,10 @@ public class MemcachedBlockCache implements BlockCache {
boolean optimize = c.getBoolean(MEMCACHED_OPTIMIZE_KEY, MEMCACHED_OPTIMIZE_DEFAULT);
ConnectionFactoryBuilder builder =
-new ConnectionFactoryBuilder().setOpTimeout(opTimeout).setOpQueueMaxBlockTime(queueTimeout) // Cap
-// the
-// max
-// time
-// before
-// anything
-// times
-// out
-.setFailureMode(FailureMode.Redistribute).setShouldOptimize(optimize).setDaemon(true) // Don't
-// keep
-// threads
-// around
-// past
-// the
-// end
-// of
-// days.
+// Cap the max time before anything times out
+new ConnectionFactoryBuilder().setOpTimeout(opTimeout).setOpQueueMaxBlockTime(queueTimeout)
+// Don't keep threads around past the end of days.
+.setFailureMode(FailureMode.Redistribute).setShouldOptimize(optimize).setDaemon(true)
.setUseNagleAlgorithm(false) // Ain't nobody got time for that
.setReadBufferSize(HConstants.DEFAULT_BLOCKSIZE * 4 * 1024); // Much larger just in case
@@ -124,10 +111,17 @@
cacheBlock(cacheKey, buf);
}
+@SuppressWarnings("FutureReturnValueIgnored")
@Override
public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
if (buf instanceof HFileBlock) {
-client.add(cacheKey.toString(), MAX_SIZE, (HFileBlock) buf, tc);
+client.add(cacheKey.toString(), MAX_SIZE, (HFileBlock) buf, tc).addListener(f -> {
+try {
+f.get();
+} catch (ExecutionException e) {
+LOG.warn("Failed to cache block", e);
+}
+});
} else {
if (LOG.isDebugEnabled()) {
LOG.debug(

ShellExecEndpointCoprocessor.java

@@ -108,7 +108,7 @@ public class ShellExecEndpointCoprocessor extends ShellExecEndpoint.ShellExecSer
private void runBackgroundTask(final Shell.ShellCommandExecutor shell,
final RpcCallback<ShellExecResponse> done) {
final long sleepDuration = conf.getLong(BACKGROUND_DELAY_MS_KEY, DEFAULT_BACKGROUND_DELAY_MS);
-backgroundExecutor.submit(() -> {
+backgroundExecutor.execute(() -> {
try {
// sleep first so that the RPC can ACK. race condition here as we have no means of blocking
// until the IPC response has been acknowledged by the client.
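The submit-to-execute swaps here and in the later hunks avoid the warning rather than suppress it: ExecutorService.submit returns a Future that would otherwise be dropped, while execute returns void and lets failures reach the thread's uncaught-exception handling. A small self-contained sketch of the difference (hypothetical class, plain java.util.concurrent):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public final class SubmitVsExecuteSketch {
  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    Runnable failing = () -> {
      throw new RuntimeException("boom");
    };

    // submit() captures the exception in the returned Future; if that Future is dropped
    // (the FutureReturnValueIgnored pattern), the failure is silently lost.
    Future<?> dropped = pool.submit(failing);

    // execute() returns void, so there is no Future to ignore; the exception reaches the
    // thread's uncaught-exception handling and is printed to stderr by default.
    pool.execute(failing);

    pool.shutdown();
    pool.awaitTermination(5, TimeUnit.SECONDS);
    System.out.println("submitted task failed quietly: " + dropped.isDone());
  }
}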

MergeRandomAdjacentRegionsOfTableAction.java

@@ -72,7 +72,7 @@ public class MergeRandomAdjacentRegionsOfTableAction extends Action {
}
try {
-admin.mergeRegionsAsync(a.getEncodedNameAsBytes(), b.getEncodedNameAsBytes(), false);
+admin.mergeRegionsAsync(a.getEncodedNameAsBytes(), b.getEncodedNameAsBytes(), false).get();
} catch (Exception ex) {
getLogger().warn("Merge failed, might be caused by other chaos: " + ex.getMessage());
}

IntegrationTestLoadCommonCrawl.java

@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.test;
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
@@ -684,9 +686,9 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
}
final long putStartTime = System.currentTimeMillis();
final CompletableFuture<Void> putFuture = table.put(put);
-putFuture.thenRun(() -> {
+addListener(putFuture, (r, e) -> {
inflight.decrementAndGet();
-if (!putFuture.isCompletedExceptionally()) {
+if (e == null) {
output.getCounter(Counts.RPC_TIME_MS)
.increment(System.currentTimeMillis() - putStartTime);
output.getCounter(Counts.RPC_BYTES_WRITTEN).increment(put.heapSize());
@@ -732,9 +734,9 @@
}
final long incrStartTime = System.currentTimeMillis();
final CompletableFuture<Result> incrFuture = table.increment(increment);
-incrFuture.thenRun(() -> {
+addListener(incrFuture, (r, e) -> {
inflight.decrementAndGet();
-if (!incrFuture.isCompletedExceptionally()) {
+if (e == null) {
output.getCounter(Counts.RPC_TIME_MS)
.increment(System.currentTimeMillis() - incrStartTime);
output.getCounter(Counts.RPC_BYTES_WRITTEN).increment(increment.heapSize());

IntegrationTestSendTraceRequests.java

@@ -150,7 +150,7 @@ public class IntegrationTestSendTraceRequests extends AbstractHBaseTool {
}
}
};
-service.submit(runnable);
+service.execute(runnable);
}
}
@@ -194,7 +194,7 @@
}
};
-service.submit(runnable);
+service.execute(runnable);
}
}

TestAdmin2.java

@@ -722,6 +722,7 @@ public class TestAdmin2 extends TestAdminBase {
assertEquals(11111111, ADMIN.getDescriptor(tableName).getMaxFileSize());
}
+@SuppressWarnings("FutureReturnValueIgnored")
@Test
public void testTableMergeFollowedByModify() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName());

TestWakeUpUnexpectedProcedure.java

@@ -236,6 +236,7 @@ public class TestWakeUpUnexpectedProcedure {
UTIL.shutdownMiniCluster();
}
+@SuppressWarnings("FutureReturnValueIgnored")
@Test
public void test() throws Exception {
RegionInfo region = UTIL.getMiniHBaseCluster().getRegions(NAME).get(0).getRegionInfo();

TestHStore.java

@@ -1284,7 +1284,7 @@ public class TestHStore {
storeFlushCtx.prepare();
};
ExecutorService service = Executors.newSingleThreadExecutor();
-service.submit(flush);
+service.execute(flush);
// we get scanner from pipeline and snapshot but they are empty. -- phase (2/5)
// this is blocked until we recreate the active memstore -- phase (3/5)
// we get scanner from active memstore but it is empty -- phase (5/5)
@@ -1321,7 +1321,7 @@
public void getScanners(MyStore store) throws IOException {
final long tmpId = id++;
ExecutorService s = Executors.newSingleThreadExecutor();
-s.submit(() -> {
+s.execute(() -> {
try {
// flush the store before storescanner updates the scanners from store.
// The current data will be flushed into files, and the memstore will

TestAsyncFSWALDurability.java

@@ -52,8 +52,8 @@ public class TestAsyncFSWALDurability extends WALDurabilityTestBase<CustomAsyncF
}
@AfterClass
-public static void tearDownAfterClass() {
-GROUP.shutdownGracefully();
+public static void tearDownAfterClass() throws Exception {
+GROUP.shutdownGracefully().get();
}
@Override

ZKWatcher.java

@@ -604,7 +604,7 @@ public class ZKWatcher implements Watcher, Abortable, Closeable {
LOG.debug(prefix("Received ZooKeeper Event, " + "type=" + event.getType() + ", " + "state="
+ event.getState() + ", " + "path=" + event.getPath()));
final String spanName = ZKWatcher.class.getSimpleName() + "-" + identifier;
-zkEventProcessor.submit(TraceUtil.tracedRunnable(() -> processEvent(event), spanName));
+zkEventProcessor.execute(TraceUtil.tracedRunnable(() -> processEvent(event), spanName));
}
// Connection management