diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt index 6702723bd02..065a21edfbe 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt @@ -52,3 +52,5 @@ HDFS-3726. If a logger misses an RPC, don't retry that logger until next segment HDFS-3893. QJM: Make QJM work with security enabled. (atm) HDFS-3897. QJM: TestBlockToken fails after HDFS-3893. (atm) + +HDFS-3898. QJM: enable TCP_NODELAY for IPC (todd) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java index a339f4c90af..da6d6834e45 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java @@ -29,6 +29,7 @@ import java.util.concurrent.Executors; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocolPB.PBHelper; @@ -166,16 +167,26 @@ public class IPCLoggerChannel implements AsyncLogger { } protected QJournalProtocol createProxy() throws IOException { + final Configuration confCopy = new Configuration(conf); + + // Need to set NODELAY or else batches larger than MTU can trigger + // 40ms nagling delays. + confCopy.setBoolean( + CommonConfigurationKeysPublic.IPC_CLIENT_TCPNODELAY_KEY, + true); + + RPC.setProtocolEngine(confCopy, + QJournalProtocolPB.class, ProtobufRpcEngine.class); return SecurityUtil.doAsLoginUser( new PrivilegedExceptionAction() { @Override public QJournalProtocol run() throws IOException { - RPC.setProtocolEngine(conf, + RPC.setProtocolEngine(confCopy, QJournalProtocolPB.class, ProtobufRpcEngine.class); QJournalProtocolPB pbproxy = RPC.getProxy( QJournalProtocolPB.class, RPC.getProtocolVersion(QJournalProtocolPB.class), - addr, conf); + addr, confCopy); return new QJournalProtocolTranslatorPB(pbproxy); } }); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java index 10eb00ce75a..79d10d22d1d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java @@ -23,6 +23,7 @@ import java.net.URL; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.HDFSPolicyProvider; import org.apache.hadoop.hdfs.protocolPB.PBHelper; @@ -54,8 +55,15 @@ class JournalNodeRpcServer implements QJournalProtocol { JournalNodeRpcServer(Configuration conf, JournalNode jn) throws IOException { this.jn = jn; - InetSocketAddress addr = getAddress(conf); - RPC.setProtocolEngine(conf, QJournalProtocolPB.class, + Configuration confCopy = new Configuration(conf); + + // Ensure that nagling doesn't kick in, which could cause latency issues. + confCopy.setBoolean( + CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_KEY, + true); + + InetSocketAddress addr = getAddress(confCopy); + RPC.setProtocolEngine(confCopy, QJournalProtocolPB.class, ProtobufRpcEngine.class); QJournalProtocolServerSideTranslatorPB translator = new QJournalProtocolServerSideTranslatorPB(this); @@ -65,13 +73,13 @@ class JournalNodeRpcServer implements QJournalProtocol { this.server = RPC.getServer( QJournalProtocolPB.class, service, addr.getHostName(), - addr.getPort(), HANDLER_COUNT, false, conf, + addr.getPort(), HANDLER_COUNT, false, confCopy, null /*secretManager*/); // set service-level authorization security policy - if (conf.getBoolean( + if (confCopy.getBoolean( CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) { - server.refreshServiceAcl(conf, new HDFSPolicyProvider()); + server.refreshServiceAcl(confCopy, new HDFSPolicyProvider()); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java index 78e0daa31e2..9dafe3898f7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java @@ -23,15 +23,17 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import java.io.IOException; +import java.io.File; import java.net.HttpURLConnection; import java.net.InetSocketAddress; import java.net.URL; import java.util.concurrent.ExecutionException; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.qjournal.QJMTestUtil; import org.apache.hadoop.hdfs.qjournal.client.IPCLoggerChannel; @@ -49,6 +51,7 @@ import org.junit.Before; import org.junit.Test; import com.google.common.base.Charsets; +import com.google.common.base.Stopwatch; import com.google.common.primitives.Bytes; import com.google.common.primitives.Ints; @@ -70,6 +73,12 @@ public class TestJournalNode { @Before public void setup() throws Exception { + File editsDir = new File(MiniDFSCluster.getBaseDirectory() + + File.separator + "TestJournalNode"); + FileUtil.fullyDelete(editsDir); + + conf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY, + editsDir.getAbsolutePath()); conf.set(DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY, "0.0.0.0:0"); jn = new JournalNode(); @@ -276,6 +285,39 @@ public class TestJournalNode { } } + /** + * Simple test of how fast the code path is to write edits. + * This isn't a true unit test, but can be run manually to + * check performance. + * + * At the time of development, this test ran in ~4sec on an + * SSD-enabled laptop (1.8ms/batch). + */ + @Test(timeout=100000) + public void testPerformance() throws Exception { + doPerfTest(8192, 1024); // 8MB + } + + private void doPerfTest(int editsSize, int numEdits) throws Exception { + byte[] data = new byte[editsSize]; + ch.newEpoch(1).get(); + ch.setEpoch(1); + ch.startLogSegment(1).get(); + + Stopwatch sw = new Stopwatch().start(); + for (int i = 1; i < numEdits; i++) { + ch.sendEdits(1L, i, 1, data).get(); + } + long time = sw.elapsedMillis(); + + System.err.println("Wrote " + numEdits + " batches of " + editsSize + + " bytes in " + time + "ms"); + float avgRtt = (float)time/(float)numEdits; + long throughput = ((long)numEdits * editsSize * 1000L)/time; + System.err.println("Time per batch: " + avgRtt); + System.err.println("Throughput: " + throughput + " bytes/sec"); + } + // TODO: // - add test that checks formatting behavior // - add test that checks rejects newEpoch if nsinfo doesn't match