From 34e127cc0ce1f53c354a9d08b3e0daa747b14f3e Mon Sep 17 00:00:00 2001 From: Howard Gao Date: Mon, 24 Aug 2015 10:28:25 +0800 Subject: [PATCH] ARTEMIS-208 BrokerInfo issue, also: enlarged the default max size for tests to avoid send blocking. --- .../protocol/openwire/OpenWireConnection.java | 4 +++- .../openwire/OpenWireProtocolManager.java | 21 +++++++++++++++++++ .../remoting/impl/netty/NettyAcceptor.java | 4 ++++ .../artemiswrapper/ArtemisBrokerWrapper.java | 4 ++-- 4 files changed, 30 insertions(+), 3 deletions(-) diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index 30ffb06356..3cddb2959f 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -270,6 +270,8 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor { private void negotiate(WireFormatInfo command) throws IOException { this.wireFormat.renegotiateWireFormat(command); + //throw back a brokerInfo here + protocolManager.sendBrokerInfo(this); } @Override @@ -1084,7 +1086,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor { } } } - catch (Exception e) { + catch (Throwable e) { if (e instanceof ActiveMQSecurityException) { resp = new ExceptionResponse(new JMSSecurityException(e.getMessage())); } diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java index 8c20c466cd..5489fdf0ed 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java @@ -46,6 +46,7 @@ import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQPersistenceAdap import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerExchange; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQServerSession; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession; +import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection; import org.apache.activemq.artemis.core.security.CheckType; import org.apache.activemq.artemis.core.server.ActiveMQServer; @@ -67,6 +68,7 @@ import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.BrokerId; +import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.Command; import org.apache.activemq.command.CommandTypes; import org.apache.activemq.command.ConnectionId; @@ -135,6 +137,8 @@ public class OpenWireProtocolManager implements ProtocolManager, No private final ScheduledExecutorService scheduledPool; + private BrokerInfo brokerInfo = new BrokerInfo(); + public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer server) { this.factory = factory; this.server = server; @@ -148,6 +152,12 @@ public class OpenWireProtocolManager implements ProtocolManager, No if (service != null) { service.addNotificationListener(this); } + brokerInfo.setBrokerName(server.getIdentity()); + brokerInfo.setBrokerId(new BrokerId(server.getNodeID().toString())); + brokerInfo.setPeerBrokerInfos(null); + brokerInfo.setFaultTolerantConfiguration(false); + brokerInfo.setBrokerURL(null); + } public ProtocolManagerFactory getFactory() { @@ -162,6 +172,10 @@ public class OpenWireProtocolManager implements ProtocolManager, No @Override public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection connection) { + if (brokerInfo.getBrokerURL() == null) { + NettyAcceptor nettyAcceptor = (NettyAcceptor)acceptorUsed; + brokerInfo.setBrokerURL(nettyAcceptor.getURL()); + } OpenWireFormat wf = (OpenWireFormat) wireFactory.createWireFormat(); OpenWireConnection owConn = new OpenWireConnection(acceptorUsed, connection, this, wf); owConn.init(); @@ -693,4 +707,11 @@ public class OpenWireProtocolManager implements ProtocolManager, No SimpleString subQueueName = new SimpleString(org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForDurableSubscription(true, subInfo.getClientId(), subInfo.getSubscriptionName())); server.destroyQueue(subQueueName); } + + public void sendBrokerInfo(OpenWireConnection connection) { + BrokerInfo copy = brokerInfo.copy(); + //cluster support yet to support + copy.setPeerBrokerInfos(null); + connection.dispatchAsync(copy); + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java index f0fafaf59e..a53b8868e6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java @@ -579,6 +579,10 @@ public class NettyAcceptor implements Acceptor { } return sb.toString(); } + + public String getURL() { + return "tcp://" + this.host + ":" + this.port; + } // Inner classes ----------------------------------------------------------------------------- private final class ActiveMQServerChannelHandler extends ActiveMQChannelHandler implements ConnectionCreator { diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java index bf92a9cbed..723529f109 100644 --- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java +++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java @@ -200,7 +200,7 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase { settings.setSlowConsumerPolicy(SlowConsumerPolicy.NOTIFY); } if (entry.isProducerFlowControl()) { - settings.setMaxSizeBytes(10240).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK); + settings.setMaxSizeBytes(10240000).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK); if (bservice.getSystemUsage().isSendFailIfNoSpace()) { settings.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL); } @@ -215,7 +215,7 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase { settingsMap.put("#", defSettings); } if (defaultEntry.isProducerFlowControl()) { - defSettings.setMaxSizeBytes(10240).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK); + defSettings.setMaxSizeBytes(10240000).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK); if (bservice.getSystemUsage().isSendFailIfNoSpace()) { defSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL); }