From 5fcadb86ab9981b69fba92e590cffc028e2b78b3 Mon Sep 17 00:00:00 2001
From: zhangduo
Date: Tue, 22 Mar 2016 10:33:03 +0800
Subject: [PATCH] HBASE-15495 Connection leak in
 FanOutOneBlockAsyncDFSOutputHelper

---
 .../FanOutOneBlockAsyncDFSOutputHelper.java     |  23 ++-
 .../TestFanOutOneBlockAsyncDFSOutput.java       | 137 ++++++++++++------
 2 files changed, 114 insertions(+), 46 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputHelper.java
index d34bbb05603..ea71701a7de 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputHelper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputHelper.java
@@ -99,6 +99,7 @@ import io.netty.handler.timeout.IdleState;
 import io.netty.handler.timeout.IdleStateEvent;
 import io.netty.handler.timeout.IdleStateHandler;
 import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.FutureListener;
 import io.netty.util.concurrent.Promise;
 
 /**
@@ -594,13 +595,15 @@ public class FanOutOneBlockAsyncDFSOutputHelper {
     beginFileLease(client, src, stat.getFileId());
     boolean succ = false;
     LocatedBlock locatedBlock = null;
-    List<Channel> datanodeList = new ArrayList<>();
+    List<Future<Channel>> futureList = null;
     try {
       DataChecksum summer = createChecksum(client);
       locatedBlock = namenode.addBlock(src, client.getClientName(), null, null,
         stat.getFileId(), null);
-      for (Future<Channel> future : connectToDataNodes(conf, clientName, locatedBlock, 0L, 0L,
-          PIPELINE_SETUP_CREATE, summer, eventLoop)) {
+      List<Channel> datanodeList = new ArrayList<>();
+      futureList = connectToDataNodes(conf, clientName, locatedBlock, 0L, 0L, PIPELINE_SETUP_CREATE,
+        summer, eventLoop);
+      for (Future<Channel> future : futureList) {
         // fail the creation if there are connection failures since we are fail-fast. The upper
         // layer should retry itself if needed.
         datanodeList.add(future.syncUninterruptibly().getNow());
@@ -610,8 +613,18 @@ public class FanOutOneBlockAsyncDFSOutputHelper {
       stat.getFileId(), locatedBlock, eventLoop, datanodeList, summer, ALLOC);
     } finally {
       if (!succ) {
-        for (Channel c : datanodeList) {
-          c.close();
+        if (futureList != null) {
+          for (Future<Channel> f : futureList) {
+            f.addListener(new FutureListener<Channel>() {
+
+              @Override
+              public void operationComplete(Future<Channel> future) throws Exception {
+                if (future.isSuccess()) {
+                  future.getNow().close();
+                }
+              }
+            });
+          }
         }
         endFileLease(client, src, stat.getFileId());
         fsUtils.recoverFileLease(dfs, new Path(src), conf, new CancelOnClose(client));
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFanOutOneBlockAsyncDFSOutput.java
index 0e9f42e410b..09cd61e77db 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFanOutOneBlockAsyncDFSOutput.java
@@ -22,24 +22,26 @@ import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import io.netty.channel.EventLoop;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ThreadLocalRandom;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.ipc.RemoteException;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
+import org.apache.hadoop.util.Daemon;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
@@ -47,9 +49,15 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
 
+import io.netty.channel.EventLoop;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+
 @Category({ MiscTests.class, MediumTests.class })
 public class TestFanOutOneBlockAsyncDFSOutput {
 
+  private static final Log LOG = LogFactory.getLog(TestFanOutOneBlockAsyncDFSOutput.class);
+
   private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 
   private static DistributedFileSystem FS;
@@ -63,8 +71,6 @@ public class TestFanOutOneBlockAsyncDFSOutput {
 
   @BeforeClass
   public static void setUp() throws Exception {
-    Logger.getLogger("org.apache.hadoop.hdfs.StateChange").setLevel(Level.DEBUG);
-    Logger.getLogger("BlockStateChange").setLevel(Level.DEBUG);
     TEST_UTIL.getConfiguration().setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT_MS);
     TEST_UTIL.startMiniDFSCluster(3);
     FS = TEST_UTIL.getDFSCluster().getFileSystem();
@@ -79,12 +85,28 @@ public class TestFanOutOneBlockAsyncDFSOutput {
     TEST_UTIL.shutdownMiniDFSCluster();
   }
 
+  private void ensureAllDatanodeAlive() throws InterruptedException {
+    // FanOutOneBlockAsyncDFSOutputHelper.createOutput is fail-fast, so we need to make sure that
+    // we can create a FanOutOneBlockAsyncDFSOutput after a datanode restarts; otherwise some
+    // tests will fail.
+    for (;;) {
+      try {
+        FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS,
+          new Path("/ensureDatanodeAlive"), true, true, (short) 3, FS.getDefaultBlockSize(),
+          EVENT_LOOP_GROUP.next());
+        out.close();
+        break;
+      } catch (IOException e) {
+        Thread.sleep(100);
+      }
+    }
+  }
+
   private void writeAndVerify(EventLoop eventLoop, Path f, final FanOutOneBlockAsyncDFSOutput out)
       throws IOException, InterruptedException, ExecutionException {
     final byte[] b = new byte[10];
     ThreadLocalRandom.current().nextBytes(b);
-    final FanOutOneBlockAsyncDFSOutputFlushHandler handler =
-        new FanOutOneBlockAsyncDFSOutputFlushHandler();
+    final FanOutOneBlockAsyncDFSOutputFlushHandler handler = new FanOutOneBlockAsyncDFSOutputFlushHandler();
     eventLoop.execute(new Runnable() {
 
       @Override
@@ -107,9 +129,8 @@
   public void test() throws IOException, InterruptedException, ExecutionException {
     Path f = new Path("/" + name.getMethodName());
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
-    final FanOutOneBlockAsyncDFSOutput out =
-        FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
-          FS.getDefaultBlockSize(), eventLoop);
+    final FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f,
+      true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop);
     writeAndVerify(eventLoop, f, out);
   }
 
@@ -117,13 +138,11 @@
   public void testRecover() throws IOException, InterruptedException, ExecutionException {
     Path f = new Path("/" + name.getMethodName());
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
-    final FanOutOneBlockAsyncDFSOutput out =
-        FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
-          FS.getDefaultBlockSize(), eventLoop);
+    final FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f,
+      true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop);
     final byte[] b = new byte[10];
     ThreadLocalRandom.current().nextBytes(b);
-    final FanOutOneBlockAsyncDFSOutputFlushHandler handler =
-        new FanOutOneBlockAsyncDFSOutputFlushHandler();
+    final FanOutOneBlockAsyncDFSOutputFlushHandler handler = new FanOutOneBlockAsyncDFSOutputFlushHandler();
     eventLoop.execute(new Runnable() {
 
       @Override
@@ -135,38 +154,41 @@ public class TestFanOutOneBlockAsyncDFSOutput {
     handler.get();
     // restart one datanode, which breaks one connection
     TEST_UTIL.getDFSCluster().restartDataNode(0);
-    handler.reset();
-    eventLoop.execute(new Runnable() {
-
-      @Override
-      public void run() {
-        out.write(b, 0, b.length);
-        out.flush(null, handler, false);
-      }
-    });
     try {
-      handler.get();
-      fail("flush should fail");
-    } catch (ExecutionException e) {
-      // we restarted one datanode so the flush should fail
-      e.printStackTrace();
+      handler.reset();
+      eventLoop.execute(new Runnable() {
+
+        @Override
+        public void run() {
+          out.write(b, 0, b.length);
+          out.flush(null, handler, false);
+        }
+      });
+      try {
+        handler.get();
+        fail("flush should fail");
+      } catch (ExecutionException e) {
+        // we restarted one datanode so the flush should fail
+        LOG.info("expected exception caught", e);
+      }
+      out.recoverAndClose(null);
+      assertEquals(b.length, FS.getFileStatus(f).getLen());
+      byte[] actual = new byte[b.length];
+      try (FSDataInputStream in = FS.open(f)) {
+        in.readFully(actual);
+      }
+      assertArrayEquals(b, actual);
+    } finally {
+      ensureAllDatanodeAlive();
     }
-    out.recoverAndClose(null);
-    assertEquals(b.length, FS.getFileStatus(f).getLen());
-    byte[] actual = new byte[b.length];
-    try (FSDataInputStream in = FS.open(f)) {
-      in.readFully(actual);
-    }
-    assertArrayEquals(b, actual);
   }
 
   @Test
   public void testHeartbeat() throws IOException, InterruptedException, ExecutionException {
     Path f = new Path("/" + name.getMethodName());
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
-    final FanOutOneBlockAsyncDFSOutput out =
-        FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
-          FS.getDefaultBlockSize(), eventLoop);
+    final FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f,
+      true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop);
     Thread.sleep(READ_TIMEOUT_MS * 2);
     // the connection to the datanode should still be alive.
     writeAndVerify(eventLoop, f, out);
@@ -184,7 +206,40 @@
         FS.getDefaultBlockSize(), eventLoop);
       fail("should fail with parent does not exist");
     } catch (RemoteException e) {
+      LOG.info("expected exception caught", e);
       assertTrue(e.unwrapRemoteException() instanceof FileNotFoundException);
     }
   }
+
+  @Test
+  public void testConnectToDatanodeFailed()
+      throws IOException, ClassNotFoundException, NoSuchMethodException, IllegalAccessException,
+      InvocationTargetException, InterruptedException, NoSuchFieldException {
+    Field xceiverServerDaemonField = DataNode.class.getDeclaredField("dataXceiverServer");
+    xceiverServerDaemonField.setAccessible(true);
+    Class<?> xceiverServerClass = Class
+        .forName("org.apache.hadoop.hdfs.server.datanode.DataXceiverServer");
+    Method numPeersMethod = xceiverServerClass.getDeclaredMethod("getNumPeers");
+    numPeersMethod.setAccessible(true);
+    // shut down one datanode so that connecting to it fails
+    TEST_UTIL.getDFSCluster().getDataNodes().get(0).shutdownDatanode(true);
+    try {
+      Path f = new Path("/test");
+      EventLoop eventLoop = EVENT_LOOP_GROUP.next();
+      try {
+        FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
+          FS.getDefaultBlockSize(), eventLoop);
+        fail("should fail with connection error");
+      } catch (IOException e) {
+        LOG.info("expected exception caught", e);
+      }
+      for (DataNode dn : TEST_UTIL.getDFSCluster().getDataNodes()) {
+        Daemon daemon = (Daemon) xceiverServerDaemonField.get(dn);
+        assertEquals(0, numPeersMethod.invoke(daemon.getRunnable()));
+      }
+    } finally {
+      TEST_UTIL.getDFSCluster().restartDataNode(0);
+      ensureAllDatanodeAlive();
+    }
+  }
 }
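The heart of the fix is the `finally` block in createOutput: instead of closing only the channels already collected in `datanodeList`, cleanup is registered on every connect future, so a connection that completes after the fail-fast abort is closed as well. Below is a minimal, self-contained sketch of that pattern using plain Netty `Bootstrap`/`ChannelFuture` types rather than HBase's `connectToDataNodes` helper; the class name `FailFastCleanupSketch`, the `connectAll` method, and the peer list are illustrative assumptions, not part of the patch.

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

public class FailFastCleanupSketch {

  // Connect to all peers in parallel, failing fast on the first error while
  // still closing connections that complete after the abort.
  public static List<Channel> connectAll(EventLoopGroup group, List<InetSocketAddress> peers) {
    List<ChannelFuture> futureList = new ArrayList<>();
    boolean succ = false;
    try {
      for (InetSocketAddress peer : peers) {
        futureList.add(new Bootstrap().group(group).channel(NioSocketChannel.class)
            .handler(new ChannelInboundHandlerAdapter()).connect(peer));
      }
      List<Channel> channels = new ArrayList<>();
      for (ChannelFuture f : futureList) {
        // syncUninterruptibly throws on the first failed connect, leaving
        // some of the later futures still pending at that point.
        channels.add(f.syncUninterruptibly().channel());
      }
      succ = true;
      return channels;
    } finally {
      if (!succ) {
        // Closing only the channels collected so far would leak any
        // connection that succeeds after the exception was thrown, which is
        // exactly the leak HBASE-15495 fixes. A listener on each future
        // closes such channels whenever they finish connecting.
        for (ChannelFuture f : futureList) {
          f.addListener(new ChannelFutureListener() {

            @Override
            public void operationComplete(ChannelFuture future) {
              if (future.isSuccess()) {
                future.channel().close();
              }
            }
          });
        }
      }
    }
  }
}

The patch applies the same close-via-listener idea to HBase's own `Future<Channel>` promises, and the new test `testConnectToDatanodeFailed` then reflects into `DataXceiverServer.getNumPeers` to assert that no datanode is left holding a leaked connection.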