HBASE-15495 Connection leak in FanOutOneBlockAsyncDFSOutputHelper

This commit is contained in:
zhangduo 2016-03-22 10:33:03 +08:00
parent 12f66e3060
commit 5fcadb86ab
2 changed files with 114 additions and 46 deletions

View File

@ -99,6 +99,7 @@ import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler; import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise; import io.netty.util.concurrent.Promise;
/** /**
@ -594,13 +595,15 @@ public class FanOutOneBlockAsyncDFSOutputHelper {
beginFileLease(client, src, stat.getFileId()); beginFileLease(client, src, stat.getFileId());
boolean succ = false; boolean succ = false;
LocatedBlock locatedBlock = null; LocatedBlock locatedBlock = null;
List<Channel> datanodeList = new ArrayList<>(); List<Future<Channel>> futureList = null;
try { try {
DataChecksum summer = createChecksum(client); DataChecksum summer = createChecksum(client);
locatedBlock = namenode.addBlock(src, client.getClientName(), null, null, stat.getFileId(), locatedBlock = namenode.addBlock(src, client.getClientName(), null, null, stat.getFileId(),
null); null);
for (Future<Channel> future : connectToDataNodes(conf, clientName, locatedBlock, 0L, 0L, List<Channel> datanodeList = new ArrayList<>();
PIPELINE_SETUP_CREATE, summer, eventLoop)) { futureList = connectToDataNodes(conf, clientName, locatedBlock, 0L, 0L, PIPELINE_SETUP_CREATE,
summer, eventLoop);
for (Future<Channel> future : futureList) {
// fail the creation if there are connection failures since we are fail-fast. The upper // fail the creation if there are connection failures since we are fail-fast. The upper
// layer should retry itself if needed. // layer should retry itself if needed.
datanodeList.add(future.syncUninterruptibly().getNow()); datanodeList.add(future.syncUninterruptibly().getNow());
@ -610,8 +613,18 @@ public class FanOutOneBlockAsyncDFSOutputHelper {
stat.getFileId(), locatedBlock, eventLoop, datanodeList, summer, ALLOC); stat.getFileId(), locatedBlock, eventLoop, datanodeList, summer, ALLOC);
} finally { } finally {
if (!succ) { if (!succ) {
for (Channel c : datanodeList) { if (futureList != null) {
c.close(); for (Future<Channel> f : futureList) {
f.addListener(new FutureListener<Channel>() {
@Override
public void operationComplete(Future<Channel> future) throws Exception {
if (future.isSuccess()) {
future.getNow().close();
}
}
});
}
} }
endFileLease(client, src, stat.getFileId()); endFileLease(client, src, stat.getFileId());
fsUtils.recoverFileLease(dfs, new Path(src), conf, new CancelOnClose(client)); fsUtils.recoverFileLease(dfs, new Path(src), conf, new CancelOnClose(client));

View File

@ -22,24 +22,26 @@ import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.log4j.Level; import org.apache.hadoop.util.Daemon;
import org.apache.log4j.Logger;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Rule; import org.junit.Rule;
@ -47,9 +49,15 @@ import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import org.junit.rules.TestName; import org.junit.rules.TestName;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
@Category({ MiscTests.class, MediumTests.class }) @Category({ MiscTests.class, MediumTests.class })
public class TestFanOutOneBlockAsyncDFSOutput { public class TestFanOutOneBlockAsyncDFSOutput {
private static final Log LOG = LogFactory.getLog(TestFanOutOneBlockAsyncDFSOutput.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static DistributedFileSystem FS; private static DistributedFileSystem FS;
@ -63,8 +71,6 @@ public class TestFanOutOneBlockAsyncDFSOutput {
@BeforeClass @BeforeClass
public static void setUp() throws Exception { public static void setUp() throws Exception {
Logger.getLogger("org.apache.hadoop.hdfs.StateChange").setLevel(Level.DEBUG);
Logger.getLogger("BlockStateChange").setLevel(Level.DEBUG);
TEST_UTIL.getConfiguration().setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT_MS); TEST_UTIL.getConfiguration().setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT_MS);
TEST_UTIL.startMiniDFSCluster(3); TEST_UTIL.startMiniDFSCluster(3);
FS = TEST_UTIL.getDFSCluster().getFileSystem(); FS = TEST_UTIL.getDFSCluster().getFileSystem();
@ -79,12 +85,28 @@ public class TestFanOutOneBlockAsyncDFSOutput {
TEST_UTIL.shutdownMiniDFSCluster(); TEST_UTIL.shutdownMiniDFSCluster();
} }
private void ensureAllDatanodeAlive() throws InterruptedException {
// FanOutOneBlockAsyncDFSOutputHelper.createOutput is fail-fast, so we need to make sure that we
// can create a FanOutOneBlockAsyncDFSOutput after a datanode restarting, otherwise some tests
// will fail.
for (;;) {
try {
FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS,
new Path("/ensureDatanodeAlive"), true, true, (short) 3, FS.getDefaultBlockSize(),
EVENT_LOOP_GROUP.next());
out.close();
break;
} catch (IOException e) {
Thread.sleep(100);
}
}
}
private void writeAndVerify(EventLoop eventLoop, Path f, final FanOutOneBlockAsyncDFSOutput out) private void writeAndVerify(EventLoop eventLoop, Path f, final FanOutOneBlockAsyncDFSOutput out)
throws IOException, InterruptedException, ExecutionException { throws IOException, InterruptedException, ExecutionException {
final byte[] b = new byte[10]; final byte[] b = new byte[10];
ThreadLocalRandom.current().nextBytes(b); ThreadLocalRandom.current().nextBytes(b);
final FanOutOneBlockAsyncDFSOutputFlushHandler handler = final FanOutOneBlockAsyncDFSOutputFlushHandler handler = new FanOutOneBlockAsyncDFSOutputFlushHandler();
new FanOutOneBlockAsyncDFSOutputFlushHandler();
eventLoop.execute(new Runnable() { eventLoop.execute(new Runnable() {
@Override @Override
@ -107,9 +129,8 @@ public class TestFanOutOneBlockAsyncDFSOutput {
public void test() throws IOException, InterruptedException, ExecutionException { public void test() throws IOException, InterruptedException, ExecutionException {
Path f = new Path("/" + name.getMethodName()); Path f = new Path("/" + name.getMethodName());
EventLoop eventLoop = EVENT_LOOP_GROUP.next(); EventLoop eventLoop = EVENT_LOOP_GROUP.next();
final FanOutOneBlockAsyncDFSOutput out = final FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f,
FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3, true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop);
FS.getDefaultBlockSize(), eventLoop);
writeAndVerify(eventLoop, f, out); writeAndVerify(eventLoop, f, out);
} }
@ -117,13 +138,11 @@ public class TestFanOutOneBlockAsyncDFSOutput {
public void testRecover() throws IOException, InterruptedException, ExecutionException { public void testRecover() throws IOException, InterruptedException, ExecutionException {
Path f = new Path("/" + name.getMethodName()); Path f = new Path("/" + name.getMethodName());
EventLoop eventLoop = EVENT_LOOP_GROUP.next(); EventLoop eventLoop = EVENT_LOOP_GROUP.next();
final FanOutOneBlockAsyncDFSOutput out = final FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f,
FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3, true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop);
FS.getDefaultBlockSize(), eventLoop);
final byte[] b = new byte[10]; final byte[] b = new byte[10];
ThreadLocalRandom.current().nextBytes(b); ThreadLocalRandom.current().nextBytes(b);
final FanOutOneBlockAsyncDFSOutputFlushHandler handler = final FanOutOneBlockAsyncDFSOutputFlushHandler handler = new FanOutOneBlockAsyncDFSOutputFlushHandler();
new FanOutOneBlockAsyncDFSOutputFlushHandler();
eventLoop.execute(new Runnable() { eventLoop.execute(new Runnable() {
@Override @Override
@ -135,38 +154,41 @@ public class TestFanOutOneBlockAsyncDFSOutput {
handler.get(); handler.get();
// restart one datanode which causes one connection broken // restart one datanode which causes one connection broken
TEST_UTIL.getDFSCluster().restartDataNode(0); TEST_UTIL.getDFSCluster().restartDataNode(0);
handler.reset();
eventLoop.execute(new Runnable() {
@Override
public void run() {
out.write(b, 0, b.length);
out.flush(null, handler, false);
}
});
try { try {
handler.get(); handler.reset();
fail("flush should fail"); eventLoop.execute(new Runnable() {
} catch (ExecutionException e) {
// we restarted one datanode so the flush should fail @Override
e.printStackTrace(); public void run() {
out.write(b, 0, b.length);
out.flush(null, handler, false);
}
});
try {
handler.get();
fail("flush should fail");
} catch (ExecutionException e) {
// we restarted one datanode so the flush should fail
LOG.info("expected exception caught", e);
}
out.recoverAndClose(null);
assertEquals(b.length, FS.getFileStatus(f).getLen());
byte[] actual = new byte[b.length];
try (FSDataInputStream in = FS.open(f)) {
in.readFully(actual);
}
assertArrayEquals(b, actual);
} finally {
ensureAllDatanodeAlive();
} }
out.recoverAndClose(null);
assertEquals(b.length, FS.getFileStatus(f).getLen());
byte[] actual = new byte[b.length];
try (FSDataInputStream in = FS.open(f)) {
in.readFully(actual);
}
assertArrayEquals(b, actual);
} }
@Test @Test
public void testHeartbeat() throws IOException, InterruptedException, ExecutionException { public void testHeartbeat() throws IOException, InterruptedException, ExecutionException {
Path f = new Path("/" + name.getMethodName()); Path f = new Path("/" + name.getMethodName());
EventLoop eventLoop = EVENT_LOOP_GROUP.next(); EventLoop eventLoop = EVENT_LOOP_GROUP.next();
final FanOutOneBlockAsyncDFSOutput out = final FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f,
FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3, true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop);
FS.getDefaultBlockSize(), eventLoop);
Thread.sleep(READ_TIMEOUT_MS * 2); Thread.sleep(READ_TIMEOUT_MS * 2);
// the connection to datanode should still alive. // the connection to datanode should still alive.
writeAndVerify(eventLoop, f, out); writeAndVerify(eventLoop, f, out);
@ -184,7 +206,40 @@ public class TestFanOutOneBlockAsyncDFSOutput {
FS.getDefaultBlockSize(), eventLoop); FS.getDefaultBlockSize(), eventLoop);
fail("should fail with parent does not exist"); fail("should fail with parent does not exist");
} catch (RemoteException e) { } catch (RemoteException e) {
LOG.info("expected exception caught", e);
assertTrue(e.unwrapRemoteException() instanceof FileNotFoundException); assertTrue(e.unwrapRemoteException() instanceof FileNotFoundException);
} }
} }
@Test
public void testConnectToDatanodeFailed()
throws IOException, ClassNotFoundException, NoSuchMethodException, IllegalAccessException,
InvocationTargetException, InterruptedException, NoSuchFieldException {
Field xceiverServerDaemonField = DataNode.class.getDeclaredField("dataXceiverServer");
xceiverServerDaemonField.setAccessible(true);
Class<?> xceiverServerClass = Class
.forName("org.apache.hadoop.hdfs.server.datanode.DataXceiverServer");
Method numPeersMethod = xceiverServerClass.getDeclaredMethod("getNumPeers");
numPeersMethod.setAccessible(true);
// make one datanode broken
TEST_UTIL.getDFSCluster().getDataNodes().get(0).shutdownDatanode(true);
try {
Path f = new Path("/test");
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
try {
FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
FS.getDefaultBlockSize(), eventLoop);
fail("should fail with connection error");
} catch (IOException e) {
LOG.info("expected exception caught", e);
}
for (DataNode dn : TEST_UTIL.getDFSCluster().getDataNodes()) {
Daemon daemon = (Daemon) xceiverServerDaemonField.get(dn);
assertEquals(0, numPeersMethod.invoke(daemon.getRunnable()));
}
} finally {
TEST_UTIL.getDFSCluster().restartDataNode(0);
ensureAllDatanodeAlive();
}
}
} }