mirror of https://github.com/apache/activemq.git
added a test case and fix for AMQ-775 along with a useful base class for testing ActiveMQ using Camel
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@612828 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
5361df13dc
commit
a28ccae9fb
|
@ -162,6 +162,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
|
|||
TaskRunnerFactory taskRunnerFactory) {
|
||||
this.connector = connector;
|
||||
this.broker = broker;
|
||||
this.messageAuthorizationPolicy = connector.getMessageAuthorizationPolicy();
|
||||
RegionBroker rb = (RegionBroker)broker.getAdaptor(RegionBroker.class);
|
||||
brokerConnectionStates = rb.getConnectionStates();
|
||||
if (connector != null) {
|
||||
|
@ -638,16 +639,17 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
|
|||
// Setup the context.
|
||||
String clientId = info.getClientId();
|
||||
context = new ConnectionContext();
|
||||
context.setConnection(this);
|
||||
context.setBroker(broker);
|
||||
context.setConnector(connector);
|
||||
context.setTransactions(new ConcurrentHashMap<TransactionId, Transaction>());
|
||||
context.setClientId(clientId);
|
||||
context.setUserName(info.getUserName());
|
||||
context.setConnectionId(info.getConnectionId());
|
||||
context.setClientMaster(info.isClientMaster());
|
||||
context.setWireFormatInfo(wireFormatInfo);
|
||||
context.setConnection(this);
|
||||
context.setConnectionId(info.getConnectionId());
|
||||
context.setConnector(connector);
|
||||
context.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy());
|
||||
context.setNetworkConnection(networkConnection);
|
||||
context.setTransactions(new ConcurrentHashMap<TransactionId, Transaction>());
|
||||
context.setUserName(info.getUserName());
|
||||
context.setWireFormatInfo(wireFormatInfo);
|
||||
this.manageable = info.isManageable();
|
||||
state.setContext(context);
|
||||
state.setConnection(this);
|
||||
|
|
|
@ -96,14 +96,18 @@ public class TransportConnector implements Connector {
|
|||
*/
|
||||
public ManagedTransportConnector asManagedConnector(MBeanServer mbeanServer, ObjectName connectorName) throws IOException, URISyntaxException {
|
||||
ManagedTransportConnector rc = new ManagedTransportConnector(mbeanServer, connectorName, getBroker(), getServer());
|
||||
//rc.setBroker(getBroker());
|
||||
rc.setBrokerInfo(getBrokerInfo());
|
||||
rc.setConnectUri(getConnectUri());
|
||||
rc.setDisableAsyncDispatch(isDisableAsyncDispatch());
|
||||
rc.setDiscoveryAgent(getDiscoveryAgent());
|
||||
rc.setDiscoveryUri(getDiscoveryUri());
|
||||
rc.setEnableStatusMonitor(isEnableStatusMonitor());
|
||||
rc.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy());
|
||||
rc.setName(getName());
|
||||
//rc.setServer(getServer());
|
||||
rc.setTaskRunnerFactory(getTaskRunnerFactory());
|
||||
rc.setUri(uri);
|
||||
rc.setConnectUri(connectUri);
|
||||
rc.setDiscoveryAgent(discoveryAgent);
|
||||
rc.setDiscoveryUri(discoveryUri);
|
||||
rc.setName(name);
|
||||
rc.setDisableAsyncDispatch(disableAsyncDispatch);
|
||||
rc.setBrokerInfo(brokerInfo);
|
||||
rc.setUri(getUri());
|
||||
return rc;
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,74 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.broker.message.security;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.ConnectionContext;
|
||||
import org.apache.activemq.camel.CamelEmbeddedBrokerTestSupport;
|
||||
import org.apache.activemq.command.Message;
|
||||
import org.apache.activemq.security.MessageAuthorizationPolicy;
|
||||
import org.apache.camel.component.mock.MockEndpoint;
|
||||
import org.apache.camel.CamelContext;
|
||||
import org.apache.camel.builder.RouteBuilder;
|
||||
|
||||
/**
|
||||
* @version $Revision: 1.1 $
|
||||
*/
|
||||
public class MessageAuthenticationTest extends CamelEmbeddedBrokerTestSupport {
|
||||
public void testSendInvalidMessage() throws Exception {
|
||||
MockEndpoint results = getMockEndpoint("mock:results");
|
||||
results.expectedBodiesReceived("validBody");
|
||||
|
||||
template.sendBodyAndHeader("activemq:MyQueue", "invalidBody", "myHeader", "xyz");
|
||||
template.sendBodyAndHeader("activemq:MyQueue", "validBody", "myHeader", "abc");
|
||||
|
||||
assertMockEndpointsSatisifed();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected BrokerService createBroker() throws Exception {
|
||||
BrokerService answer = new BrokerService();
|
||||
answer.setPersistent(false);
|
||||
answer.setMessageAuthorizationPolicy(new MessageAuthorizationPolicy() {
|
||||
public boolean isAllowedToConsume(ConnectionContext context, Message message) {
|
||||
try {
|
||||
Object value = message.getProperty("myHeader");
|
||||
return "abc".equals(value);
|
||||
}
|
||||
catch (IOException e) {
|
||||
System.out.println("Caught: " + e);
|
||||
e.printStackTrace();
|
||||
return false;
|
||||
}
|
||||
}
|
||||
});
|
||||
answer.addConnector(bindAddress);
|
||||
return answer;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void addCamelRoutes(CamelContext camelContext) throws Exception {
|
||||
camelContext.addRoutes(new RouteBuilder() {
|
||||
public void configure() throws Exception {
|
||||
from("activemq:MyQueue").to("mock:results");
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
|
@ -0,0 +1,145 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.camel;
|
||||
|
||||
import java.util.Hashtable;
|
||||
|
||||
import javax.naming.Context;
|
||||
|
||||
import org.apache.activemq.EmbeddedBrokerTestSupport;
|
||||
import org.apache.camel.CamelContext;
|
||||
import org.apache.camel.CamelTemplate;
|
||||
import org.apache.camel.Exchange;
|
||||
import org.apache.camel.Endpoint;
|
||||
import org.apache.camel.component.mock.MockEndpoint;
|
||||
import org.apache.camel.impl.DefaultCamelContext;
|
||||
import org.apache.camel.util.jndi.JndiContext;
|
||||
|
||||
/**
|
||||
* A helper class for test cases which use an embedded broker and use Camel to do the routing
|
||||
*
|
||||
* @version $Revision: 1.1 $
|
||||
*/
|
||||
public abstract class CamelEmbeddedBrokerTestSupport extends EmbeddedBrokerTestSupport {
|
||||
protected CamelContext camelContext;
|
||||
protected CamelTemplate<Exchange> template;
|
||||
|
||||
@Override
|
||||
protected void setUp() throws Exception {
|
||||
bindAddress = "tcp://localhost:61616";
|
||||
super.setUp();
|
||||
camelContext = createCamelContext();
|
||||
addCamelRoutes(camelContext);
|
||||
assertValidContext(camelContext);
|
||||
camelContext.start();
|
||||
template = new CamelTemplate<Exchange>(camelContext);
|
||||
template.start();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void tearDown() throws Exception {
|
||||
super.tearDown();
|
||||
|
||||
if (template != null) {
|
||||
template.stop();
|
||||
}
|
||||
if (camelContext != null) {
|
||||
camelContext.stop();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
protected CamelContext createCamelContext() throws Exception {
|
||||
return new DefaultCamelContext(createJndiContext());
|
||||
}
|
||||
|
||||
protected Context createJndiContext() throws Exception {
|
||||
return new JndiContext(new Hashtable());
|
||||
}
|
||||
|
||||
protected void addCamelRoutes(CamelContext camelContext) throws Exception {
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Resolves a mandatory endpoint for the given URI or an exception is thrown
|
||||
*
|
||||
* @param uri the Camel <a href="">URI</a> to use to create or resolve an endpoint
|
||||
* @return the endpoint
|
||||
*/
|
||||
protected Endpoint resolveMandatoryEndpoint(String uri) {
|
||||
return resolveMandatoryEndpoint(camelContext, uri);
|
||||
}
|
||||
|
||||
/**
|
||||
* Resolves a mandatory endpoint for the given URI and expected type or an exception is thrown
|
||||
*
|
||||
* @param uri the Camel <a href="">URI</a> to use to create or resolve an endpoint
|
||||
* @return the endpoint
|
||||
*/
|
||||
protected <T extends Endpoint> T resolveMandatoryEndpoint(String uri, Class<T> endpointType) {
|
||||
return resolveMandatoryEndpoint(camelContext, uri, endpointType);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Resolves an endpoint and asserts that it is found
|
||||
*/
|
||||
protected Endpoint resolveMandatoryEndpoint(CamelContext context, String uri) {
|
||||
Endpoint endpoint = context.getEndpoint(uri);
|
||||
|
||||
assertNotNull("No endpoint found for URI: " + uri, endpoint);
|
||||
|
||||
return endpoint;
|
||||
}
|
||||
|
||||
/**
|
||||
* Resolves an endpoint and asserts that it is found
|
||||
*/
|
||||
protected <T extends Endpoint> T resolveMandatoryEndpoint(CamelContext context, String uri,
|
||||
Class<T> endpointType) {
|
||||
T endpoint = context.getEndpoint(uri, endpointType);
|
||||
|
||||
assertNotNull("No endpoint found for URI: " + uri, endpoint);
|
||||
|
||||
return endpoint;
|
||||
}
|
||||
|
||||
/**
|
||||
* Resolves the mandatory Mock endpoint using a URI of the form <code>mock:someName</code>
|
||||
*
|
||||
* @param uri the URI which typically starts with "mock:" and has some name
|
||||
* @return the mandatory mock endpoint or an exception is thrown if it could not be resolved
|
||||
*/
|
||||
protected MockEndpoint getMockEndpoint(String uri) {
|
||||
return resolveMandatoryEndpoint(uri, MockEndpoint.class);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Asserts that all the expectations of the Mock endpoints are valid
|
||||
*/
|
||||
protected void assertMockEndpointsSatisifed() throws InterruptedException {
|
||||
MockEndpoint.assertIsSatisfied(camelContext);
|
||||
}
|
||||
|
||||
protected void assertValidContext(CamelContext context) {
|
||||
assertNotNull("No context found!", context);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue