mirror of https://github.com/apache/activemq.git
Fix the TopicMasterSlaveTest that was failing
- Stack was overflowing due to the advisory broker advising on topic advisories - MasterConnector now makes sync request to the slave if it's given a sync request - Test was failing due to kaha not being able to create a file that was too long.. fixed by making the sub name and client id and dest name shorter. Need to revisit. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@563854 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
b6fad12ac6
commit
0945e32b6a
|
@ -145,11 +145,12 @@ public class AdvisoryBroker extends BrokerFilter {
|
||||||
|
|
||||||
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
|
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
|
||||||
Destination answer = next.addDestination(context, destination);
|
Destination answer = next.addDestination(context, destination);
|
||||||
|
if( !AdvisorySupport.isAdvisoryTopic(destination) ) {
|
||||||
ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination);
|
ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination);
|
||||||
DestinationInfo info = new DestinationInfo(context.getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, destination);
|
DestinationInfo info = new DestinationInfo(context.getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, destination);
|
||||||
fireAdvisory(context, topic, info);
|
fireAdvisory(context, topic, info);
|
||||||
destinations.put(destination, info);
|
destinations.put(destination, info);
|
||||||
|
}
|
||||||
return answer;
|
return answer;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -157,9 +158,11 @@ public class AdvisoryBroker extends BrokerFilter {
|
||||||
ActiveMQDestination destination = info.getDestination();
|
ActiveMQDestination destination = info.getDestination();
|
||||||
next.addDestinationInfo(context, info);
|
next.addDestinationInfo(context, info);
|
||||||
|
|
||||||
ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination);
|
if( !AdvisorySupport.isAdvisoryTopic(destination) ) {
|
||||||
fireAdvisory(context, topic, info);
|
ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination);
|
||||||
destinations.put(destination, info);
|
fireAdvisory(context, topic, info);
|
||||||
|
destinations.put(destination, info);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
|
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
|
||||||
|
|
|
@ -1367,12 +1367,12 @@ public class BrokerService implements Service {
|
||||||
// Add a filter that will stop access to the broker once stopped
|
// Add a filter that will stop access to the broker once stopped
|
||||||
broker = new MutableBrokerFilter(broker) {
|
broker = new MutableBrokerFilter(broker) {
|
||||||
public void stop() throws Exception {
|
public void stop() throws Exception {
|
||||||
super.stop();
|
|
||||||
setNext(new ErrorBroker("Broker has been stopped: "+this) {
|
setNext(new ErrorBroker("Broker has been stopped: "+this) {
|
||||||
// Just ignore additional stop actions.
|
// Just ignore additional stop actions.
|
||||||
public void stop() throws Exception {
|
public void stop() throws Exception {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
super.stop();
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -222,11 +222,12 @@ public class MasterConnector implements Service,BrokerServiceAware{
|
||||||
}else{
|
}else{
|
||||||
boolean responseRequired=command.isResponseRequired();
|
boolean responseRequired=command.isResponseRequired();
|
||||||
int commandId=command.getCommandId();
|
int commandId=command.getCommandId();
|
||||||
localBroker.oneway(command);
|
|
||||||
if(responseRequired){
|
if(responseRequired){
|
||||||
Response response=new Response();
|
Response response = (Response)localBroker.request(command);
|
||||||
response.setCorrelationId(commandId);
|
response.setCorrelationId(commandId);
|
||||||
remoteBroker.oneway(response);
|
remoteBroker.oneway(response);
|
||||||
|
} else {
|
||||||
|
localBroker.oneway(command);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}catch(IOException e){
|
}catch(IOException e){
|
||||||
|
|
|
@ -126,8 +126,8 @@ public class JmsSendReceiveTestSupport extends TestSupport implements MessageLis
|
||||||
if (data.length != copyOfMessages.size()) {
|
if (data.length != copyOfMessages.size()) {
|
||||||
for (Iterator iter = copyOfMessages.iterator(); iter.hasNext();) {
|
for (Iterator iter = copyOfMessages.iterator(); iter.hasNext();) {
|
||||||
TextMessage message = (TextMessage) iter.next();
|
TextMessage message = (TextMessage) iter.next();
|
||||||
if (log.isDebugEnabled()) {
|
if (log.isInfoEnabled()) {
|
||||||
log.info("<== " + counter++ + " = " + message);
|
log.info("<== " + counter++ + " = " + message.getText());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -122,7 +122,7 @@ public class TestSupport extends TestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
protected String getSubject() {
|
protected String getSubject() {
|
||||||
return getClass().getName() + "." + getName();
|
return getName();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -34,12 +34,12 @@ public class TopicMasterSlaveTest extends QueueMasterSlaveTest{
|
||||||
}
|
}
|
||||||
|
|
||||||
protected MessageConsumer createConsumer(Session session,Destination dest) throws JMSException{
|
protected MessageConsumer createConsumer(Session session,Destination dest) throws JMSException{
|
||||||
return session.createDurableSubscriber((Topic) dest,dest.toString());
|
return session.createDurableSubscriber((Topic) dest,"subName");
|
||||||
}
|
}
|
||||||
|
|
||||||
protected Connection createReceiveConnection() throws Exception{
|
protected Connection createReceiveConnection() throws Exception{
|
||||||
Connection result=super.createReceiveConnection();
|
Connection result=super.createReceiveConnection();
|
||||||
result.setClientID(getClass().getName());
|
result.setClientID("clientId");
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue