mirror of https://github.com/apache/activemq.git
fix AMQ-1967, server side transport options are now respected, it is possible to configure a broker connector url like tcp://localhost:61617?transport.reuseAddress=true etc
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@701477 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
c89adb8411
commit
3332220a35
|
@ -21,6 +21,7 @@ import java.net.InetAddress;
|
|||
import java.net.InetSocketAddress;
|
||||
import java.net.ServerSocket;
|
||||
import java.net.Socket;
|
||||
import java.net.SocketException;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
|
@ -42,6 +43,7 @@ import org.apache.activemq.transport.TransportLoggerFactory;
|
|||
import org.apache.activemq.transport.TransportServer;
|
||||
import org.apache.activemq.transport.TransportServerThreadSupport;
|
||||
import org.apache.activemq.util.IOExceptionSupport;
|
||||
import org.apache.activemq.util.IntrospectionSupport;
|
||||
import org.apache.activemq.util.ServiceListener;
|
||||
import org.apache.activemq.util.ServiceStopper;
|
||||
import org.apache.activemq.util.ServiceSupport;
|
||||
|
@ -133,7 +135,8 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
|
|||
} else {
|
||||
this.serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr);
|
||||
}
|
||||
this.serverSocket.setSoTimeout(2000);
|
||||
configureServerSocket(this.serverSocket);
|
||||
|
||||
} catch (IOException e) {
|
||||
throw IOExceptionSupport.create("Failed to bind to server socket: " + bind + " due to: " + e, e);
|
||||
}
|
||||
|
@ -153,6 +156,11 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
|
|||
}
|
||||
}
|
||||
|
||||
private void configureServerSocket(ServerSocket socket) throws SocketException {
|
||||
socket.setSoTimeout(2000);
|
||||
IntrospectionSupport.setProperties(socket, transportOptions);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Returns the wireFormatFactory.
|
||||
*/
|
||||
|
|
|
@ -34,12 +34,12 @@ import org.apache.commons.logging.LogFactory;
|
|||
public class DuplexNetworkMBeanTest extends TestCase {
|
||||
|
||||
protected static final Log LOG = LogFactory.getLog(DuplexNetworkMBeanTest.class);
|
||||
protected final int numRestarts = 10;
|
||||
protected final int numRestarts = 5;
|
||||
|
||||
protected BrokerService createBroker() throws Exception {
|
||||
BrokerService broker = new BrokerService();
|
||||
broker.setBrokerName("broker");
|
||||
broker.addConnector("tcp://localhost:61617");
|
||||
broker.addConnector("tcp://localhost:61617?transport.reuseAddress=true");
|
||||
|
||||
return broker;
|
||||
}
|
||||
|
@ -47,8 +47,8 @@ public class DuplexNetworkMBeanTest extends TestCase {
|
|||
protected BrokerService createNetworkedBroker() throws Exception {
|
||||
BrokerService broker = new BrokerService();
|
||||
broker.setBrokerName("networkedBroker");
|
||||
broker.addConnector("tcp://localhost:62617");
|
||||
NetworkConnector networkConnector = broker.addNetworkConnector("static:(tcp://localhost:61617)?maxReconnectDelay=1000&useExponentialBackOff=false");
|
||||
broker.addConnector("tcp://localhost:62617?transport.reuseAddress=true");
|
||||
NetworkConnector networkConnector = broker.addNetworkConnector("static:(tcp://localhost:61617?wireFormat.maxInactivityDuration=500)?useExponentialBackOff=false");
|
||||
networkConnector.setDuplex(true);
|
||||
return broker;
|
||||
}
|
||||
|
@ -87,11 +87,12 @@ public class DuplexNetworkMBeanTest extends TestCase {
|
|||
for (int i=0; i<numRestarts; i++) {
|
||||
broker = createBroker();
|
||||
broker.start();
|
||||
assertEquals(1, countMbeans(networkedBroker, "NetworkBridge", 10000));
|
||||
assertEquals(1, countMbeans(broker, "Connection"));
|
||||
assertEquals(1, countMbeans(networkedBroker, "NetworkBridge", 5000));
|
||||
assertEquals("restart number: " + i, 1, countMbeans(broker, "Connection", 10000));
|
||||
|
||||
broker.stop();
|
||||
broker.waitUntilStopped();
|
||||
assertEquals(0, countMbeans(broker, "stopped"));
|
||||
}
|
||||
|
||||
assertEquals(0, countMbeans(networkedBroker, "NetworkBridge"));
|
||||
|
|
|
@ -0,0 +1,40 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.transport.tcp;
|
||||
|
||||
import javax.jms.Connection;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.EmbeddedBrokerTestSupport;
|
||||
|
||||
public class TcpTransportBindTest extends EmbeddedBrokerTestSupport {
|
||||
final String addr = "tcp://localhost:61617";
|
||||
|
||||
/**
|
||||
* exercise some server side socket options
|
||||
* @throws Exception
|
||||
*/
|
||||
protected void setUp() throws Exception {
|
||||
bindAddress = addr + "?transport.reuseAddress=true&transport.soTimeout=1000";
|
||||
super.setUp();
|
||||
}
|
||||
|
||||
public void testConnect() throws Exception {
|
||||
Connection connection = new ActiveMQConnectionFactory(addr).createConnection();
|
||||
connection.start();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue