Robert Davies 2011-12-02 21:34:14 +00:00
parent f260b17488
commit 3a71f8e33d
4 changed files with 210 additions and 75 deletions

View File

@ -1307,6 +1307,11 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
}catch(Throwable e) {
LOG.error("Caught an exception trying to create a JMSException for " +er.getException(),e);
}
//dispose of transport
Transport t = this.transport;
if (null != t){
ServiceSupport.dispose(t);
}
if(jmsEx !=null) {
throw jmsEx;
}
@ -1515,6 +1520,12 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
started.set(false);
}
public void finalize() throws Throwable{
if (scheduler != null){
scheduler.stop();
}
}
/**
* Changes the associated username/password that is associated with this
* connection. If the connection has been used, you must called cleanup()
@ -2229,10 +2240,17 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
protected void onControlCommand(ControlCommand command) {
String text = command.getCommand();
if (text != null) {
if (text.equals("shutdown")) {
if ("shutdown".equals(text)) {
LOG.info("JVM told to shutdown");
System.exit(0);
}
if (false && "close".equals(text)){
LOG.error("Broker " + getBrokerInfo() + "shutdown connection");
try {
close();
} catch (JMSException e) {
}
}
}
}

View File

@ -35,47 +35,15 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.management.ObjectName;
import javax.transaction.xa.XAResource;
import org.apache.activemq.broker.ft.MasterBroker;
import org.apache.activemq.broker.region.ConnectionStatistics;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.CommandTypes;
import org.apache.activemq.command.ConnectionControl;
import org.apache.activemq.command.ConnectionError;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.ControlCommand;
import org.apache.activemq.command.DataArrayResponse;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.FlushCommand;
import org.apache.activemq.command.IntegerResponse;
import org.apache.activemq.command.KeepAliveInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerAck;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.network.*;
import org.apache.activemq.command.*;
import org.apache.activemq.network.DemandForwardingBridge;
import org.apache.activemq.network.MBeanNetworkListener;
import org.apache.activemq.network.NetworkBridgeConfiguration;
import org.apache.activemq.network.NetworkBridgeFactory;
import org.apache.activemq.security.MessageAuthorizationPolicy;
import org.apache.activemq.state.CommandVisitor;
import org.apache.activemq.state.ConnectionState;
@ -93,7 +61,10 @@ import org.apache.activemq.transport.ResponseCorrelator;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportDisposedIOException;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.util.*;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.MarshallingSupport;
import org.apache.activemq.util.ServiceSupport;
import org.apache.activemq.util.URISupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
@ -150,15 +121,11 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
private String duplexNetworkConnectorId;
/**
* @param connector
* @param transport
* @param broker
* @param taskRunnerFactory
* - can be null if you want direct dispatch to the transport
* else commands are sent async.
* @param taskRunnerFactory - can be null if you want direct dispatch to the transport
* else commands are sent async.
*/
public TransportConnection(TransportConnector connector, final Transport transport, Broker broker,
TaskRunnerFactory taskRunnerFactory) {
TaskRunnerFactory taskRunnerFactory) {
this.connector = connector;
this.broker = broker;
this.messageAuthorizationPolicy = connector.getMessageAuthorizationPolicy();
@ -238,7 +205,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
}
private boolean expected(IOException e) {
return isStomp() &&
return isStomp() &&
((e instanceof SocketException && e.getMessage().indexOf("reset") != -1) || e instanceof EOFException);
}
@ -251,8 +218,6 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
* Calls the serviceException method in an async thread. Since handling a
* service exception closes a socket, we should not tie up broker threads
* since client sockets may hang or cause deadlocks.
*
* @param e
*/
public void serviceExceptionAsync(final IOException e) {
if (asyncException.compareAndSet(false, true)) {
@ -321,8 +286,12 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
SERVICELOG.debug("Error occured while processing " + (responseRequired ? "sync" : "async")
+ " command: " + command + ", exception: " + e, e);
}
if (responseRequired) {
response = new ExceptionResponse(e);
//still need to close this down - incase the peer of this transport doesn't play nice
delayedStop(2000);
} else {
serviceException(e);
}
@ -619,7 +588,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
// this down.
session.shutdown();
// Cascade the connection stop to the consumers and producers.
for (Iterator iter = session.getConsumerIds().iterator(); iter.hasNext();) {
for (Iterator iter = session.getConsumerIds().iterator(); iter.hasNext(); ) {
ConsumerId consumerId = (ConsumerId) iter.next();
try {
processRemoveConsumer(consumerId, lastDeliveredSequenceId);
@ -627,7 +596,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
LOG.warn("Failed to remove consumer: " + consumerId + ". Reason: " + e, e);
}
}
for (Iterator iter = session.getProducerIds().iterator(); iter.hasNext();) {
for (Iterator iter = session.getProducerIds().iterator(); iter.hasNext(); ) {
ProducerId producerId = (ProducerId) iter.next();
try {
processRemoveProducer(producerId);
@ -679,7 +648,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
}
registerConnectionState(info.getConnectionId(), state);
LOG.debug("Setting up new connection id: " + info.getConnectionId() + ", address: " + getRemoteAddress() + ", info: " + info);
this.faultTolerantConnection=info.isFaultTolerant();
this.faultTolerantConnection = info.isFaultTolerant();
// Setup the context.
String clientId = info.getClientId();
context = new ConnectionContext();
@ -710,7 +679,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
brokerConnectionStates.remove(info.getConnectionId());
}
unregisterConnectionState(info.getConnectionId());
LOG.warn("Failed to add Connection " + info.getConnectionId() + ", reason: " + e.toString());
LOG.warn("Failed to add Connection " + info.getConnectionId() + ", reason: " + e.toString());
if (LOG.isDebugEnabled()) {
LOG.debug("Exception detail:", e);
}
@ -735,7 +704,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
// shutting down.
cs.shutdown();
// Cascade the connection stop to the sessions.
for (Iterator iter = cs.getSessionIds().iterator(); iter.hasNext();) {
for (Iterator iter = cs.getSessionIds().iterator(); iter.hasNext(); ) {
SessionId sessionId = (SessionId) iter.next();
try {
processRemoveSession(sessionId, lastDeliveredSequenceId);
@ -744,7 +713,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
}
}
// Cascade the connection stop to temp destinations.
for (Iterator iter = cs.getTempDestinations().iterator(); iter.hasNext();) {
for (Iterator iter = cs.getTempDestinations().iterator(); iter.hasNext(); ) {
DestinationInfo di = (DestinationInfo) iter.next();
try {
broker.removeDestination(cs.getContext(), di.getDestination(), 0);
@ -898,7 +867,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
public void start() throws Exception {
try {
synchronized (this) {
starting = true;
starting = true;
if (taskRunnerFactory != null) {
taskRunner = taskRunnerFactory.createTaskRunner(this, "ActiveMQ Connection Dispatcher: "
+ getRemoteAddress());
@ -940,6 +909,25 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
}
}
public void delayedStop(final int waitTime) {
if (waitTime > 0) {
try {
DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() {
public void run() {
try {
Thread.sleep(waitTime);
stopAsync();
} catch (InterruptedException e) {
}
}
}, "delayedStop:" + transport.getRemoteAddress());
} catch (Throwable t) {
LOG.warn("cannot create stopAsync :", t);
}
}
}
public void stopAsync() {
// If we're in the middle of starting then go no further... for now.
synchronized (this) {
@ -957,7 +945,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
cs.getContext().getStopping().set(true);
}
try {
DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable(){
DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() {
public void run() {
serviceLock.writeLock().lock();
try {
@ -1010,7 +998,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
// Run the MessageDispatch callbacks so that message references get
// cleaned up.
synchronized (dispatchQueue) {
for (Iterator<Command> iter = dispatchQueue.iterator(); iter.hasNext();) {
for (Iterator<Command> iter = dispatchQueue.iterator(); iter.hasNext(); ) {
Command command = iter.next();
if (command.isMessageDispatch()) {
MessageDispatch md = (MessageDispatch) command;
@ -1050,8 +1038,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
}
/**
* @param blockedCandidate
* The blockedCandidate to set.
* @param blockedCandidate The blockedCandidate to set.
*/
public void setBlockedCandidate(boolean blockedCandidate) {
this.blockedCandidate = blockedCandidate;
@ -1065,8 +1052,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
}
/**
* @param markedCandidate
* The markedCandidate to set.
* @param markedCandidate The markedCandidate to set.
*/
public void setMarkedCandidate(boolean markedCandidate) {
this.markedCandidate = markedCandidate;
@ -1077,8 +1063,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
}
/**
* @param slow
* The slow to set.
* @param slow The slow to set.
*/
public void setSlow(boolean slow) {
this.slow = slow;
@ -1122,16 +1107,14 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
}
/**
* @param blocked
* The blocked to set.
* @param blocked The blocked to set.
*/
public void setBlocked(boolean blocked) {
this.blocked = blocked;
}
/**
* @param connected
* The connected to set.
* @param connected The connected to set.
*/
public void setConnected(boolean connected) {
this.connected = connected;
@ -1145,8 +1128,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
}
/**
* @param active
* The active to set.
* @param active The active to set.
*/
public void setActive(boolean active) {
this.active = active;
@ -1164,7 +1146,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
}
public boolean isFaultTolerantConnection() {
return this.faultTolerantConnection;
return this.faultTolerantConnection;
}
protected synchronized void setStarting(boolean starting) {
@ -1196,7 +1178,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
masterBroker = new MasterBroker(parent, transport);
masterBroker.startProcessing();
}
LOG.info((passive?"Passive":"Active")+" Slave Broker " + info.getBrokerName() + " is attached");
LOG.info((passive ? "Passive" : "Active") + " Slave Broker " + info.getBrokerName() + " is attached");
bService.slaveConnectionEstablished();
} else if (info.isNetworkConnection() && info.isDuplexConnection()) {
// so this TransportConnection is the rear end of a network bridge
@ -1217,7 +1199,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
String duplexNetworkConnectorId = config.getName() + "@" + info.getBrokerId();
CopyOnWriteArrayList<TransportConnection> connections = this.connector.getConnections();
synchronized (connections) {
for (Iterator<TransportConnection> iter = connections.iterator(); iter.hasNext();) {
for (Iterator<TransportConnection> iter = connections.iterator(); iter.hasNext(); ) {
TransportConnection c = iter.next();
if ((c != this) && (duplexNetworkConnectorId.equals(c.getDuplexNetworkConnectorId()))) {
LOG.warn("Stopping an existing active duplex connection [" + c + "] for network connector (" + duplexNetworkConnectorId + ").");
@ -1253,7 +1235,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
LOG.warn("Duplex bridge " + duplexNetworkConnectorId + " was stopped before it was correctly started.");
return null;
} catch (Exception e) {
LOG.error("Failed to create responder end of duplex network bridge " + duplexNetworkConnectorId , e);
LOG.error("Failed to create responder end of duplex network bridge " + duplexNetworkConnectorId, e);
return null;
}
}
@ -1406,7 +1388,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
}
protected synchronized TransportConnectionState registerConnectionState(ConnectionId connectionId,
TransportConnectionState state) {
TransportConnectionState state) {
TransportConnectionState cs = null;
if (!connectionStateRegister.isEmpty() && !connectionStateRegister.doesHandleMultipleConnectionStates()) {
// swap implementations

View File

@ -0,0 +1,70 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.security;
import java.net.URI;
import javax.jms.Connection;
import javax.jms.JMSException;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsTestSupport;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The configuration is set to except a maximum of 2 concurrent connections
* As the exception is delibrately ignored, the ActiveMQConnection would continue to
* attempt to connect unless the connection's transport was also stopped on an error.
* <p/>
* As the maximum connections allowed is 2, no more connections would be allowed unless
* the transport was adequately destroyed on the broker side.
*/
public class DoSTest extends JmsTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(DoSTest.class);
public void testInvalidAuthentication() throws Throwable {
for (int i = 0; i < 1000; i++) {
try {
// Bad password
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
Connection c = factory.createConnection("bad", "krap");
c.start();
fail("Expected exception.");
} catch (JMSException e) {
}
}
}
protected BrokerService createBroker() throws Exception {
return createBroker("org/apache/activemq/security/dos-broker.xml");
}
protected BrokerService createBroker(String uri) throws Exception {
LOG.info("Loading broker configuration from the classpath with URI: " + uri);
return BrokerFactory.createBroker(new URI("xbean:" + uri));
}
}

View File

@ -0,0 +1,65 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!-- this file can only be parsed using the xbean-spring library -->
<!-- START SNIPPET: example -->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
<broker useJmx="false" persistent="false" xmlns="http://activemq.apache.org/schema/core" populateJMSXUserID="true">
<plugins>
<!-- use JAAS to authenticate using the login.config file on the classpath to configure JAAS -->
<jaasAuthenticationPlugin configuration="activemq-domain" />
<!-- lets configure a destination based authorization mechanism -->
<authorizationPlugin>
<map>
<authorizationMap>
<authorizationEntries>
<authorizationEntry queue=">" read="admins" write="admins" admin="admins" />
<authorizationEntry queue="USERS.>" read="users" write="users" admin="users" />
<authorizationEntry queue="GUEST.>" read="guests" write="guests,users" admin="guests,users" />
<authorizationEntry topic=">" read="admins" write="admins" admin="admins" />
<authorizationEntry topic="USERS.>" read="users" write="users" admin="users" />
<authorizationEntry topic="GUEST.>" read="guests" write="guests,users" admin="guests,users" />
<authorizationEntry topic="ActiveMQ.Advisory.>" read="guests,users" write="guests,users" admin="guests,users"/>
</authorizationEntries>
<!-- let's assign roles to temporary destinations. comment this entry if we don't want any roles assigned to temp destinations -->
<tempDestinationAuthorizationEntry>
<tempDestinationAuthorizationEntry read="tempDestinationAdmins" write="tempDestinationAdmins" admin="tempDestinationAdmins"/>
</tempDestinationAuthorizationEntry>
</authorizationMap>
</map>
</authorizationPlugin>
</plugins>
<transportConnectors>
<transportConnector uri="tcp://localhost:61616?maximumConnections=2"/>
</transportConnectors>
</broker>
</beans>
<!-- END SNIPPET: example -->