ARTEMIS-208 BrokerInfo issue, also:
enlarged the default max size for tests to avoid send blocking.
This commit is contained in:
parent
7dfde208ca
commit
34e127cc0c
|
@ -270,6 +270,8 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor {
|
|||
|
||||
private void negotiate(WireFormatInfo command) throws IOException {
|
||||
this.wireFormat.renegotiateWireFormat(command);
|
||||
//throw back a brokerInfo here
|
||||
protocolManager.sendBrokerInfo(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1084,7 +1086,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor {
|
|||
}
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
catch (Throwable e) {
|
||||
if (e instanceof ActiveMQSecurityException) {
|
||||
resp = new ExceptionResponse(new JMSSecurityException(e.getMessage()));
|
||||
}
|
||||
|
|
|
@ -46,6 +46,7 @@ import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQPersistenceAdap
|
|||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerExchange;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQServerSession;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession;
|
||||
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor;
|
||||
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
|
||||
import org.apache.activemq.artemis.core.security.CheckType;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
|
@ -67,6 +68,7 @@ import org.apache.activemq.command.ActiveMQDestination;
|
|||
import org.apache.activemq.command.ActiveMQMessage;
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
import org.apache.activemq.command.BrokerId;
|
||||
import org.apache.activemq.command.BrokerInfo;
|
||||
import org.apache.activemq.command.Command;
|
||||
import org.apache.activemq.command.CommandTypes;
|
||||
import org.apache.activemq.command.ConnectionId;
|
||||
|
@ -135,6 +137,8 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
|
|||
|
||||
private final ScheduledExecutorService scheduledPool;
|
||||
|
||||
private BrokerInfo brokerInfo = new BrokerInfo();
|
||||
|
||||
public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer server) {
|
||||
this.factory = factory;
|
||||
this.server = server;
|
||||
|
@ -148,6 +152,12 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
|
|||
if (service != null) {
|
||||
service.addNotificationListener(this);
|
||||
}
|
||||
brokerInfo.setBrokerName(server.getIdentity());
|
||||
brokerInfo.setBrokerId(new BrokerId(server.getNodeID().toString()));
|
||||
brokerInfo.setPeerBrokerInfos(null);
|
||||
brokerInfo.setFaultTolerantConfiguration(false);
|
||||
brokerInfo.setBrokerURL(null);
|
||||
|
||||
}
|
||||
|
||||
public ProtocolManagerFactory<Interceptor> getFactory() {
|
||||
|
@ -162,6 +172,10 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
|
|||
|
||||
@Override
|
||||
public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection connection) {
|
||||
if (brokerInfo.getBrokerURL() == null) {
|
||||
NettyAcceptor nettyAcceptor = (NettyAcceptor)acceptorUsed;
|
||||
brokerInfo.setBrokerURL(nettyAcceptor.getURL());
|
||||
}
|
||||
OpenWireFormat wf = (OpenWireFormat) wireFactory.createWireFormat();
|
||||
OpenWireConnection owConn = new OpenWireConnection(acceptorUsed, connection, this, wf);
|
||||
owConn.init();
|
||||
|
@ -693,4 +707,11 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
|
|||
SimpleString subQueueName = new SimpleString(org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForDurableSubscription(true, subInfo.getClientId(), subInfo.getSubscriptionName()));
|
||||
server.destroyQueue(subQueueName);
|
||||
}
|
||||
|
||||
public void sendBrokerInfo(OpenWireConnection connection) {
|
||||
BrokerInfo copy = brokerInfo.copy();
|
||||
//cluster support yet to support
|
||||
copy.setPeerBrokerInfos(null);
|
||||
connection.dispatchAsync(copy);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -579,6 +579,10 @@ public class NettyAcceptor implements Acceptor {
|
|||
}
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
public String getURL() {
|
||||
return "tcp://" + this.host + ":" + this.port;
|
||||
}
|
||||
// Inner classes -----------------------------------------------------------------------------
|
||||
|
||||
private final class ActiveMQServerChannelHandler extends ActiveMQChannelHandler implements ConnectionCreator {
|
||||
|
|
|
@ -200,7 +200,7 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase {
|
|||
settings.setSlowConsumerPolicy(SlowConsumerPolicy.NOTIFY);
|
||||
}
|
||||
if (entry.isProducerFlowControl()) {
|
||||
settings.setMaxSizeBytes(10240).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
|
||||
settings.setMaxSizeBytes(10240000).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
|
||||
if (bservice.getSystemUsage().isSendFailIfNoSpace()) {
|
||||
settings.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL);
|
||||
}
|
||||
|
@ -215,7 +215,7 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase {
|
|||
settingsMap.put("#", defSettings);
|
||||
}
|
||||
if (defaultEntry.isProducerFlowControl()) {
|
||||
defSettings.setMaxSizeBytes(10240).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
|
||||
defSettings.setMaxSizeBytes(10240000).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
|
||||
if (bservice.getSystemUsage().isSendFailIfNoSpace()) {
|
||||
defSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue