Merge pull request #3 from apache/master

Synching with AMQ master
This commit is contained in:
Jamie Goodyear 2018-07-27 21:51:43 -02:30 committed by GitHub
commit 31fbc20cd6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 632 additions and 70 deletions

View File

@ -16,10 +16,6 @@
*/
package org.apache.activemq.broker;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Region;
import org.apache.activemq.command.Message;
@ -28,6 +24,10 @@ import org.apache.activemq.state.ProducerState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
/**
* Holds internal state in the broker for a MessageProducer
*/
@ -213,7 +213,7 @@ public class ProducerBrokerExchange {
}
public int getPercentageBlocked() {
double value = flowControlInfo.getSendsBlocked() / flowControlInfo.getTotalSends();
double value = flowControlInfo.getTotalSends() == 0 ? 0 : flowControlInfo.getSendsBlocked() / flowControlInfo.getTotalSends();
return (int) value * 100;
}

View File

@ -162,7 +162,7 @@ public abstract class AbstractRegion implements Region {
addSubscriptionsForDestination(context, dest);
destinations.put(destination, dest);
updateRegionDestCounts(destination, 1);
destinationMap.put(destination, dest);
destinationMap.unsynchronizedPut(destination, dest);
}
if (dest == null) {
throw new DestinationDoesNotExistException(destination.getQualifiedName());
@ -217,7 +217,7 @@ public abstract class AbstractRegion implements Region {
// If a destination isn't specified, then just count up
// non-advisory destinations (ie count all destinations)
int destinationSize = (int) (entry.getDestination() != null ?
destinationMap.get(entry.getDestination()).size() : regionStatistics.getDestinations().getCount());
destinationMap.unsynchronizedGet(entry.getDestination()).size() : regionStatistics.getDestinations().getCount());
if (destinationSize >= entry.getMaxDestinations()) {
if (entry.getDestination() != null) {
throw new IllegalStateException(
@ -296,7 +296,7 @@ public abstract class AbstractRegion implements Region {
dest.removeSubscription(context, sub, 0l);
}
}
destinationMap.remove(destination, dest);
destinationMap.unsynchronizedRemove(destination, dest);
dispose(context, dest);
DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor();
if (destinationInterceptor != null) {
@ -321,7 +321,7 @@ public abstract class AbstractRegion implements Region {
public Set<Destination> getDestinations(ActiveMQDestination destination) {
destinationsLock.readLock().lock();
try{
return destinationMap.get(destination);
return destinationMap.unsynchronizedGet(destination);
} finally {
destinationsLock.readLock().unlock();
}
@ -387,7 +387,7 @@ public abstract class AbstractRegion implements Region {
List<Destination> addList = new ArrayList<Destination>();
destinationsLock.readLock().lock();
try {
for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) {
for (Destination dest : (Set<Destination>) destinationMap.unsynchronizedGet(info.getDestination())) {
addList.add(dest);
}
// ensure sub visible to any new dest addSubscriptionsForDestination
@ -467,7 +467,7 @@ public abstract class AbstractRegion implements Region {
List<Destination> removeList = new ArrayList<Destination>();
destinationsLock.readLock().lock();
try {
for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) {
for (Destination dest : (Set<Destination>) destinationMap.unsynchronizedGet(info.getDestination())) {
removeList.add(dest);
}
} finally {
@ -552,15 +552,7 @@ public abstract class AbstractRegion implements Region {
// Try to auto create the destination... re-invoke broker
// from the
// top so that the proper security checks are performed.
context.getBroker().addDestination(context, destination, createTemporary);
dest = addDestination(context, destination, false);
// We should now have the dest created.
destinationsLock.readLock().lock();
try {
dest = destinations.get(destination);
} finally {
destinationsLock.readLock().unlock();
}
dest = context.getBroker().addDestination(context, destination, createTemporary);
}
if (dest == null) {
@ -644,7 +636,7 @@ public abstract class AbstractRegion implements Region {
public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
destinationsLock.readLock().lock();
try {
for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) {
for (Destination dest : (Set<Destination>) destinationMap.unsynchronizedGet(info.getDestination())) {
dest.addProducer(context, info);
}
} finally {
@ -665,7 +657,7 @@ public abstract class AbstractRegion implements Region {
public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
destinationsLock.readLock().lock();
try {
for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) {
for (Destination dest : (Set<Destination>) destinationMap.unsynchronizedGet(info.getDestination())) {
dest.removeProducer(context, info);
}
} finally {

View File

@ -59,7 +59,7 @@ public class MappedQueueFilter extends DestinationFilter {
final Set<Destination> virtualDests = regionBroker.getDestinations(virtualDestination);
final ActiveMQDestination newDestination = sub.getActiveMQDestination();
final BaseDestination regionDest = getBaseDestination((Destination) regionBroker.getDestinations(newDestination).toArray()[0]);
BaseDestination regionDest = null;
for (Destination virtualDest : virtualDests) {
if (virtualDest.getActiveMQDestination().isTopic() &&
@ -75,6 +75,9 @@ public class MappedQueueFilter extends DestinationFilter {
final Message copy = message.copy();
copy.setOriginalDestination(message.getDestination());
copy.setDestination(newDestination);
if (regionDest == null) {
regionDest = getBaseDestination((Destination) regionBroker.getDestinations(newDestination).toArray()[0]);
}
copy.setRegionDestination(regionDest);
sub.addRecoveredMessage(context, newDestination.isQueue() ? new IndirectMessageReference(copy) : copy);
}

View File

@ -1923,7 +1923,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
if (info == null) {
long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
while (!disposed.get() || System.currentTimeMillis() < deadline) {
while (!disposed.get() || System.currentTimeMillis() - deadline < 0) {
if (slot.await(1, TimeUnit.MILLISECONDS)) {
break;
}

View File

@ -191,7 +191,7 @@ public class VMTransportFactory extends TransportFactory {
broker = registry.lookup(brokerName);
if (broker == null || waitForStart > 0) {
final long expiry = System.currentTimeMillis() + waitForStart;
while ((broker == null || !broker.isStarted()) && expiry > System.currentTimeMillis()) {
while ((broker == null || !broker.isStarted()) && System.currentTimeMillis() - expiry < 0) {
long timeout = Math.max(0, expiry - System.currentTimeMillis());
if (broker == null) {
try {

View File

@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
public class ProducerBrokerExchangeTest {
@Test
public void testGetPercentageBlockedHandlesDivideByZero(){
ProducerBrokerExchange producerBrokerExchange = new ProducerBrokerExchange();
producerBrokerExchange.getPercentageBlocked();
}
@Test
public void testGetPercentageBlockedNonZero(){
ProducerBrokerExchange producerBrokerExchange = new ProducerBrokerExchange();
producerBrokerExchange.blockingOnFlowControl(true);
producerBrokerExchange.incrementSend();
assertEquals(100.0, producerBrokerExchange.getPercentageBlocked(), 0);
}
}

View File

@ -1157,7 +1157,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
break;
}
}
} while (numberNotReplayed > 0 && expiry < System.currentTimeMillis());
} while (numberNotReplayed > 0 && expiry - System.currentTimeMillis() < 0);
}
}

View File

@ -23,7 +23,6 @@ import java.util.List;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.activemq.command.ActiveMQDestination;
@ -60,13 +59,20 @@ public class DestinationMap {
* matching values.
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public synchronized Set get(ActiveMQDestination key) {
public Set get(ActiveMQDestination key) {
synchronized (this) {
return unsynchronizedGet(key);
}
}
@SuppressWarnings({"rawtypes", "unchecked"})
public Set unsynchronizedGet(ActiveMQDestination key) {
if (key.isComposite()) {
ActiveMQDestination[] destinations = key.getCompositeDestinations();
Set answer = new HashSet(destinations.length);
for (int i = 0; i < destinations.length; i++) {
ActiveMQDestination childDestination = destinations[i];
Object value = get(childDestination);
Object value = unsynchronizedGet(childDestination);
if (value instanceof Set) {
answer.addAll((Set) value);
} else if (value != null) {
@ -78,7 +84,13 @@ public class DestinationMap {
return findWildcardMatches(key);
}
public synchronized void put(ActiveMQDestination key, Object value) {
public void put(ActiveMQDestination key, Object value) {
synchronized (this) {
unsynchronizedPut(key, value);
}
}
public void unsynchronizedPut(ActiveMQDestination key, Object value) {
if (key.isComposite()) {
ActiveMQDestination[] destinations = key.getCompositeDestinations();
for (int i = 0; i < destinations.length; i++) {
@ -95,7 +107,13 @@ public class DestinationMap {
/**
* Removes the value from the associated destination
*/
public synchronized void remove(ActiveMQDestination key, Object value) {
public void remove(ActiveMQDestination key, Object value) {
synchronized (this) {
unsynchronizedRemove(key, value);
}
}
public void unsynchronizedRemove(ActiveMQDestination key, Object value) {
if (key.isComposite()) {
ActiveMQDestination[] destinations = key.getCompositeDestinations();
for (int i = 0; i < destinations.length; i++) {

View File

@ -233,7 +233,7 @@ public class ConnectionPool implements ExceptionListener {
}
}
if (expiryTimeout > 0 && System.currentTimeMillis() > firstUsed + expiryTimeout) {
if (expiryTimeout > 0 && (firstUsed + expiryTimeout) - System.currentTimeMillis() < 0) {
hasExpired = true;
if (referenceCount == 0) {
close();
@ -243,7 +243,7 @@ public class ConnectionPool implements ExceptionListener {
// Only set hasExpired here is no references, as a Connection with references is by
// definition not idle at this time.
if (referenceCount == 0 && idleTimeout > 0 && System.currentTimeMillis() > lastUsed + idleTimeout) {
if (referenceCount == 0 && idleTimeout > 0 && (lastUsed + idleTimeout) - System.currentTimeMillis() < 0) {
hasExpired = true;
close();
expired = true;

View File

@ -0,0 +1,74 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.jms.pool;
import static org.junit.Assert.assertFalse;
import javax.jms.Connection;
import javax.jms.JMSException;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.junit.Before;
import org.junit.Test;
public class ConnectionPoolTest extends JmsPoolTestSupport {
private class PooledConnectionFactoryTest extends PooledConnectionFactory {
ConnectionPool pool = null;
@Override
protected Connection newPooledConnection(ConnectionPool connection) {
connection.setIdleTimeout(Integer.MAX_VALUE);
this.pool = connection;
Connection ret = super.newPooledConnection(connection);
ConnectionPool cp = ((PooledConnection) ret).pool;
cp.decrementReferenceCount();
// will fail if timeout does overflow
assertFalse(cp.expiredCheck());
return ret;
}
public ConnectionPool getPool() {
return pool;
}
}
@Override
@Before
public void setUp() throws Exception {
super.setUp();
brokerService = new BrokerService();
brokerService.setDeleteAllMessagesOnStartup(true);
brokerService.setPersistent(false);
brokerService.setUseJmx(false);
brokerService.setAdvisorySupport(false);
brokerService.setSchedulerSupport(false);
brokerService.start();
brokerService.waitUntilStarted();
}
@Test(timeout = 120000)
public void demo() throws JMSException, InterruptedException {
final PooledConnectionFactoryTest pooled = new PooledConnectionFactoryTest();
pooled.setConnectionFactory(new ActiveMQConnectionFactory("vm://localhost?create=false"));
pooled.setMaxConnections(2);
pooled.setExpiryTimeout(Long.MAX_VALUE);
pooled.start();
}
}

View File

@ -547,12 +547,12 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements
letter.setCheckForCorruptJournalFiles(checkForCorruptJournalFiles);
}
public boolean isPurgeRecoveredXATransactions() {
return letter.isPurgeRecoveredXATransactions();
public String getPurgeRecoveredXATransactionStrategy() {
return letter.getPurgeRecoveredXATransactionStrategy();
}
public void setPurgeRecoveredXATransactions(boolean purgeRecoveredXATransactions) {
letter.setPurgeRecoveredXATransactions(purgeRecoveredXATransactions);
public void setPurgeRecoveredXATransactionStrategy(String purgeRecoveredXATransactionStrategy) {
letter.setPurgeRecoveredXATransactionStrategy(purgeRecoveredXATransactionStrategy);
}
@Override

View File

@ -240,6 +240,12 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
}
public enum PurgeRecoveredXATransactionStrategy {
NEVER,
COMMIT,
ROLLBACK;
}
protected PageFile pageFile;
protected Journal journal;
protected Metadata metadata = new Metadata();
@ -272,7 +278,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
private boolean ignoreMissingJournalfiles = false;
private int indexCacheSize = 10000;
private boolean checkForCorruptJournalFiles = false;
private boolean purgeRecoveredXATransactions = false;
protected PurgeRecoveredXATransactionStrategy purgeRecoveredXATransactionStrategy = PurgeRecoveredXATransactionStrategy.NEVER;
private boolean checksumJournalFiles = true;
protected boolean forceRecoverIndex = false;
private boolean archiveCorruptedIndex = false;
@ -746,14 +752,20 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
synchronized (preparedTransactions) {
for (TransactionId txId : preparedTransactions.keySet()) {
Set<TransactionId> txIds = new LinkedHashSet<TransactionId>(preparedTransactions.keySet());
for (TransactionId txId : txIds) {
switch (purgeRecoveredXATransactionStrategy){
case NEVER:
LOG.warn("Recovered prepared XA TX: [{}]", txId);
}
if (purgeRecoveredXATransactions){
if (!preparedTransactions.isEmpty()){
LOG.warn("Purging " + preparedTransactions.size() + " recovered prepared XA TXs" );
preparedTransactions.clear();
break;
case COMMIT:
store(new KahaCommitCommand().setTransactionInfo(TransactionIdConversion.convert(txId)), false, null, null);
LOG.warn("Recovered and Committing prepared XA TX: [{}]", txId);
break;
case ROLLBACK:
store(new KahaRollbackCommand().setTransactionInfo(TransactionIdConversion.convert(txId)), false, null, null);
LOG.warn("Recovered and Rolling Back prepared XA TX: [{}]", txId);
break;
}
}
}
@ -3315,12 +3327,17 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
this.checkForCorruptJournalFiles = checkForCorruptJournalFiles;
}
public boolean isPurgeRecoveredXATransactions() {
return purgeRecoveredXATransactions;
public PurgeRecoveredXATransactionStrategy getPurgeRecoveredXATransactionStrategyEnum() {
return purgeRecoveredXATransactionStrategy;
}
public void setPurgeRecoveredXATransactions(boolean purgeRecoveredXATransactions) {
this.purgeRecoveredXATransactions = purgeRecoveredXATransactions;
public String getPurgeRecoveredXATransactionStrategy() {
return purgeRecoveredXATransactionStrategy.name();
}
public void setPurgeRecoveredXATransactionStrategy(String purgeRecoveredXATransactionStrategy) {
this.purgeRecoveredXATransactionStrategy = PurgeRecoveredXATransactionStrategy.valueOf(
purgeRecoveredXATransactionStrategy.trim().toUpperCase());
}
public boolean isChecksumJournalFiles() {

View File

@ -187,6 +187,10 @@
<version>${hadoop-version}</version>
<exclusions>
<!-- hadoop's transative dependencies are such a pig -->
<exclusion>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils-core</artifactId>
</exclusion>
<exclusion>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>

View File

@ -91,6 +91,7 @@ public class ActiveMQConnectionFactory implements ConnectionFactory, QueueConnec
if (manager == null) {
throw new JMSException("No JCA ConnectionManager configured! Either enable UseInboundSessionEnabled or get your JCA container to configure one.");
}
return (Connection)manager.allocateConnection(factory, connectionRequestInfo);
} catch (ResourceException e) {
// Throw the root cause if it was a JMSException..

View File

@ -193,6 +193,10 @@ public class ActiveMQConnectionSupport {
info.setServerUrl(url);
}
public String getTrustStore() {
return info.getTrustStore();
}
public void setTrustStore(String trustStore) {
if (log.isDebugEnabled()) {
log.debug(this + ", setting [trustStore] to: " + trustStore);
@ -200,6 +204,10 @@ public class ActiveMQConnectionSupport {
info.setTrustStore(trustStore);
}
public String getTrustStorePassword() {
return info.getTrustStorePassword();
}
public void setTrustStorePassword(String trustStorePassword) {
if (log.isDebugEnabled()) {
log.debug(this + ", setting [trustStorePassword] to: " + trustStorePassword);
@ -207,6 +215,10 @@ public class ActiveMQConnectionSupport {
info.setTrustStorePassword(trustStorePassword);
}
public String getKeyStore() {
return info.getKeyStore();
}
public void setKeyStore(String keyStore) {
if (log.isDebugEnabled()) {
log.debug(this + ", setting [keyStore] to: " + keyStore);
@ -214,6 +226,10 @@ public class ActiveMQConnectionSupport {
info.setKeyStore(keyStore);
}
public String getKeyStorePassword() {
return info.getKeyStorePassword();
}
public void setKeyStorePassword(String keyStorePassword) {
if (log.isDebugEnabled()) {
log.debug(this + ", setting [keyStorePassword] to: " + keyStorePassword);
@ -221,6 +237,10 @@ public class ActiveMQConnectionSupport {
info.setKeyStorePassword(keyStorePassword);
}
public String getKeyStoreKeyPassword() {
return info.getKeyStoreKeyPassword();
}
public void setKeyStoreKeyPassword(String keyStoreKeyPassword) {
if (log.isDebugEnabled()) {
log.debug(this + ", setting [keyStoreKeyPassword] to: " + keyStoreKeyPassword);

View File

@ -76,21 +76,33 @@ public class ActiveMQManagedConnectionFactory extends ActiveMQConnectionSupport
if (getUserName() == null) {
setUserName(baseInfo.getUserName());
}
if (getDurableTopicPrefetch() != null) {
if (getDurableTopicPrefetch() == null) {
setDurableTopicPrefetch(baseInfo.getDurableTopicPrefetch());
}
if (getOptimizeDurableTopicPrefetch() != null) {
if (getOptimizeDurableTopicPrefetch() == null) {
setOptimizeDurableTopicPrefetch(baseInfo.getOptimizeDurableTopicPrefetch());
}
if (getQueuePrefetch() != null) {
if (getQueuePrefetch() == null) {
setQueuePrefetch(baseInfo.getQueuePrefetch());
}
if (getQueueBrowserPrefetch() != null) {
if (getQueueBrowserPrefetch() == null) {
setQueueBrowserPrefetch(baseInfo.getQueueBrowserPrefetch());
}
if (getTopicPrefetch() != null) {
if (getTopicPrefetch() == null) {
setTopicPrefetch(baseInfo.getTopicPrefetch());
}
if (getKeyStore() == null) {
setKeyStore(baseInfo.getKeyStore());
}
if (getKeyStorePassword() == null) {
setKeyStorePassword(baseInfo.getKeyStorePassword());
}
if (getTrustStore() == null) {
setTrustStore(baseInfo.getTrustStore());
}
if (getTrustStorePassword() == null) {
setTrustStorePassword(baseInfo.getTrustStorePassword());
}
}
}

View File

@ -84,7 +84,7 @@ public class JmsConsumerClient extends AbstractJmsMeasurableClient {
LOG.info("Starting to synchronously receive messages for " + duration + " ms...");
long endTime = System.currentTimeMillis() + duration;
while (System.currentTimeMillis() < endTime) {
while (System.currentTimeMillis() - endTime < 0) {
getJmsConsumer().receive();
incThroughput();
sleep();

View File

@ -199,7 +199,7 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient {
// Send to more than one actual destination
if (dest.length > 1) {
while (System.currentTimeMillis() < endTime) {
while (System.currentTimeMillis() - endTime < 0) {
for (int j = 0; j < dest.length; j++) {
getJmsProducer().send(dest[j], getJmsTextMessage());
incThroughput();
@ -209,7 +209,7 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient {
}
// Send to only one actual destination
} else {
while (System.currentTimeMillis() < endTime) {
while (System.currentTimeMillis() - endTime < 0) {
getJmsProducer().send(getJmsTextMessage());
incThroughput();
sleep();
@ -224,7 +224,7 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient {
// Send to more than one actual destination
long count = 1;
if (dest.length > 1) {
while (System.currentTimeMillis() < endTime) {
while (System.currentTimeMillis() - endTime < 0) {
for (int j = 0; j < dest.length; j++) {
getJmsProducer().send(dest[j], createJmsTextMessage("Text Message [" + count++ + "]"));
incThroughput();
@ -235,7 +235,7 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient {
// Send to only one actual destination
} else {
while (System.currentTimeMillis() < endTime) {
while (System.currentTimeMillis() - endTime < 0) {
getJmsProducer().send(createJmsTextMessage("Text Message [" + count++ + "]"));
incThroughput();

View File

@ -267,7 +267,7 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
assertEmptyDLQ();
}
public void testPreparedTransactionRecoveredPurgeOnRestart() throws Exception {
public void testPreparedTransactionRecoveredPurgeRollbackOnRestart() throws Exception {
ActiveMQDestination destination = createDestination();
@ -306,7 +306,7 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
stopBroker();
if (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) {
KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter)broker.getPersistenceAdapter();
adapter.setPurgeRecoveredXATransactions(true);
adapter.setPurgeRecoveredXATransactionStrategy("ROLLBACK");
LOG.info("Setting purgeRecoveredXATransactions to true on the KahaDBPersistenceAdapter");
}
broker.start();
@ -320,9 +320,77 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
consumerInfo = createConsumerInfo(sessionInfo, destination);
connection.send(consumerInfo);
// Since rolledback but not committed.. they should not get delivered.
assertNull(receiveMessage(connection));
assertNoMessagesLeft(connection);
Response response = connection.request(new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER));
assertNotNull(response);
DataArrayResponse dar = (DataArrayResponse)response;
//These should be purged so expect 0
assertEquals(0, dar.getData().length);
}
public void testPreparedTransactionRecoveredPurgeCommitOnRestart() throws Exception {
ActiveMQDestination destination = createDestination();
// Setup the producer and send the message.
StubConnection connection = createConnection();
ConnectionInfo connectionInfo = createConnectionInfo();
SessionInfo sessionInfo = createSessionInfo(connectionInfo);
ProducerInfo producerInfo = createProducerInfo(sessionInfo);
connection.send(connectionInfo);
connection.send(sessionInfo);
connection.send(producerInfo);
ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
connection.send(consumerInfo);
// Prepare 4 message sends.
for (int i = 0; i < 4; i++) {
// Begin the transaction.
XATransactionId txid = createXATransaction(sessionInfo);
connection.send(createBeginTransaction(connectionInfo, txid));
Message message = createMessage(producerInfo, destination);
message.setPersistent(true);
message.setTransactionId(txid);
connection.send(message);
// Prepare
connection.send(createPrepareTransaction(connectionInfo, txid));
}
// Since prepared but not committed.. they should not get delivered.
assertNull(receiveMessage(connection));
assertNoMessagesLeft(connection);
connection.request(closeConnectionInfo(connectionInfo));
// restart the broker.
stopBroker();
if (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) {
KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter)broker.getPersistenceAdapter();
adapter.setPurgeRecoveredXATransactionStrategy("COMMIT");
LOG.info("Setting purgeRecoveredXATransactions to true on the KahaDBPersistenceAdapter");
}
broker.start();
// Setup the consumer and try receive the message.
connection = createConnection();
connectionInfo = createConnectionInfo();
sessionInfo = createSessionInfo(connectionInfo);
connection.send(connectionInfo);
connection.send(sessionInfo);
consumerInfo = createConsumerInfo(sessionInfo, destination);
connection.send(consumerInfo);
// Since committed ... they should get delivered.
for (int i = 0; i < 4; i++) {
assertNotNull(receiveMessage(connection));
}
assertNoMessagesLeft(connection);
Response response = connection.request(new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER));
assertNotNull(response);

View File

@ -0,0 +1,136 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.virtual;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageProducer;
import javax.jms.Session;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class VirtualTopicDestinationMapAccessTest {
private static final Logger LOG = LoggerFactory.getLogger(VirtualTopicDestinationMapAccessTest.class);
BrokerService brokerService;
ConnectionFactory connectionFactory;
@Before
public void createBroker() throws Exception {
createBroker(true);
}
public void createBroker(boolean delete) throws Exception {
brokerService = new BrokerService();
brokerService.setDeleteAllMessagesOnStartup(delete);
brokerService.setAdvisorySupport(false);
brokerService.start();
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("vm://localhost");
ActiveMQPrefetchPolicy zeroPrefetch = new ActiveMQPrefetchPolicy();
zeroPrefetch.setAll(0);
activeMQConnectionFactory.setPrefetchPolicy(zeroPrefetch);
connectionFactory = activeMQConnectionFactory;
}
@After
public void stopBroker() throws Exception {
brokerService.stop();
}
@Test
@Ignore("perf test that needs manual comparator")
public void testX() throws Exception {
final int numConnections = 200;
final int numDestinations = 10000;
final AtomicInteger numConsumers = new AtomicInteger(numDestinations);
final AtomicInteger numProducers = new AtomicInteger(numDestinations);
ExecutorService executorService = Executors.newFixedThreadPool(numConnections);
// precreate dests to accentuate read access
for (int i=0; i<numDestinations; i++ ) {
brokerService.getRegionBroker().addDestination(
brokerService.getAdminConnectionContext(),
new ActiveMQQueue("Consumer." + i + ".VirtualTopic.TEST-" + i),
false);
brokerService.getRegionBroker().addDestination(
brokerService.getAdminConnectionContext(), new ActiveMQTopic("VirtualTopic.TEST-" + i), false);
}
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
int opsCount = 0;
Connection connection1 = connectionFactory.createConnection();
connection1.start();
Session session = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(null);
do {
boolean consumerOrProducer = opsCount++ % 2 == 0;
int i = consumerOrProducer ? numConsumers.decrementAndGet() : numProducers.decrementAndGet();
if (i > 0) {
if (consumerOrProducer) {
session.createConsumer(new ActiveMQQueue("Consumer." + i + ".VirtualTopic.TEST-" + i));
} else {
producer.send(new ActiveMQTopic("VirtualTopic.TEST-" + i), new ActiveMQMessage());
}
}
} while (numConsumers.get() > 0 || numProducers.get() > 0);
connection1.close();
} catch (Exception e) {
e.printStackTrace();
}
}
};
for (int i = 0; i < numConnections; i++) {
executorService.execute(runnable);
}
long start = System.currentTimeMillis();
LOG.info("Starting timer: " + start);
executorService.shutdown();
executorService.awaitTermination(5, TimeUnit.MINUTES);
LOG.info("Done, duration: " + (System.currentTimeMillis() - start));
}
}

View File

@ -0,0 +1,179 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.network;
import junit.framework.Test;
import org.apache.activemq.broker.StubConnection;
import org.apache.activemq.command.*;
import java.io.IOException;
import java.net.ServerSocket;
import java.util.Arrays;
public class DemandForwardingBridgeSupportTest extends NetworkTestSupport {
private DemandForwardingBridge bridge;
private StubConnection producerConnection;
private ProducerInfo producerInfo;
private StubConnection consumerConnection;
private SessionInfo consumerSessionInfo;
public void testOverflow() throws Exception {
NetworkBridgeConfiguration configuration = getDefaultBridgeConfiguration();
configuration.setExcludedDestinations(Arrays.asList(ActiveMQDestination.createDestination("OTHER.>",
ActiveMQDestination.TOPIC_TYPE)));
configuration.setDynamicallyIncludedDestinations(Arrays.asList(ActiveMQDestination.createDestination(
"TEST", ActiveMQDestination.QUEUE_TYPE)));
configureAndStartBridge(configuration);
assertReceiveMessageOn("TEST", ActiveMQDestination.QUEUE_TYPE);
assertReceiveNoMessageOn("OTHER.T1", ActiveMQDestination.TOPIC_TYPE);
}
private void assertReceiveMessageOn(String destinationName, byte destinationType) throws Exception,
InterruptedException {
ActiveMQDestination destination = ActiveMQDestination.createDestination(destinationName, destinationType);
// Send the message to the local broker.
producerConnection.send(createMessage(producerInfo, destination, destinationType));
// Make sure the message was delivered via the remote.
Message m = createConsumerAndReceiveMessage(destination);
assertNotNull(m);
}
private void assertReceiveNoMessageOn(String destinationName, byte destinationType) throws Exception,
InterruptedException {
ActiveMQDestination destination = ActiveMQDestination.createDestination(destinationName, destinationType);
// Send the message to the local broker.
producerConnection.send(createMessage(producerInfo, destination, destinationType));
// Make sure the message was delivered via the remote.
Message m = createConsumerAndReceiveMessage(destination);
assertNull(m);
}
private Message createConsumerAndReceiveMessage(ActiveMQDestination destination) throws Exception {
// Now create remote consumer that should cause message to move to this
// remote consumer.
ConsumerInfo consumerInfo = createConsumerInfo(consumerSessionInfo, destination);
consumerConnection.send(consumerInfo);
Message m = receiveMessage(consumerConnection);
return m;
}
private void configureAndStartBridge(NetworkBridgeConfiguration configuration) throws Exception {
bridge = new DemandForwardingBridge(configuration, createTransport(), createRemoteTransport());
bridge.setBrokerService(broker);
bridge.setDynamicallyIncludedDestinations(configuration.getDynamicallyIncludedDestinations().toArray(
new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations().size()]
));
bridge.setExcludedDestinations(configuration.getExcludedDestinations().toArray(
new ActiveMQDestination[configuration.getExcludedDestinations().size()]
));
bridge.setStaticallyIncludedDestinations(configuration.getStaticallyIncludedDestinations().toArray(
new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]
));
bridge.start();
}
public NetworkBridgeConfiguration getDefaultBridgeConfiguration() {
NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
config.setBrokerName("local");
config.setDispatchAsync(false);
return config;
}
// create sockets with max waiting value accepted
@Override
protected String getLocalURI() {
int port = findFreePort();
return String.format("tcp://localhost:%d?connectionTimeout=2147483647", port);
}
@Override
protected String getRemoteURI() {
int port = findFreePort();
return String.format("tcp://localhost:%d?connectionTimeout=2147483647",port);
}
private static int findFreePort() {
ServerSocket socket = null;
try {
socket = new ServerSocket(0);
socket.setReuseAddress(true);
int port = socket.getLocalPort();
try {
socket.close();
} catch (IOException e) {
// Ignore IOException on close()
}
return port;
} catch (IOException e) {
} finally {
if (socket != null) {
try {
socket.close();
} catch (IOException e) {
}
}
}
throw new IllegalStateException("Could not find a free TCP/IP port to start embedded Jetty HTTP Server on");
}
@Override
protected void setUp() throws Exception {
super.setUp();
producerConnection = createConnection();
ConnectionInfo producerConnectionInfo = createConnectionInfo();
SessionInfo producerSessionInfo = createSessionInfo(producerConnectionInfo);
producerInfo = createProducerInfo(producerSessionInfo);
producerConnection.send(producerConnectionInfo);
producerConnection.send(producerSessionInfo);
producerConnection.send(producerInfo);
consumerConnection = createRemoteConnection();
ConnectionInfo consumerConnectionInfo = createConnectionInfo();
consumerSessionInfo = createSessionInfo(consumerConnectionInfo);
consumerConnection.send(consumerConnectionInfo);
consumerConnection.send(consumerSessionInfo);
}
@Override
protected void tearDown() throws Exception {
super.tearDown();
}
public static Test suite() {
return suite(DemandForwardingBridgeSupportTest.class);
}
public static void main(String[] args) {
junit.textui.TestRunner.run(suite());
}
}

16
pom.xml
View File

@ -72,17 +72,17 @@
<httpclient-version>4.5.3</httpclient-version>
<httpcore-version>4.4.6</httpcore-version>
<insight-version>1.2.0.Beta4</insight-version>
<jackson-version>2.9.4</jackson-version>
<jackson-version>2.9.6</jackson-version>
<jasypt-version>1.9.2</jasypt-version>
<jaxb-bundle-version>2.2.11_1</jaxb-bundle-version>
<jdom-version>1.0</jdom-version>
<jetty9-version>9.2.22.v20170606</jetty9-version>
<jetty-version>${jetty9-version}</jetty-version>
<jmdns-version>3.4.1</jmdns-version>
<tomcat-api-version>8.0.24</tomcat-api-version>
<tomcat-api-version>8.0.53</tomcat-api-version>
<jettison-version>1.3.8</jettison-version>
<jmock-version>2.5.1</jmock-version>
<jolokia-version>1.5.0</jolokia-version>
<jolokia-version>1.6.0</jolokia-version>
<josql-version>1.5_5</josql-version>
<!-- for json-simple use same version as jolokia uses -->
<json-simple-version>1.1.1</json-simple-version>
@ -103,11 +103,11 @@
<p2psockets-version>1.1.2</p2psockets-version>
<linkedin-zookeeper-version>1.4.0</linkedin-zookeeper-version>
<zookeeper-version>3.4.6</zookeeper-version>
<qpid-proton-version>0.27.1</qpid-proton-version>
<qpid-jms-version>0.33.0</qpid-jms-version>
<qpid-jms-netty-version>4.1.24.Final</qpid-jms-netty-version>
<qpid-jms-proton-version>0.27.1</qpid-jms-proton-version>
<netty-all-version>4.1.24.Final</netty-all-version>
<qpid-proton-version>0.28.0</qpid-proton-version>
<qpid-jms-version>0.35.0</qpid-jms-version>
<qpid-jms-netty-version>4.1.27.Final</qpid-jms-netty-version>
<qpid-jms-proton-version>0.28.0</qpid-jms-proton-version>
<netty-all-version>4.1.27.Final</netty-all-version>
<regexp-version>1.3</regexp-version>
<rome-version>1.0</rome-version>
<saxon-version>9.5.1-5</saxon-version>