This closes #135 openwire changes on BrokerInfo

This commit is contained in:
Clebert Suconic 2015-08-24 23:04:14 -04:00
commit 54d9a3e9bc
7 changed files with 47 additions and 3 deletions

View File

@ -298,6 +298,14 @@ public class NettyConnection implements Connection {
return address.toString();
}
public String getLocalAddress() {
SocketAddress address = channel.localAddress();
if (address == null) {
return null;
}
return "tcp://" + address.toString();
}
public boolean isDirectDeliver() {
return directDeliver;
}

View File

@ -90,6 +90,15 @@ public interface Connection {
*/
String getRemoteAddress();
/**
* Returns a string representation of the local address this connection is connected to.
* This is useful when the server is configured at 0.0.0.0 (or multiple IPs).
* This will give you the actual IP that's being used.
*
* @return the local address
*/
String getLocalAddress();
/**
* Called periodically to flush any data in the batch buffer
*/

View File

@ -191,6 +191,10 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor {
this.creationTime = System.currentTimeMillis();
}
public String getLocalAddress() {
return transportConnection.getLocalAddress();
}
@Override
public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) {
try {
@ -270,6 +274,8 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor {
private void negotiate(WireFormatInfo command) throws IOException {
this.wireFormat.renegotiateWireFormat(command);
//throw back a brokerInfo here
protocolManager.sendBrokerInfo(this);
}
@Override
@ -1084,7 +1090,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor {
}
}
}
catch (Exception e) {
catch (Throwable e) {
if (e instanceof ActiveMQSecurityException) {
resp = new ExceptionResponse(new JMSSecurityException(e.getMessage()));
}

View File

@ -67,6 +67,7 @@ import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.CommandTypes;
import org.apache.activemq.command.ConnectionId;
@ -135,6 +136,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
private final ScheduledExecutorService scheduledPool;
public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer server) {
this.factory = factory;
this.server = server;
@ -148,6 +150,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
if (service != null) {
service.addNotificationListener(this);
}
}
public ProtocolManagerFactory<Interceptor> getFactory() {
@ -693,4 +696,17 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
SimpleString subQueueName = new SimpleString(org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForDurableSubscription(true, subInfo.getClientId(), subInfo.getSubscriptionName()));
server.destroyQueue(subQueueName);
}
public void sendBrokerInfo(OpenWireConnection connection) {
BrokerInfo brokerInfo = new BrokerInfo();
brokerInfo.setBrokerName(server.getIdentity());
brokerInfo.setBrokerId(new BrokerId(server.getNodeID().toString()));
brokerInfo.setPeerBrokerInfos(null);
brokerInfo.setFaultTolerantConfiguration(false);
brokerInfo.setBrokerURL(connection.getLocalAddress());
//cluster support yet to support
brokerInfo.setPeerBrokerInfos(null);
connection.dispatchAsync(brokerInfo);
}
}

View File

@ -208,6 +208,10 @@ public class InVMConnection implements Connection {
return "invm:" + serverID;
}
public String getLocalAddress() {
return "invm:" + serverID;
}
public int getBatchingBufferSize() {
return -1;
}

View File

@ -579,6 +579,7 @@ public class NettyAcceptor implements Acceptor {
}
return sb.toString();
}
// Inner classes -----------------------------------------------------------------------------
private final class ActiveMQServerChannelHandler extends ActiveMQChannelHandler implements ConnectionCreator {

View File

@ -200,7 +200,7 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase {
settings.setSlowConsumerPolicy(SlowConsumerPolicy.NOTIFY);
}
if (entry.isProducerFlowControl()) {
settings.setMaxSizeBytes(10240).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
settings.setMaxSizeBytes(10240000).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
if (bservice.getSystemUsage().isSendFailIfNoSpace()) {
settings.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL);
}
@ -215,7 +215,7 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase {
settingsMap.put("#", defSettings);
}
if (defaultEntry.isProducerFlowControl()) {
defSettings.setMaxSizeBytes(10240).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
defSettings.setMaxSizeBytes(10240000).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
if (bservice.getSystemUsage().isSendFailIfNoSpace()) {
defSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL);
}