mirror of https://github.com/apache/activemq.git
fix for https://issues.apache.org/activemq/browse/AMQ-2224 and additional fix for https://issues.apache.org/activemq/browse/AMQ-2221
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@768300 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
82d7182939
commit
a49ba6c040
|
@ -46,8 +46,6 @@ import org.apache.commons.logging.LogFactory;
|
|||
*/
|
||||
public class TimeStampingBrokerPlugin extends BrokerPluginSupport {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(TimeStampingBrokerPlugin.class);
|
||||
|
||||
/**
|
||||
* variable which (when non-zero) is used to override
|
||||
* the expiration date for messages that arrive with
|
||||
|
|
|
@ -0,0 +1,65 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.broker.util;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.activemq.broker.BrokerPluginSupport;
|
||||
import org.apache.activemq.command.MessageDispatch;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
/**
|
||||
* The TraceBrokerPathPlugin can be used in a network of Brokers. Each Broker
|
||||
* that has the plugin configured, will add it's brokerName to the content
|
||||
* of a JMS Property. If all Brokers have this property enabled, the path the
|
||||
* message actually took through the network can be seen in the defined property.
|
||||
*
|
||||
* @org.apache.xbean.XBean element="traceBrokerPathPlugin"
|
||||
*
|
||||
* @version $Revision$
|
||||
*/
|
||||
|
||||
public class TraceBrokerPathPlugin extends BrokerPluginSupport {
|
||||
|
||||
private String stampProperty = "BrokerPath";
|
||||
private static final Log LOG = LogFactory.getLog(TraceBrokerPathPlugin.class);
|
||||
|
||||
public String getStampProperty() {
|
||||
return stampProperty;
|
||||
}
|
||||
|
||||
public void setStampProperty(String stampProperty) {
|
||||
this.stampProperty = stampProperty;
|
||||
}
|
||||
|
||||
public void preProcessDispatch(MessageDispatch messageDispatch) {
|
||||
try {
|
||||
String brokerStamp = (String)messageDispatch.getMessage().getProperty(getStampProperty());
|
||||
if (brokerStamp == null) {
|
||||
brokerStamp = getBrokerName();
|
||||
} else {
|
||||
brokerStamp += "," + getBrokerName();
|
||||
}
|
||||
messageDispatch.getMessage().setProperty(getStampProperty(), brokerStamp);
|
||||
} catch (IOException ioe) {
|
||||
LOG.warn("Setting broker property failed " + ioe, ioe);
|
||||
}
|
||||
super.preProcessDispatch(messageDispatch);
|
||||
}
|
||||
}
|
|
@ -245,6 +245,8 @@ timedSubscriptionRecoveryPolicy = org.apache.activemq.broker.region.policy.Timed
|
|||
topic = org.apache.activemq.command.ActiveMQTopic
|
||||
org.apache.activemq.command.ActiveMQTopic(java.lang.String).parameterNames = name
|
||||
|
||||
traceBrokerPathPlugin = org.apache.activemq.broker.util.TraceBrokerPathPlugin
|
||||
|
||||
transportConnector = org.apache.activemq.broker.TransportConnector
|
||||
org.apache.activemq.broker.TransportConnector(org.apache.activemq.transport.TransportServer).parameterNames = server
|
||||
|
||||
|
|
|
@ -584,6 +584,7 @@ authentication or authorization
|
|||
<xs:element ref='tns:simpleAuthenticationPlugin'/>
|
||||
<xs:element ref='tns:timeStampingBrokerPlugin'/>
|
||||
<xs:element ref='tns:udpTraceBrokerPlugin'/>
|
||||
<xs:element ref='tns:traceBrokerPathPlugin'/>
|
||||
<xs:any namespace='##other'/>
|
||||
</xs:choice>
|
||||
</xs:complexType>
|
||||
|
@ -614,6 +615,7 @@ other brokers in a federated network
|
|||
<xs:element ref='tns:multicastTraceBrokerPlugin'/>
|
||||
<xs:element ref='tns:timeStampingBrokerPlugin'/>
|
||||
<xs:element ref='tns:udpTraceBrokerPlugin'/>
|
||||
<xs:element ref='tns:traceBrokerPathPlugin'/>
|
||||
<xs:any namespace='##other'/>
|
||||
</xs:choice>
|
||||
</xs:complexType>
|
||||
|
@ -3133,6 +3135,7 @@ in draft-ietf-ldapext-psearch-03.txt (2.16.840.1.113730.3.4.3)
|
|||
<xs:element ref='tns:multicastTraceBrokerPlugin'/>
|
||||
<xs:element ref='tns:timeStampingBrokerPlugin'/>
|
||||
<xs:element ref='tns:udpTraceBrokerPlugin'/>
|
||||
<xs:element ref='tns:traceBrokerPathPlugin'/>
|
||||
<xs:any namespace='##other'/>
|
||||
</xs:choice>
|
||||
</xs:complexType>
|
||||
|
@ -3150,6 +3153,43 @@ in draft-ietf-ldapext-psearch-03.txt (2.16.840.1.113730.3.4.3)
|
|||
</xs:complexType>
|
||||
</xs:element>
|
||||
|
||||
<!-- element for type: org.apache.activemq.broker.util.TraceBrokerPathPlugin -->
|
||||
<xs:element name='traceBrokerPathPlugin'>
|
||||
<xs:annotation>
|
||||
<xs:documentation><![CDATA[
|
||||
The TraceBrokerPathPlugin can be used in a network of Brokers. Each Broker
|
||||
that has the plugin configured, will add it's brokerName to the content
|
||||
of a JMS Property. If all Brokers have this property enabled, the path the
|
||||
message actually took through the network can be seen in the defined property.
|
||||
]]></xs:documentation>
|
||||
</xs:annotation>
|
||||
<xs:complexType>
|
||||
<xs:sequence>
|
||||
<xs:element name='next' minOccurs='0' maxOccurs='1'>
|
||||
<xs:complexType>
|
||||
<xs:choice minOccurs='0' maxOccurs='1'>
|
||||
<xs:element ref='tns:loggingBrokerPlugin' />
|
||||
<xs:element
|
||||
ref='tns:multicastTraceBrokerPlugin' />
|
||||
<xs:element ref='tns:timeStampingBrokerPlugin' />
|
||||
<xs:element ref='tns:udpTraceBrokerPlugin' />
|
||||
<xs:element ref='tns:traceBrokerPathPlugin'/>
|
||||
<xs:any namespace='##other' />
|
||||
</xs:choice>
|
||||
</xs:complexType>
|
||||
</xs:element>
|
||||
<xs:any namespace='##other' minOccurs='0'
|
||||
maxOccurs='unbounded' />
|
||||
<xs:element name="stampProperty" type="xs:string"
|
||||
maxOccurs="1" minOccurs="0">
|
||||
</xs:element>
|
||||
</xs:sequence>
|
||||
|
||||
<xs:attribute name="stampProperty" type="xs:string"></xs:attribute>
|
||||
<xs:anyAttribute namespace='##other' processContents='lax' />
|
||||
</xs:complexType>
|
||||
</xs:element>
|
||||
|
||||
|
||||
<!-- element for type: org.apache.activemq.broker.jmx.ManagementContext -->
|
||||
<xs:element name='managementContext'>
|
||||
|
@ -3540,6 +3580,7 @@ socket.
|
|||
<xs:element ref='tns:multicastTraceBrokerPlugin'/>
|
||||
<xs:element ref='tns:timeStampingBrokerPlugin'/>
|
||||
<xs:element ref='tns:udpTraceBrokerPlugin'/>
|
||||
<xs:element ref='tns:traceBrokerPathPlugin'/>
|
||||
<xs:any namespace='##other'/>
|
||||
</xs:choice>
|
||||
</xs:complexType>
|
||||
|
@ -5114,23 +5155,30 @@ is not enabled in the default ActiveMQ configuration.
|
|||
</xs:annotation>
|
||||
<xs:complexType>
|
||||
<xs:sequence>
|
||||
<xs:element name='adminConnectionContext' minOccurs='0' maxOccurs='1'>
|
||||
<xs:complexType>
|
||||
<xs:sequence minOccurs='0' maxOccurs='1'><xs:any namespace='##other'/></xs:sequence>
|
||||
</xs:complexType>
|
||||
</xs:element>
|
||||
<xs:element name='next' minOccurs='0' maxOccurs='1'>
|
||||
<xs:complexType>
|
||||
<xs:choice minOccurs='0' maxOccurs='1'>
|
||||
<xs:element ref='tns:loggingBrokerPlugin'/>
|
||||
<xs:element ref='tns:multicastTraceBrokerPlugin'/>
|
||||
<xs:element ref='tns:timeStampingBrokerPlugin'/>
|
||||
<xs:element ref='tns:udpTraceBrokerPlugin'/>
|
||||
<xs:any namespace='##other'/>
|
||||
</xs:choice>
|
||||
</xs:complexType>
|
||||
</xs:element>
|
||||
<xs:any namespace='##other' minOccurs='0' maxOccurs='unbounded'/>
|
||||
<xs:element name='adminConnectionContext' minOccurs='0'
|
||||
maxOccurs='1'>
|
||||
<xs:complexType>
|
||||
<xs:sequence minOccurs='0' maxOccurs='1'>
|
||||
<xs:any namespace='##other' />
|
||||
</xs:sequence>
|
||||
</xs:complexType>
|
||||
</xs:element>
|
||||
<xs:element name='next' minOccurs='0' maxOccurs='1'>
|
||||
<xs:complexType>
|
||||
<xs:choice minOccurs='0' maxOccurs='1'>
|
||||
<xs:element ref='tns:loggingBrokerPlugin' />
|
||||
<xs:element ref='tns:multicastTraceBrokerPlugin' />
|
||||
<xs:element ref='tns:timeStampingBrokerPlugin' />
|
||||
<xs:element ref='tns:udpTraceBrokerPlugin' />
|
||||
<xs:element ref='tns:traceBrokerPathPlugin' />
|
||||
<xs:any namespace='##other' />
|
||||
</xs:choice>
|
||||
</xs:complexType>
|
||||
</xs:element>
|
||||
<xs:any namespace='##other' minOccurs='0'
|
||||
maxOccurs='unbounded' />
|
||||
<xs:element name="ttlCeiling" type="xs:int"></xs:element>
|
||||
<xs:element name="zeroExpirationOverride" type="xs:int"></xs:element>
|
||||
</xs:sequence>
|
||||
<xs:attribute name='adminConnectionContext' type='xs:string'/>
|
||||
<xs:attribute name='next' type='xs:string'/>
|
||||
|
@ -5320,6 +5368,7 @@ socket.
|
|||
<xs:element ref='tns:multicastTraceBrokerPlugin'/>
|
||||
<xs:element ref='tns:timeStampingBrokerPlugin'/>
|
||||
<xs:element ref='tns:udpTraceBrokerPlugin'/>
|
||||
<xs:element ref='tns:traceBrokerPathPlugin'/>
|
||||
<xs:any namespace='##other'/>
|
||||
</xs:choice>
|
||||
</xs:complexType>
|
||||
|
|
|
@ -18,18 +18,22 @@ package org.apache.activemq.broker.util;
|
|||
|
||||
import java.net.URI;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Message;
|
||||
|
||||
import org.apache.activemq.broker.BrokerFactory;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.command.ActiveMQMessage;
|
||||
import org.apache.activemq.test.JmsTopicSendReceiveTest;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
/**
|
||||
*
|
||||
* @version $Revision$
|
||||
* @version $Revision: 564271 $
|
||||
*/
|
||||
public class LoggingBrokerTest extends JmsTopicSendReceiveTest {
|
||||
private static final Log LOG = LogFactory.getLog(LoggingBrokerTest.class);
|
||||
public class PluginBrokerTest extends JmsTopicSendReceiveTest {
|
||||
private static final Log LOG = LogFactory.getLog(PluginBrokerTest.class);
|
||||
private BrokerService broker;
|
||||
|
||||
protected void setUp() throws Exception {
|
||||
|
@ -45,11 +49,40 @@ public class LoggingBrokerTest extends JmsTopicSendReceiveTest {
|
|||
}
|
||||
|
||||
protected BrokerService createBroker() throws Exception {
|
||||
return createBroker("org/apache/activemq/util/logging-broker.xml");
|
||||
return createBroker("org/apache/activemq/util/plugin-broker.xml");
|
||||
}
|
||||
|
||||
protected BrokerService createBroker(String uri) throws Exception {
|
||||
LOG.info("Loading broker configuration from the classpath with URI: " + uri);
|
||||
return BrokerFactory.createBroker(new URI("xbean:" + uri));
|
||||
}
|
||||
|
||||
protected void assertMessageValid(int index, Message message)
|
||||
throws JMSException {
|
||||
// check if broker path has been set
|
||||
assertEquals("localhost", message.getStringProperty("BrokerPath"));
|
||||
ActiveMQMessage amqMsg = (ActiveMQMessage)message;
|
||||
if (index == 7) {
|
||||
// check custom expiration
|
||||
assertEquals(2000, amqMsg.getExpiration() - amqMsg.getTimestamp());
|
||||
} else if (index == 9) {
|
||||
// check ceiling
|
||||
assertEquals(60000, amqMsg.getExpiration() - amqMsg.getTimestamp());
|
||||
} else {
|
||||
// check default expiration
|
||||
assertEquals(1000, amqMsg.getExpiration() - amqMsg.getTimestamp());
|
||||
}
|
||||
super.assertMessageValid(index, message);
|
||||
}
|
||||
|
||||
protected void sendMessage(int index, Message message) throws Exception {
|
||||
if (index == 7) {
|
||||
producer.send(producerDestination, message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, 2000);
|
||||
} else if (index == 9) {
|
||||
producer.send(producerDestination, message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, 200000);
|
||||
} else {
|
||||
super.sendMessage(index, message);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -78,7 +78,6 @@ public abstract class JmsSendReceiveTestSupport extends TestSupport implements M
|
|||
|
||||
LOG.info("Message count for test case is: " + messageCount);
|
||||
data = new String[messageCount];
|
||||
|
||||
for (int i = 0; i < messageCount; i++) {
|
||||
data[i] = createMessageText(i);
|
||||
}
|
||||
|
@ -116,14 +115,17 @@ public abstract class JmsSendReceiveTestSupport extends TestSupport implements M
|
|||
if (verbose) {
|
||||
LOG.info("About to send a message: " + message + " with text: " + data[i]);
|
||||
}
|
||||
|
||||
producer.send(producerDestination, message);
|
||||
sendMessage(i, message);
|
||||
}
|
||||
|
||||
assertMessagesAreReceived();
|
||||
LOG.info("" + data.length + " messages(s) received, closing down connections");
|
||||
}
|
||||
|
||||
protected void sendMessage(int index, Message message) throws Exception {
|
||||
producer.send(producerDestination, message);
|
||||
}
|
||||
|
||||
protected Message createMessage(int index) throws JMSException {
|
||||
Message message = session.createTextMessage(data[index]);
|
||||
return message;
|
||||
|
|
|
@ -28,6 +28,10 @@
|
|||
<!-- lets enable detailed logging in the broker but ignore ConnectionEvents -->
|
||||
<loggingBrokerPlugin logAll="true" logConnectionEvents="false"/>
|
||||
|
||||
<timeStampingBrokerPlugin zeroExpirationOverride="1000" ttlCeiling="60000"/>
|
||||
|
||||
<traceBrokerPathPlugin/>
|
||||
|
||||
</plugins>
|
||||
</broker>
|
||||
|
Loading…
Reference in New Issue