ARTEMIS-208 fixing BrokerInfo, using OpenWire connection instead of static property on the protocolManager

This commit is contained in:
Clebert Suconic 2015-08-24 22:25:13 -04:00
parent 34e127cc0c
commit 8935483cdd
6 changed files with 34 additions and 17 deletions

View File

@ -298,6 +298,14 @@ public class NettyConnection implements Connection {
return address.toString(); return address.toString();
} }
public String getLocalAddress() {
SocketAddress address = channel.localAddress();
if (address == null) {
return null;
}
return "tcp://" + address.toString();
}
public boolean isDirectDeliver() { public boolean isDirectDeliver() {
return directDeliver; return directDeliver;
} }

View File

@ -90,6 +90,15 @@ public interface Connection {
*/ */
String getRemoteAddress(); String getRemoteAddress();
/**
* Returns a string representation of the local address this connection is connected to.
* This is useful when the server is configured at 0.0.0.0 (or multiple IPs).
* This will give you the actual IP that's being used.
*
* @return the local address
*/
String getLocalAddress();
/** /**
* Called periodically to flush any data in the batch buffer * Called periodically to flush any data in the batch buffer
*/ */

View File

@ -191,6 +191,10 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor {
this.creationTime = System.currentTimeMillis(); this.creationTime = System.currentTimeMillis();
} }
public String getLocalAddress() {
return transportConnection.getLocalAddress();
}
@Override @Override
public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) { public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) {
try { try {

View File

@ -46,7 +46,6 @@ import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQPersistenceAdap
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerExchange; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerExchange;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQServerSession; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQServerSession;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
import org.apache.activemq.artemis.core.security.CheckType; import org.apache.activemq.artemis.core.security.CheckType;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
@ -137,7 +136,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
private final ScheduledExecutorService scheduledPool; private final ScheduledExecutorService scheduledPool;
private BrokerInfo brokerInfo = new BrokerInfo();
public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer server) { public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer server) {
this.factory = factory; this.factory = factory;
@ -152,11 +150,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
if (service != null) { if (service != null) {
service.addNotificationListener(this); service.addNotificationListener(this);
} }
brokerInfo.setBrokerName(server.getIdentity());
brokerInfo.setBrokerId(new BrokerId(server.getNodeID().toString()));
brokerInfo.setPeerBrokerInfos(null);
brokerInfo.setFaultTolerantConfiguration(false);
brokerInfo.setBrokerURL(null);
} }
@ -172,10 +165,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
@Override @Override
public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection connection) { public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection connection) {
if (brokerInfo.getBrokerURL() == null) {
NettyAcceptor nettyAcceptor = (NettyAcceptor)acceptorUsed;
brokerInfo.setBrokerURL(nettyAcceptor.getURL());
}
OpenWireFormat wf = (OpenWireFormat) wireFactory.createWireFormat(); OpenWireFormat wf = (OpenWireFormat) wireFactory.createWireFormat();
OpenWireConnection owConn = new OpenWireConnection(acceptorUsed, connection, this, wf); OpenWireConnection owConn = new OpenWireConnection(acceptorUsed, connection, this, wf);
owConn.init(); owConn.init();
@ -709,9 +698,15 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
} }
public void sendBrokerInfo(OpenWireConnection connection) { public void sendBrokerInfo(OpenWireConnection connection) {
BrokerInfo copy = brokerInfo.copy(); BrokerInfo brokerInfo = new BrokerInfo();
brokerInfo.setBrokerName(server.getIdentity());
brokerInfo.setBrokerId(new BrokerId(server.getNodeID().toString()));
brokerInfo.setPeerBrokerInfos(null);
brokerInfo.setFaultTolerantConfiguration(false);
brokerInfo.setBrokerURL(connection.getLocalAddress());
//cluster support yet to support //cluster support yet to support
copy.setPeerBrokerInfos(null); brokerInfo.setPeerBrokerInfos(null);
connection.dispatchAsync(copy); connection.dispatchAsync(brokerInfo);
} }
} }

View File

@ -208,6 +208,10 @@ public class InVMConnection implements Connection {
return "invm:" + serverID; return "invm:" + serverID;
} }
public String getLocalAddress() {
return "invm:" + serverID;
}
public int getBatchingBufferSize() { public int getBatchingBufferSize() {
return -1; return -1;
} }

View File

@ -580,9 +580,6 @@ public class NettyAcceptor implements Acceptor {
return sb.toString(); return sb.toString();
} }
public String getURL() {
return "tcp://" + this.host + ":" + this.port;
}
// Inner classes ----------------------------------------------------------------------------- // Inner classes -----------------------------------------------------------------------------
private final class ActiveMQServerChannelHandler extends ActiveMQChannelHandler implements ConnectionCreator { private final class ActiveMQServerChannelHandler extends ActiveMQChannelHandler implements ConnectionCreator {