HDFS-16481. Provide support to set Http and Rpc ports in MiniJournalCluster (#4028). Contributed by Viraj Jasani.
(cherry picked from commit 278568203b
)
This commit is contained in:
parent
0e621c890d
commit
a6fb77f7eb
|
@ -1035,6 +1035,32 @@ public class NetUtils {
|
||||||
return port;
|
return port;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return free ports. There is no guarantee they will remain free, so
|
||||||
|
* ports should be used immediately. The number of free ports returned by
|
||||||
|
* this method should match argument {@code numOfPorts}. Num of ports
|
||||||
|
* provided in the argument should not exceed 25.
|
||||||
|
*
|
||||||
|
* @param numOfPorts Number of free ports to acquire.
|
||||||
|
* @return Free ports for binding a local socket.
|
||||||
|
*/
|
||||||
|
public static Set<Integer> getFreeSocketPorts(int numOfPorts) {
|
||||||
|
Preconditions.checkArgument(numOfPorts > 0 && numOfPorts <= 25,
|
||||||
|
"Valid range for num of ports is between 0 and 26");
|
||||||
|
final Set<Integer> freePorts = new HashSet<>(numOfPorts);
|
||||||
|
for (int i = 0; i < numOfPorts * 5; i++) {
|
||||||
|
int port = getFreeSocketPort();
|
||||||
|
if (port == 0) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
freePorts.add(port);
|
||||||
|
if (freePorts.size() == numOfPorts) {
|
||||||
|
return freePorts;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
throw new IllegalStateException(numOfPorts + " free ports could not be acquired.");
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return an @{@link InetAddress} to bind to. If bindWildCardAddress is true
|
* Return an @{@link InetAddress} to bind to. If bindWildCardAddress is true
|
||||||
* than returns null.
|
* than returns null.
|
||||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.qjournal;
|
||||||
import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.FAKE_NSINFO;
|
import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.FAKE_NSINFO;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
import java.io.Closeable;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
|
@ -44,13 +45,16 @@ import org.apache.hadoop.thirdparty.com.google.common.base.Joiner;
|
||||||
import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
|
import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
|
|
||||||
public class MiniJournalCluster {
|
public final class MiniJournalCluster implements Closeable {
|
||||||
|
|
||||||
public static final String CLUSTER_WAITACTIVE_URI = "waitactive";
|
public static final String CLUSTER_WAITACTIVE_URI = "waitactive";
|
||||||
public static class Builder {
|
public static class Builder {
|
||||||
private String baseDir;
|
private String baseDir;
|
||||||
private int numJournalNodes = 3;
|
private int numJournalNodes = 3;
|
||||||
private boolean format = true;
|
private boolean format = true;
|
||||||
private final Configuration conf;
|
private final Configuration conf;
|
||||||
|
private int[] httpPorts = null;
|
||||||
|
private int[] rpcPorts = null;
|
||||||
|
|
||||||
static {
|
static {
|
||||||
DefaultMetricsSystem.setMiniClusterMode(true);
|
DefaultMetricsSystem.setMiniClusterMode(true);
|
||||||
|
@ -75,6 +79,16 @@ public class MiniJournalCluster {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Builder setHttpPorts(int... ports) {
|
||||||
|
this.httpPorts = ports;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Builder setRpcPorts(int... ports) {
|
||||||
|
this.rpcPorts = ports;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
public MiniJournalCluster build() throws IOException {
|
public MiniJournalCluster build() throws IOException {
|
||||||
return new MiniJournalCluster(this);
|
return new MiniJournalCluster(this);
|
||||||
}
|
}
|
||||||
|
@ -98,6 +112,19 @@ public class MiniJournalCluster {
|
||||||
private final JNInfo[] nodes;
|
private final JNInfo[] nodes;
|
||||||
|
|
||||||
private MiniJournalCluster(Builder b) throws IOException {
|
private MiniJournalCluster(Builder b) throws IOException {
|
||||||
|
|
||||||
|
if (b.httpPorts != null && b.httpPorts.length != b.numJournalNodes) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
"Num of http ports (" + b.httpPorts.length + ") should match num of JournalNodes ("
|
||||||
|
+ b.numJournalNodes + ")");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (b.rpcPorts != null && b.rpcPorts.length != b.numJournalNodes) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
"Num of rpc ports (" + b.rpcPorts.length + ") should match num of JournalNodes ("
|
||||||
|
+ b.numJournalNodes + ")");
|
||||||
|
}
|
||||||
|
|
||||||
LOG.info("Starting MiniJournalCluster with " +
|
LOG.info("Starting MiniJournalCluster with " +
|
||||||
b.numJournalNodes + " journal nodes");
|
b.numJournalNodes + " journal nodes");
|
||||||
|
|
||||||
|
@ -172,8 +199,10 @@ public class MiniJournalCluster {
|
||||||
Configuration conf = new Configuration(b.conf);
|
Configuration conf = new Configuration(b.conf);
|
||||||
File logDir = getStorageDir(idx);
|
File logDir = getStorageDir(idx);
|
||||||
conf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY, logDir.toString());
|
conf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY, logDir.toString());
|
||||||
conf.set(DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY, "localhost:0");
|
int httpPort = b.httpPorts != null ? b.httpPorts[idx] : 0;
|
||||||
conf.set(DFSConfigKeys.DFS_JOURNALNODE_HTTP_ADDRESS_KEY, "localhost:0");
|
int rpcPort = b.rpcPorts != null ? b.rpcPorts[idx] : 0;
|
||||||
|
conf.set(DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY, "localhost:" + rpcPort);
|
||||||
|
conf.set(DFSConfigKeys.DFS_JOURNALNODE_HTTP_ADDRESS_KEY, "localhost:" + httpPort);
|
||||||
return conf;
|
return conf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -273,4 +302,10 @@ public class MiniJournalCluster {
|
||||||
.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, quorumJournalURI.toString());
|
.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, quorumJournalURI.toString());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
this.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,15 +22,23 @@ import static org.junit.Assert.*;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.qjournal.server.JournalNode;
|
import org.apache.hadoop.hdfs.qjournal.server.JournalNode;
|
||||||
import org.junit.Test;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
|
import org.apache.hadoop.test.LambdaTestUtils;
|
||||||
|
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
public class TestMiniJournalCluster {
|
public class TestMiniJournalCluster {
|
||||||
|
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(TestMiniJournalCluster.class);
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testStartStop() throws IOException {
|
public void testStartStop() throws IOException {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
|
@ -52,4 +60,92 @@ public class TestMiniJournalCluster {
|
||||||
c.shutdown();
|
c.shutdown();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testStartStopWithPorts() throws Exception {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
|
||||||
|
LambdaTestUtils.intercept(
|
||||||
|
IllegalArgumentException.class,
|
||||||
|
"Num of http ports (1) should match num of JournalNodes (3)",
|
||||||
|
"MiniJournalCluster port validation failed",
|
||||||
|
() -> {
|
||||||
|
new MiniJournalCluster.Builder(conf).setHttpPorts(8481).build();
|
||||||
|
});
|
||||||
|
|
||||||
|
LambdaTestUtils.intercept(
|
||||||
|
IllegalArgumentException.class,
|
||||||
|
"Num of rpc ports (2) should match num of JournalNodes (3)",
|
||||||
|
"MiniJournalCluster port validation failed",
|
||||||
|
() -> {
|
||||||
|
new MiniJournalCluster.Builder(conf).setRpcPorts(8481, 8482).build();
|
||||||
|
});
|
||||||
|
|
||||||
|
LambdaTestUtils.intercept(
|
||||||
|
IllegalArgumentException.class,
|
||||||
|
"Num of rpc ports (1) should match num of JournalNodes (3)",
|
||||||
|
"MiniJournalCluster port validation failed",
|
||||||
|
() -> {
|
||||||
|
new MiniJournalCluster.Builder(conf).setHttpPorts(800, 9000, 10000).setRpcPorts(8481)
|
||||||
|
.build();
|
||||||
|
});
|
||||||
|
|
||||||
|
LambdaTestUtils.intercept(
|
||||||
|
IllegalArgumentException.class,
|
||||||
|
"Num of http ports (4) should match num of JournalNodes (3)",
|
||||||
|
"MiniJournalCluster port validation failed",
|
||||||
|
() -> {
|
||||||
|
new MiniJournalCluster.Builder(conf).setHttpPorts(800, 9000, 1000, 2000)
|
||||||
|
.setRpcPorts(8481, 8482, 8483).build();
|
||||||
|
});
|
||||||
|
|
||||||
|
final Set<Integer> httpAndRpcPorts = NetUtils.getFreeSocketPorts(6);
|
||||||
|
LOG.info("Free socket ports: {}", httpAndRpcPorts);
|
||||||
|
|
||||||
|
for (Integer httpAndRpcPort : httpAndRpcPorts) {
|
||||||
|
assertNotEquals("None of the acquired socket port should not be zero", 0,
|
||||||
|
httpAndRpcPort.intValue());
|
||||||
|
}
|
||||||
|
|
||||||
|
final int[] httpPorts = new int[3];
|
||||||
|
final int[] rpcPorts = new int[3];
|
||||||
|
int httpPortIdx = 0;
|
||||||
|
int rpcPortIdx = 0;
|
||||||
|
for (Integer httpAndRpcPort : httpAndRpcPorts) {
|
||||||
|
if (httpPortIdx < 3) {
|
||||||
|
httpPorts[httpPortIdx++] = httpAndRpcPort;
|
||||||
|
} else {
|
||||||
|
rpcPorts[rpcPortIdx++] = httpAndRpcPort;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG.info("Http ports selected: {}", httpPorts);
|
||||||
|
LOG.info("Rpc ports selected: {}", rpcPorts);
|
||||||
|
|
||||||
|
try (MiniJournalCluster miniJournalCluster = new MiniJournalCluster.Builder(conf)
|
||||||
|
.setHttpPorts(httpPorts)
|
||||||
|
.setRpcPorts(rpcPorts).build()) {
|
||||||
|
miniJournalCluster.waitActive();
|
||||||
|
URI uri = miniJournalCluster.getQuorumJournalURI("myjournal");
|
||||||
|
String[] addrs = uri.getAuthority().split(";");
|
||||||
|
assertEquals(3, addrs.length);
|
||||||
|
|
||||||
|
assertEquals(httpPorts[0], miniJournalCluster.getJournalNode(0).getHttpAddress().getPort());
|
||||||
|
assertEquals(httpPorts[1], miniJournalCluster.getJournalNode(1).getHttpAddress().getPort());
|
||||||
|
assertEquals(httpPorts[2], miniJournalCluster.getJournalNode(2).getHttpAddress().getPort());
|
||||||
|
|
||||||
|
assertEquals(rpcPorts[0],
|
||||||
|
miniJournalCluster.getJournalNode(0).getRpcServer().getAddress().getPort());
|
||||||
|
assertEquals(rpcPorts[1],
|
||||||
|
miniJournalCluster.getJournalNode(1).getRpcServer().getAddress().getPort());
|
||||||
|
assertEquals(rpcPorts[2],
|
||||||
|
miniJournalCluster.getJournalNode(2).getRpcServer().getAddress().getPort());
|
||||||
|
|
||||||
|
JournalNode node = miniJournalCluster.getJournalNode(0);
|
||||||
|
String dir = node.getConf().get(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY);
|
||||||
|
assertEquals(new File(MiniDFSCluster.getBaseDirectory() + "journalnode-0").getAbsolutePath(),
|
||||||
|
dir);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue