This commit is contained in:
Robbie Gemmell 2022-03-04 13:54:12 +00:00
commit e0ca92d783
4 changed files with 50 additions and 4 deletions

View File

@ -241,6 +241,8 @@ public class NettyAcceptor extends AbstractAcceptor {
private volatile Object providerAgnosticSslContext;
private volatile int actualPort = 0;
public NettyAcceptor(final String name,
final ClusterConnection clusterConnection,
final Map<String, Object> configuration,
@ -529,12 +531,12 @@ public class NettyAcceptor extends AbstractAcceptor {
TypedProperties props = new TypedProperties();
props.putSimpleStringProperty(new SimpleString("factory"), new SimpleString(NettyAcceptorFactory.class.getName()));
props.putSimpleStringProperty(new SimpleString("host"), new SimpleString(host));
props.putIntProperty(new SimpleString("port"), port);
props.putIntProperty(new SimpleString("port"), actualPort);
Notification notification = new Notification(null, CoreNotificationType.ACCEPTOR_STARTED, props);
notificationService.sendNotification(notification);
}
ActiveMQServerLogger.LOGGER.startedAcceptor(acceptorType, host, port, protocolsString);
ActiveMQServerLogger.LOGGER.startedAcceptor(acceptorType, host, actualPort, protocolsString);
}
if (batchDelay > 0) {
@ -715,6 +717,13 @@ public class NettyAcceptor extends AbstractAcceptor {
Channel serverChannel = null;
try {
serverChannel = bootstrap.bind(address).syncUninterruptibly().channel();
// The port may be configured as `0` which means the JVM will select an ephemeral port
if (serverChannel.localAddress() instanceof InetSocketAddress) {
actualPort = ((InetSocketAddress)serverChannel.localAddress()).getPort();
} else {
actualPort = port;
}
} catch (Exception e) {
throw ActiveMQMessageBundle.BUNDLE.failedToBind(getName(), h + ":" + port, e);
}
@ -1044,4 +1053,9 @@ public class NettyAcceptor extends AbstractAcceptor {
public boolean isAutoStart() {
return autoStart;
}
@Override
public int getActualPort() {
return actualPort;
}
}

View File

@ -83,4 +83,15 @@ public interface Acceptor extends ActiveMQComponent {
default ProtocolHandler getProtocolHandler() {
return null;
}
/**
* This is a utility method for Socket-based acceptor implementations to get the actual port used.
* This is useful for configurations which specify a port number of 0 which allows the JVM to select
* an ephemeral port.
*
* @return the actual port used if using a Socket-based acceptor implementation; -1 otherwise
*/
default int getActualPort() {
return -1;
}
}

View File

@ -22,8 +22,6 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.artemis.json.JsonArray;
import org.apache.activemq.artemis.json.JsonObject;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.nio.ByteBuffer;
@ -88,6 +86,8 @@ import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQSession;
import org.apache.activemq.artemis.json.JsonArray;
import org.apache.activemq.artemis.json.JsonObject;
import org.apache.activemq.artemis.nativo.jlibaio.LibaioContext;
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule;

View File

@ -34,8 +34,10 @@ import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ServerConnectionLifeCycleListener;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.PortCheckRule;
import org.apache.activemq.artemis.utils.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -123,4 +125,23 @@ public class NettyAcceptorTest extends ActiveMQTestBase {
assertTrue(server.getRemotingService().getAcceptor("start").isStarted());
assertFalse(server.getRemotingService().getAcceptor("noStart").isStarted());
}
@Test
public void testActualPort() throws Exception {
String firstPort0 = RandomUtil.randomString();
String secondPort0 = RandomUtil.randomString();
String normal = RandomUtil.randomString();
String invm = RandomUtil.randomString();
ActiveMQServer server = createServer(false, createDefaultInVMConfig());
server.getConfiguration().addAcceptorConfiguration(firstPort0, "tcp://127.0.0.1:0");
server.getConfiguration().addAcceptorConfiguration(secondPort0, "tcp://127.0.0.1:0");
server.getConfiguration().addAcceptorConfiguration(normal, "tcp://127.0.0.1:61616");
server.getConfiguration().addAcceptorConfiguration(invm, "vm://1");
server.start();
Wait.assertTrue(() -> server.getRemotingService().getAcceptor(firstPort0).getActualPort() > 0);
Wait.assertTrue(() -> server.getRemotingService().getAcceptor(secondPort0).getActualPort() > 0);
Wait.assertTrue(() -> server.getRemotingService().getAcceptor(firstPort0).getActualPort() != server.getRemotingService().getAcceptor(secondPort0).getActualPort());
Wait.assertEquals(61616, () -> server.getRemotingService().getAcceptor(normal).getActualPort());
Wait.assertEquals(-1, () -> server.getRemotingService().getAcceptor(invm).getActualPort());
}
}