HDFS-16587. Allow configuring the number of JournalNodeRPCServer Handlers (#4339). Contributed by ZanderXu.
Signed-off-by: Ayush Saxena <ayushsaxena@apache.org>
This commit is contained in:
parent
96985f4c45
commit
dc9b21feac
|
@ -1404,6 +1404,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||||
"dfs.journalnode.edits.dir.perm";
|
"dfs.journalnode.edits.dir.perm";
|
||||||
public static final String DFS_JOURNAL_EDITS_DIR_PERMISSION_DEFAULT =
|
public static final String DFS_JOURNAL_EDITS_DIR_PERMISSION_DEFAULT =
|
||||||
"700";
|
"700";
|
||||||
|
public static final String DFS_JOURNALNODE_HANDLER_COUNT_KEY =
|
||||||
|
"dfs.journalnode.handler.count";
|
||||||
|
public static final int DFS_JOURNALNODE_HANDLER_COUNT_DEFAULT = 5;
|
||||||
|
|
||||||
|
|
||||||
public static final String DFS_JOURNALNODE_HTTP_ADDRESS_KEY = "dfs.journalnode.http-address";
|
public static final String DFS_JOURNALNODE_HTTP_ADDRESS_KEY = "dfs.journalnode.http-address";
|
||||||
public static final int DFS_JOURNALNODE_HTTP_PORT_DEFAULT = 8480;
|
public static final int DFS_JOURNALNODE_HTTP_PORT_DEFAULT = 8480;
|
||||||
|
|
|
@ -55,6 +55,8 @@ import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_JOURNALNODE_HANDLER_COUNT_DEFAULT;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_JOURNALNODE_HANDLER_COUNT_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_JOURNALNODE_RPC_BIND_HOST_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_JOURNALNODE_RPC_BIND_HOST_KEY;
|
||||||
|
|
||||||
|
|
||||||
|
@ -63,9 +65,9 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_JOURNALNODE_RPC_BIND_HOST
|
||||||
public class JournalNodeRpcServer implements QJournalProtocol,
|
public class JournalNodeRpcServer implements QJournalProtocol,
|
||||||
InterQJournalProtocol {
|
InterQJournalProtocol {
|
||||||
private static final Logger LOG = JournalNode.LOG;
|
private static final Logger LOG = JournalNode.LOG;
|
||||||
private static final int HANDLER_COUNT = 5;
|
|
||||||
private final JournalNode jn;
|
private final JournalNode jn;
|
||||||
private Server server;
|
private Server server;
|
||||||
|
private final int handlerCount;
|
||||||
|
|
||||||
JournalNodeRpcServer(Configuration conf, JournalNode jn) throws IOException {
|
JournalNodeRpcServer(Configuration conf, JournalNode jn) throws IOException {
|
||||||
this.jn = jn;
|
this.jn = jn;
|
||||||
|
@ -90,13 +92,25 @@ public class JournalNodeRpcServer implements QJournalProtocol,
|
||||||
new QJournalProtocolServerSideTranslatorPB(this);
|
new QJournalProtocolServerSideTranslatorPB(this);
|
||||||
BlockingService service = QJournalProtocolService
|
BlockingService service = QJournalProtocolService
|
||||||
.newReflectiveBlockingService(translator);
|
.newReflectiveBlockingService(translator);
|
||||||
|
int confHandlerCount = conf.getInt(DFS_JOURNALNODE_HANDLER_COUNT_KEY,
|
||||||
|
DFS_JOURNALNODE_HANDLER_COUNT_DEFAULT);
|
||||||
|
if (confHandlerCount <= 0) {
|
||||||
|
LOG.warn("Invalid value for: {} = {}, Should be > 0,"
|
||||||
|
+ " will use default value of: {}.",
|
||||||
|
DFS_JOURNALNODE_HANDLER_COUNT_KEY, confHandlerCount,
|
||||||
|
DFS_JOURNALNODE_HANDLER_COUNT_DEFAULT);
|
||||||
|
confHandlerCount = DFS_JOURNALNODE_HANDLER_COUNT_DEFAULT;
|
||||||
|
}
|
||||||
|
this.handlerCount = confHandlerCount;
|
||||||
|
LOG.info("The number of JournalNodeRpcServer handlers is {}.",
|
||||||
|
this.handlerCount);
|
||||||
|
|
||||||
this.server = new RPC.Builder(confCopy)
|
this.server = new RPC.Builder(confCopy)
|
||||||
.setProtocol(QJournalProtocolPB.class)
|
.setProtocol(QJournalProtocolPB.class)
|
||||||
.setInstance(service)
|
.setInstance(service)
|
||||||
.setBindAddress(bindHost)
|
.setBindAddress(bindHost)
|
||||||
.setPort(addr.getPort())
|
.setPort(addr.getPort())
|
||||||
.setNumHandlers(HANDLER_COUNT)
|
.setNumHandlers(this.handlerCount)
|
||||||
.setVerbose(false)
|
.setVerbose(false)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
|
@ -121,6 +135,11 @@ public class JournalNodeRpcServer implements QJournalProtocol,
|
||||||
this.server.setTracer(jn.tracer);
|
this.server.setTracer(jn.tracer);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
protected int getHandlerCount() {
|
||||||
|
return this.handlerCount;
|
||||||
|
}
|
||||||
|
|
||||||
void start() {
|
void start() {
|
||||||
this.server.start();
|
this.server.start();
|
||||||
}
|
}
|
||||||
|
|
|
@ -6253,6 +6253,15 @@
|
||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>dfs.journalnode.handler.count</name>
|
||||||
|
<value>5</value>
|
||||||
|
<description>
|
||||||
|
The number of JournalNode RPC server threads that listen to
|
||||||
|
requests from clients.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>dfs.namenode.lease-hard-limit-sec</name>
|
<name>dfs.namenode.lease-hard-limit-sec</name>
|
||||||
<value>1200</value>
|
<value>1200</value>
|
||||||
|
|
|
@ -142,6 +142,10 @@ public class TestJournalNode {
|
||||||
"qjournal://journalnode0:9900;journalnode1:9901/" + journalId);
|
"qjournal://journalnode0:9900;journalnode1:9901/" + journalId);
|
||||||
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY +".ns1" +".nn2",
|
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY +".ns1" +".nn2",
|
||||||
"qjournal://journalnode0:9902;journalnode1:9903/" + journalId);
|
"qjournal://journalnode0:9902;journalnode1:9903/" + journalId);
|
||||||
|
} else if (testName.getMethodName().equals("testConfAbnormalHandlerNumber")) {
|
||||||
|
conf.setInt(DFSConfigKeys.DFS_JOURNALNODE_HANDLER_COUNT_KEY, -1);
|
||||||
|
} else if (testName.getMethodName().equals("testConfNormalHandlerNumber")) {
|
||||||
|
conf.setInt(DFSConfigKeys.DFS_JOURNALNODE_HANDLER_COUNT_KEY, 10);
|
||||||
}
|
}
|
||||||
jn = new JournalNode();
|
jn = new JournalNode();
|
||||||
jn.setConf(conf);
|
jn.setConf(conf);
|
||||||
|
@ -672,4 +676,26 @@ public class TestJournalNode {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConfNormalHandlerNumber() {
|
||||||
|
int confHandlerNumber = jn.getConf().getInt(
|
||||||
|
DFSConfigKeys.DFS_JOURNALNODE_HANDLER_COUNT_KEY,
|
||||||
|
DFSConfigKeys.DFS_JOURNALNODE_HANDLER_COUNT_DEFAULT);
|
||||||
|
assertTrue(confHandlerNumber > 0);
|
||||||
|
int handlerCount = jn.getRpcServer().getHandlerCount();
|
||||||
|
assertEquals(confHandlerNumber, handlerCount);
|
||||||
|
assertEquals(10, handlerCount);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConfAbnormalHandlerNumber() {
|
||||||
|
int confHandlerCount = jn.getConf().getInt(
|
||||||
|
DFSConfigKeys.DFS_JOURNALNODE_HANDLER_COUNT_KEY,
|
||||||
|
DFSConfigKeys.DFS_JOURNALNODE_HANDLER_COUNT_DEFAULT);
|
||||||
|
assertTrue(confHandlerCount <= 0);
|
||||||
|
int handlerCount = jn.getRpcServer().getHandlerCount();
|
||||||
|
assertEquals(
|
||||||
|
DFSConfigKeys.DFS_JOURNALNODE_HANDLER_COUNT_DEFAULT,
|
||||||
|
handlerCount);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue