HDFS-3898. QJM: enable TCP_NODELAY for IPC. Contributed by Todd Lipcon.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-3077@1383033 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
1cea56d6ef
commit
aa65777ef0
|
@ -52,3 +52,5 @@ HDFS-3726. If a logger misses an RPC, don't retry that logger until next segment
|
|||
HDFS-3893. QJM: Make QJM work with security enabled. (atm)
|
||||
|
||||
HDFS-3897. QJM: TestBlockToken fails after HDFS-3893. (atm)
|
||||
|
||||
HDFS-3898. QJM: enable TCP_NODELAY for IPC (todd)
|
||||
|
|
|
@ -29,6 +29,7 @@ import java.util.concurrent.Executors;
|
|||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
|
||||
|
@ -166,16 +167,26 @@ public class IPCLoggerChannel implements AsyncLogger {
|
|||
}
|
||||
|
||||
protected QJournalProtocol createProxy() throws IOException {
|
||||
final Configuration confCopy = new Configuration(conf);
|
||||
|
||||
// Need to set NODELAY or else batches larger than MTU can trigger
|
||||
// 40ms nagling delays.
|
||||
confCopy.setBoolean(
|
||||
CommonConfigurationKeysPublic.IPC_CLIENT_TCPNODELAY_KEY,
|
||||
true);
|
||||
|
||||
RPC.setProtocolEngine(confCopy,
|
||||
QJournalProtocolPB.class, ProtobufRpcEngine.class);
|
||||
return SecurityUtil.doAsLoginUser(
|
||||
new PrivilegedExceptionAction<QJournalProtocol>() {
|
||||
@Override
|
||||
public QJournalProtocol run() throws IOException {
|
||||
RPC.setProtocolEngine(conf,
|
||||
RPC.setProtocolEngine(confCopy,
|
||||
QJournalProtocolPB.class, ProtobufRpcEngine.class);
|
||||
QJournalProtocolPB pbproxy = RPC.getProxy(
|
||||
QJournalProtocolPB.class,
|
||||
RPC.getProtocolVersion(QJournalProtocolPB.class),
|
||||
addr, conf);
|
||||
addr, confCopy);
|
||||
return new QJournalProtocolTranslatorPB(pbproxy);
|
||||
}
|
||||
});
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.net.URL;
|
|||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
|
||||
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
|
||||
|
@ -54,8 +55,15 @@ class JournalNodeRpcServer implements QJournalProtocol {
|
|||
JournalNodeRpcServer(Configuration conf, JournalNode jn) throws IOException {
|
||||
this.jn = jn;
|
||||
|
||||
InetSocketAddress addr = getAddress(conf);
|
||||
RPC.setProtocolEngine(conf, QJournalProtocolPB.class,
|
||||
Configuration confCopy = new Configuration(conf);
|
||||
|
||||
// Ensure that nagling doesn't kick in, which could cause latency issues.
|
||||
confCopy.setBoolean(
|
||||
CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_KEY,
|
||||
true);
|
||||
|
||||
InetSocketAddress addr = getAddress(confCopy);
|
||||
RPC.setProtocolEngine(confCopy, QJournalProtocolPB.class,
|
||||
ProtobufRpcEngine.class);
|
||||
QJournalProtocolServerSideTranslatorPB translator =
|
||||
new QJournalProtocolServerSideTranslatorPB(this);
|
||||
|
@ -65,13 +73,13 @@ class JournalNodeRpcServer implements QJournalProtocol {
|
|||
this.server = RPC.getServer(
|
||||
QJournalProtocolPB.class,
|
||||
service, addr.getHostName(),
|
||||
addr.getPort(), HANDLER_COUNT, false, conf,
|
||||
addr.getPort(), HANDLER_COUNT, false, confCopy,
|
||||
null /*secretManager*/);
|
||||
|
||||
// set service-level authorization security policy
|
||||
if (conf.getBoolean(
|
||||
if (confCopy.getBoolean(
|
||||
CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {
|
||||
server.refreshServiceAcl(conf, new HDFSPolicyProvider());
|
||||
server.refreshServiceAcl(confCopy, new HDFSPolicyProvider());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -23,15 +23,17 @@ import static org.junit.Assert.assertFalse;
|
|||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.File;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URL;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.qjournal.QJMTestUtil;
|
||||
import org.apache.hadoop.hdfs.qjournal.client.IPCLoggerChannel;
|
||||
|
@ -49,6 +51,7 @@ import org.junit.Before;
|
|||
import org.junit.Test;
|
||||
|
||||
import com.google.common.base.Charsets;
|
||||
import com.google.common.base.Stopwatch;
|
||||
import com.google.common.primitives.Bytes;
|
||||
import com.google.common.primitives.Ints;
|
||||
|
||||
|
@ -70,6 +73,12 @@ public class TestJournalNode {
|
|||
|
||||
@Before
|
||||
public void setup() throws Exception {
|
||||
File editsDir = new File(MiniDFSCluster.getBaseDirectory() +
|
||||
File.separator + "TestJournalNode");
|
||||
FileUtil.fullyDelete(editsDir);
|
||||
|
||||
conf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY,
|
||||
editsDir.getAbsolutePath());
|
||||
conf.set(DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY,
|
||||
"0.0.0.0:0");
|
||||
jn = new JournalNode();
|
||||
|
@ -276,6 +285,39 @@ public class TestJournalNode {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Simple test of how fast the code path is to write edits.
|
||||
* This isn't a true unit test, but can be run manually to
|
||||
* check performance.
|
||||
*
|
||||
* At the time of development, this test ran in ~4sec on an
|
||||
* SSD-enabled laptop (1.8ms/batch).
|
||||
*/
|
||||
@Test(timeout=100000)
|
||||
public void testPerformance() throws Exception {
|
||||
doPerfTest(8192, 1024); // 8MB
|
||||
}
|
||||
|
||||
private void doPerfTest(int editsSize, int numEdits) throws Exception {
|
||||
byte[] data = new byte[editsSize];
|
||||
ch.newEpoch(1).get();
|
||||
ch.setEpoch(1);
|
||||
ch.startLogSegment(1).get();
|
||||
|
||||
Stopwatch sw = new Stopwatch().start();
|
||||
for (int i = 1; i < numEdits; i++) {
|
||||
ch.sendEdits(1L, i, 1, data).get();
|
||||
}
|
||||
long time = sw.elapsedMillis();
|
||||
|
||||
System.err.println("Wrote " + numEdits + " batches of " + editsSize +
|
||||
" bytes in " + time + "ms");
|
||||
float avgRtt = (float)time/(float)numEdits;
|
||||
long throughput = ((long)numEdits * editsSize * 1000L)/time;
|
||||
System.err.println("Time per batch: " + avgRtt);
|
||||
System.err.println("Throughput: " + throughput + " bytes/sec");
|
||||
}
|
||||
|
||||
// TODO:
|
||||
// - add test that checks formatting behavior
|
||||
// - add test that checks rejects newEpoch if nsinfo doesn't match
|
||||
|
|
Loading…
Reference in New Issue