mirror of https://github.com/apache/activemq.git
apply patch for: https://issues.apache.org/activemq/browse/AMQ-2308 with thanks; apply fix for https://issues.apache.org/activemq/browse/AMQ-1993 to the client side
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@789291 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
2e7d2195cc
commit
753b21047a
|
@ -264,11 +264,6 @@ public abstract class TransportFactory {
|
||||||
* @throws Exception
|
* @throws Exception
|
||||||
*/
|
*/
|
||||||
public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception {
|
public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception {
|
||||||
if (options.containsKey(WRITE_TIMEOUT_FILTER)) {
|
|
||||||
transport = new WriteTimeoutFilter(transport);
|
|
||||||
String soWriteTimeout = (String)options.get(WRITE_TIMEOUT_FILTER);
|
|
||||||
if (soWriteTimeout!=null) ((WriteTimeoutFilter)transport).setWriteTimeout(Long.parseLong(soWriteTimeout));
|
|
||||||
}
|
|
||||||
if (options.containsKey(THREAD_NAME_FILTER)) {
|
if (options.containsKey(THREAD_NAME_FILTER)) {
|
||||||
transport = new ThreadNameFilter(transport);
|
transport = new ThreadNameFilter(transport);
|
||||||
}
|
}
|
||||||
|
@ -288,6 +283,13 @@ public abstract class TransportFactory {
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
|
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
|
||||||
|
if (options.containsKey(WRITE_TIMEOUT_FILTER)) {
|
||||||
|
transport = new WriteTimeoutFilter(transport);
|
||||||
|
String soWriteTimeout = (String)options.remove(WRITE_TIMEOUT_FILTER);
|
||||||
|
if (soWriteTimeout!=null) {
|
||||||
|
((WriteTimeoutFilter)transport).setWriteTimeout(Long.parseLong(soWriteTimeout));
|
||||||
|
}
|
||||||
|
}
|
||||||
IntrospectionSupport.setProperties(transport, options);
|
IntrospectionSupport.setProperties(transport, options);
|
||||||
return transport;
|
return transport;
|
||||||
}
|
}
|
||||||
|
|
|
@ -106,7 +106,7 @@ public class TcpTransportFactory extends TransportFactory {
|
||||||
transport = new WireFormatNegotiator(transport, (OpenWireFormat)format, tcpTransport.getMinmumWireFormatVersion());
|
transport = new WireFormatNegotiator(transport, (OpenWireFormat)format, tcpTransport.getMinmumWireFormatVersion());
|
||||||
}
|
}
|
||||||
|
|
||||||
return transport;
|
return super.compositeConfigure(transport, format, options);
|
||||||
}
|
}
|
||||||
|
|
||||||
private String getOption(Map options, String key, String def) {
|
private String getOption(Map options, String key, String def) {
|
||||||
|
|
|
@ -29,7 +29,7 @@ public class TransactedTopicMasterSlaveTest extends JmsTopicTransactionTest {
|
||||||
protected BrokerService slave;
|
protected BrokerService slave;
|
||||||
protected int inflightMessageCount;
|
protected int inflightMessageCount;
|
||||||
protected int failureCount = 50;
|
protected int failureCount = 50;
|
||||||
protected String uriString = "failover://(tcp://localhost:62001,tcp://localhost:62002)?randomize=false";
|
protected String uriString = "failover://(tcp://localhost:62001?soWriteTimeout=15000,tcp://localhost:62002?soWriteTimeout=15000)?randomize=false";
|
||||||
private boolean stopMaster = false;
|
private boolean stopMaster = false;
|
||||||
|
|
||||||
protected void setUp() throws Exception {
|
protected void setUp() throws Exception {
|
||||||
|
|
|
@ -16,45 +16,34 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.broker.store;
|
package org.apache.activemq.broker.store;
|
||||||
|
|
||||||
import java.net.URI;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import junit.framework.Test;
|
|
||||||
import junit.framework.TestCase;
|
import junit.framework.TestCase;
|
||||||
|
|
||||||
import org.apache.activemq.broker.BrokerFactory;
|
|
||||||
import org.apache.activemq.broker.BrokerService;
|
|
||||||
import org.apache.activemq.broker.BrokerTest;
|
|
||||||
import org.apache.activemq.command.ConnectionId;
|
import org.apache.activemq.command.ConnectionId;
|
||||||
import org.apache.activemq.command.LocalTransactionId;
|
import org.apache.activemq.command.LocalTransactionId;
|
||||||
import org.apache.activemq.command.TransactionId;
|
import org.apache.activemq.command.TransactionId;
|
||||||
import org.apache.activemq.store.amq.AMQPersistenceAdapter;
|
|
||||||
import org.apache.activemq.store.amq.AMQTransactionStore;
|
import org.apache.activemq.store.amq.AMQTransactionStore;
|
||||||
import org.apache.activemq.store.amq.AMQTx;
|
import org.apache.activemq.store.amq.AMQTx;
|
||||||
|
|
||||||
/**
|
|
||||||
* Once the wire format is completed we can test against real persistence storage.
|
|
||||||
*
|
|
||||||
* @version $Revision$
|
|
||||||
*/
|
|
||||||
public class TransactionStoreTest extends TestCase {
|
public class TransactionStoreTest extends TestCase {
|
||||||
|
|
||||||
protected static final int MAX_TX = 2500;
|
protected static final int MAX_TX = 2500;
|
||||||
protected static final int MAX_THREADS = 200;
|
protected static final int MAX_THREADS = 200;
|
||||||
|
|
||||||
class UnderTest extends AMQTransactionStore {
|
class BeingTested extends AMQTransactionStore {
|
||||||
public UnderTest() {
|
public BeingTested() {
|
||||||
super(null);
|
super(null);
|
||||||
}
|
}
|
||||||
public Map<TransactionId, AMQTx> getInFlight() {
|
public Map<TransactionId, AMQTx> getInFlight() {
|
||||||
return inflightTransactions;
|
return inflightTransactions;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
UnderTest underTest = new UnderTest();
|
BeingTested underTest = new BeingTested();
|
||||||
|
|
||||||
public void testConcurrentGetTx() throws Exception {
|
public void testConcurrentGetTx() throws Exception {
|
||||||
final ConnectionId connectionId = new ConnectionId("1:1");
|
final ConnectionId connectionId = new ConnectionId("1:1");
|
||||||
|
|
Loading…
Reference in New Issue