From 278568203b9c2033743ecca60dbc62d397a85a8d Mon Sep 17 00:00:00 2001 From: Viraj Jasani Date: Fri, 4 Mar 2022 22:17:48 +0530 Subject: [PATCH] HDFS-16481. Provide support to set Http and Rpc ports in MiniJournalCluster (#4028). Contributed by Viraj Jasani. --- .../java/org/apache/hadoop/net/NetUtils.java | 26 +++++ .../hdfs/qjournal/MiniJournalCluster.java | 41 +++++++- .../hdfs/qjournal/TestMiniJournalCluster.java | 98 ++++++++++++++++++- 3 files changed, 161 insertions(+), 4 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java index 4b924af03c1..fead87d7907 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java @@ -1053,6 +1053,32 @@ public static int getFreeSocketPort() { return port; } + /** + * Return free ports. There is no guarantee they will remain free, so + * ports should be used immediately. The number of free ports returned by + * this method should match argument {@code numOfPorts}. Num of ports + * provided in the argument should not exceed 25. + * + * @param numOfPorts Number of free ports to acquire. + * @return Free ports for binding a local socket. + */ + public static Set getFreeSocketPorts(int numOfPorts) { + Preconditions.checkArgument(numOfPorts > 0 && numOfPorts <= 25, + "Valid range for num of ports is between 0 and 26"); + final Set freePorts = new HashSet<>(numOfPorts); + for (int i = 0; i < numOfPorts * 5; i++) { + int port = getFreeSocketPort(); + if (port == 0) { + continue; + } + freePorts.add(port); + if (freePorts.size() == numOfPorts) { + return freePorts; + } + } + throw new IllegalStateException(numOfPorts + " free ports could not be acquired."); + } + /** * Return an @{@link InetAddress} to bind to. If bindWildCardAddress is true * than returns null. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniJournalCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniJournalCluster.java index d0bbd44f1af..1c43b39159a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniJournalCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniJournalCluster.java @@ -20,6 +20,7 @@ import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.FAKE_NSINFO; import static org.junit.Assert.fail; +import java.io.Closeable; import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; @@ -45,13 +46,16 @@ import org.apache.hadoop.thirdparty.com.google.common.base.Joiner; import org.apache.hadoop.test.GenericTestUtils; -public class MiniJournalCluster { +public final class MiniJournalCluster implements Closeable { + public static final String CLUSTER_WAITACTIVE_URI = "waitactive"; public static class Builder { private String baseDir; private int numJournalNodes = 3; private boolean format = true; private final Configuration conf; + private int[] httpPorts = null; + private int[] rpcPorts = null; static { DefaultMetricsSystem.setMiniClusterMode(true); @@ -76,6 +80,16 @@ public Builder format(boolean f) { return this; } + public Builder setHttpPorts(int... ports) { + this.httpPorts = ports; + return this; + } + + public Builder setRpcPorts(int... ports) { + this.rpcPorts = ports; + return this; + } + public MiniJournalCluster build() throws IOException { return new MiniJournalCluster(this); } @@ -99,6 +113,19 @@ private JNInfo(JournalNode node) { private final JNInfo[] nodes; private MiniJournalCluster(Builder b) throws IOException { + + if (b.httpPorts != null && b.httpPorts.length != b.numJournalNodes) { + throw new IllegalArgumentException( + "Num of http ports (" + b.httpPorts.length + ") should match num of JournalNodes (" + + b.numJournalNodes + ")"); + } + + if (b.rpcPorts != null && b.rpcPorts.length != b.numJournalNodes) { + throw new IllegalArgumentException( + "Num of rpc ports (" + b.rpcPorts.length + ") should match num of JournalNodes (" + + b.numJournalNodes + ")"); + } + LOG.info("Starting MiniJournalCluster with " + b.numJournalNodes + " journal nodes"); @@ -173,8 +200,10 @@ private Configuration createConfForNode(Builder b, int idx) { Configuration conf = new Configuration(b.conf); File logDir = getStorageDir(idx); conf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY, logDir.toString()); - conf.set(DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY, "localhost:0"); - conf.set(DFSConfigKeys.DFS_JOURNALNODE_HTTP_ADDRESS_KEY, "localhost:0"); + int httpPort = b.httpPorts != null ? b.httpPorts[idx] : 0; + int rpcPort = b.rpcPorts != null ? b.rpcPorts[idx] : 0; + conf.set(DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY, "localhost:" + rpcPort); + conf.set(DFSConfigKeys.DFS_JOURNALNODE_HTTP_ADDRESS_KEY, "localhost:" + httpPort); return conf; } @@ -274,4 +303,10 @@ public void setNamenodeSharedEditsConf(String jid) { .DFS_NAMENODE_SHARED_EDITS_DIR_KEY, quorumJournalURI.toString()); } } + + @Override + public void close() throws IOException { + this.shutdown(); + } + } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestMiniJournalCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestMiniJournalCluster.java index cace7c92891..ccbbc94c99e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestMiniJournalCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestMiniJournalCluster.java @@ -22,15 +22,23 @@ import java.io.File; import java.io.IOException; import java.net.URI; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.qjournal.server.JournalNode; -import org.junit.Test; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.test.LambdaTestUtils; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class TestMiniJournalCluster { + + private static final Logger LOG = LoggerFactory.getLogger(TestMiniJournalCluster.class); + @Test public void testStartStop() throws IOException { Configuration conf = new Configuration(); @@ -52,4 +60,92 @@ public void testStartStop() throws IOException { c.shutdown(); } } + + @Test + public void testStartStopWithPorts() throws Exception { + Configuration conf = new Configuration(); + + LambdaTestUtils.intercept( + IllegalArgumentException.class, + "Num of http ports (1) should match num of JournalNodes (3)", + "MiniJournalCluster port validation failed", + () -> { + new MiniJournalCluster.Builder(conf).setHttpPorts(8481).build(); + }); + + LambdaTestUtils.intercept( + IllegalArgumentException.class, + "Num of rpc ports (2) should match num of JournalNodes (3)", + "MiniJournalCluster port validation failed", + () -> { + new MiniJournalCluster.Builder(conf).setRpcPorts(8481, 8482).build(); + }); + + LambdaTestUtils.intercept( + IllegalArgumentException.class, + "Num of rpc ports (1) should match num of JournalNodes (3)", + "MiniJournalCluster port validation failed", + () -> { + new MiniJournalCluster.Builder(conf).setHttpPorts(800, 9000, 10000).setRpcPorts(8481) + .build(); + }); + + LambdaTestUtils.intercept( + IllegalArgumentException.class, + "Num of http ports (4) should match num of JournalNodes (3)", + "MiniJournalCluster port validation failed", + () -> { + new MiniJournalCluster.Builder(conf).setHttpPorts(800, 9000, 1000, 2000) + .setRpcPorts(8481, 8482, 8483).build(); + }); + + final Set httpAndRpcPorts = NetUtils.getFreeSocketPorts(6); + LOG.info("Free socket ports: {}", httpAndRpcPorts); + + for (Integer httpAndRpcPort : httpAndRpcPorts) { + assertNotEquals("None of the acquired socket port should not be zero", 0, + httpAndRpcPort.intValue()); + } + + final int[] httpPorts = new int[3]; + final int[] rpcPorts = new int[3]; + int httpPortIdx = 0; + int rpcPortIdx = 0; + for (Integer httpAndRpcPort : httpAndRpcPorts) { + if (httpPortIdx < 3) { + httpPorts[httpPortIdx++] = httpAndRpcPort; + } else { + rpcPorts[rpcPortIdx++] = httpAndRpcPort; + } + } + + LOG.info("Http ports selected: {}", httpPorts); + LOG.info("Rpc ports selected: {}", rpcPorts); + + try (MiniJournalCluster miniJournalCluster = new MiniJournalCluster.Builder(conf) + .setHttpPorts(httpPorts) + .setRpcPorts(rpcPorts).build()) { + miniJournalCluster.waitActive(); + URI uri = miniJournalCluster.getQuorumJournalURI("myjournal"); + String[] addrs = uri.getAuthority().split(";"); + assertEquals(3, addrs.length); + + assertEquals(httpPorts[0], miniJournalCluster.getJournalNode(0).getHttpAddress().getPort()); + assertEquals(httpPorts[1], miniJournalCluster.getJournalNode(1).getHttpAddress().getPort()); + assertEquals(httpPorts[2], miniJournalCluster.getJournalNode(2).getHttpAddress().getPort()); + + assertEquals(rpcPorts[0], + miniJournalCluster.getJournalNode(0).getRpcServer().getAddress().getPort()); + assertEquals(rpcPorts[1], + miniJournalCluster.getJournalNode(1).getRpcServer().getAddress().getPort()); + assertEquals(rpcPorts[2], + miniJournalCluster.getJournalNode(2).getRpcServer().getAddress().getPort()); + + JournalNode node = miniJournalCluster.getJournalNode(0); + String dir = node.getConf().get(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY); + assertEquals(new File(MiniDFSCluster.getBaseDirectory() + "journalnode-0").getAbsolutePath(), + dir); + } + } + }