diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java index fddd4de5e8..1dfbe72971 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java @@ -284,8 +284,6 @@ public class ClientProducerImpl implements ClientProducerInternal { theCredits.acquireCredits(creditSize); - session.checkDefaultAddress(sendingAddress); - sessionContext.sendFullMessage(msgI, sendBlocking, handler, address); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java index de45066af3..fd6355a822 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java @@ -135,8 +135,6 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi private volatile boolean mayAttemptToFailover = true; - private volatile SimpleString defaultAddress; - /** * Current XID. this will be used in case of failover */ @@ -957,7 +955,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi // want // to recreate the session, we just want to unblock the blocking call if (!inClose && mayAttemptToFailover) { - sessionContext.recreateSession(username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, defaultAddress); + sessionContext.recreateSession(username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge); for (Map.Entry entryx : consumers.entrySet()) { @@ -1036,27 +1034,9 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi @Override public void setAddress(final Message message, final SimpleString address) { - if (defaultAddress == null) { - logger.tracef("setAddress() Setting default address as %s", address); + logger.tracef("setAddress() Setting default address as %s", address); - message.setAddress(address); - } else { - if (!address.equals(defaultAddress)) { - logger.tracef("setAddress() setting non default address %s on message", address); - message.setAddress(address); - } else { - logger.trace("setAddress() being set as null"); - message.setAddress(null); - } - } - } - - @Override - public void checkDefaultAddress(SimpleString address) { - if (defaultAddress == null) { - logger.tracef("checkDefaultAddress(%s)", address); - defaultAddress = address; - } + message.setAddress(address); } @Override diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java index ed636bd917..4e06068818 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java @@ -93,8 +93,6 @@ public interface ClientSessionInternal extends ClientSession { */ void setAddress(Message message, SimpleString address); - void checkDefaultAddress(SimpleString address); - void setPacketSize(int packetSize); void resetIfNeeded() throws ActiveMQException; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java index c72e19bff6..56c7135a32 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java @@ -629,9 +629,8 @@ public class ActiveMQSessionContext extends SessionContext { final boolean xa, final boolean autoCommitSends, final boolean autoCommitAcks, - final boolean preAcknowledge, - final SimpleString defaultAddress) throws ActiveMQException { - Packet createRequest = newCreateSession(username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, defaultAddress); + final boolean preAcknowledge) throws ActiveMQException { + Packet createRequest = newCreateSession(username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge); boolean retry; do { try { @@ -662,9 +661,8 @@ public class ActiveMQSessionContext extends SessionContext { boolean xa, boolean autoCommitSends, boolean autoCommitAcks, - boolean preAcknowledge, - SimpleString defaultAddress) { - return new CreateSessionMessage(name, sessionChannel.getID(), VersionLoader.getVersion().getIncrementingVersion(), username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, confirmationWindow, defaultAddress == null ? null : defaultAddress.toString()); + boolean preAcknowledge) { + return new CreateSessionMessage(name, sessionChannel.getID(), VersionLoader.getVersion().getIncrementingVersion(), username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, confirmationWindow, null); } @Override diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java index 175360cd0d..1f15cc6893 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java @@ -250,8 +250,7 @@ public abstract class SessionContext { final boolean xa, final boolean autoCommitSends, final boolean autoCommitAcks, - final boolean preAcknowledge, - final SimpleString defaultAddress) throws ActiveMQException; + final boolean preAcknowledge) throws ActiveMQException; public abstract void recreateConsumerOnServer(ClientConsumerInternal consumerInternal) throws ActiveMQException; diff --git a/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java index d9322744ff..caa94a10f6 100644 --- a/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java +++ b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java @@ -63,9 +63,8 @@ public class HornetQClientSessionContext extends ActiveMQSessionContext { boolean xa, boolean autoCommitSends, boolean autoCommitAcks, - boolean preAcknowledge, - SimpleString defaultAddress) { - return new CreateSessionMessage(getName(), getSessionChannel().getID(), 123, username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, getConfirmationWindow(), defaultAddress == null ? null : defaultAddress.toString()); + boolean preAcknowledge) { + return new CreateSessionMessage(getName(), getSessionChannel().getID(), 123, username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, getConfirmationWindow(), null); } @Override diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/PagingLeakTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/PagingLeakTest.java index f9744d22ab..4ffd2bde75 100644 --- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/PagingLeakTest.java +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/PagingLeakTest.java @@ -34,6 +34,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServers; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.Wait; import org.jboss.byteman.contrib.bmunit.BMRule; import org.jboss.byteman.contrib.bmunit.BMRules; import org.jboss.byteman.contrib.bmunit.BMUnitRunner; @@ -92,10 +93,13 @@ public class PagingLeakTest extends ActiveMQTestBase { positions.clear(); - timeout = System.currentTimeMillis() + 5000; - while (pagePosInstances.get() != 0 && timeout > System.currentTimeMillis()) { - forceGC(); - } + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisfied() throws Exception { + forceGC(); + return pagePosInstances.get() == 0; + } + }, 5000, 100); // This is just to validate the rules are correctly applied on byteman assertEquals("You have changed something on PagePositionImpl in such way that these byteman rules are no longer working", 0, pagePosInstances.get()); @@ -110,7 +114,7 @@ public class PagingLeakTest extends ActiveMQTestBase { server.start(); - AddressSettings settings = new AddressSettings().setPageSizeBytes(2 * 1024).setMaxSizeBytes(20 * 1024).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE); + AddressSettings settings = new AddressSettings().setPageSizeBytes(2 * 1024).setMaxSizeBytes(10 * 1024).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE); server.getAddressSettingsRepository().addMatch("#", settings); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ProducerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ProducerTest.java index c409d5fb3b..d7af4b8c9c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ProducerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ProducerTest.java @@ -104,12 +104,12 @@ public class ProducerTest extends ActiveMQTestBase { ClientProducer producer = session.createProducer(); for (int i = 0; i < 62; i++) { - if (i == 61) { + if (i == 30) { // the point where the send would block latch.countDown(); } ClientMessage msg = session.createMessage(false); - msg.getBodyBuffer().writeBytes(new byte[1024]); + msg.getBodyBuffer().writeBytes(new byte[2048]); producer.send(QUEUE, msg); } } catch (Exception e) { @@ -119,7 +119,7 @@ public class ProducerTest extends ActiveMQTestBase { }; t.start(); - assertTrue(latch.await(5, TimeUnit.SECONDS)); + assertTrue(latch.await(10, TimeUnit.SECONDS)); session.close(); t.join(5000); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java index 20ddae37c6..b51ff8a33b 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java @@ -93,7 +93,7 @@ public class BackupSyncJournalTest extends FailoverTestBase { @Test public void testReserveFileIdValuesOnBackup() throws Exception { - final int totalRounds = 50; + final int totalRounds = 5; createProducerSendSomeMessages(); JournalImpl messageJournal = getMessageJournalFromServer(liveServer); for (int i = 0; i < totalRounds; i++) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java index ee1ac1118f..2dd38aeda0 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java @@ -730,7 +730,6 @@ public class NIOJournalCompactTest extends JournalImplTestBase { @Test public void testCompactAddAndUpdateFollowedByADelete() throws Exception { - setup(2, 60 * 1024, false); SimpleIDGenerator idGen = new SimpleIDGenerator(1000); @@ -779,7 +778,6 @@ public class NIOJournalCompactTest extends JournalImplTestBase { createJournal(); startJournal(); loadAndCheck(); - } @Test @@ -1610,8 +1608,9 @@ public class NIOJournalCompactTest extends JournalImplTestBase { } + @Test - public void testStressDeletesNoSync() throws Exception { + public void testStressDeletesNoSync() throws Throwable { Configuration config = createBasicConfig().setJournalFileSize(100 * 1024).setJournalSyncNonTransactional(false).setJournalSyncTransactional(false).setJournalCompactMinFiles(0).setJournalCompactPercentage(0); final AtomicInteger errors = new AtomicInteger(0); @@ -1629,114 +1628,129 @@ public class NIOJournalCompactTest extends JournalImplTestBase { final JournalStorageManager storage = new JournalStorageManager(config, factory); storage.start(); - storage.loadInternalOnly(); - ((JournalImpl) storage.getMessageJournal()).setAutoReclaim(false); - final LinkedList survivingMsgs = new LinkedList<>(); + try { + storage.loadInternalOnly(); - Runnable producerRunnable = new Runnable() { - @Override - public void run() { - try { - while (running.get()) { - final long[] values = new long[100]; - long tx = seqGenerator.incrementAndGet(); + ((JournalImpl) storage.getMessageJournal()).setAutoReclaim(false); + final LinkedList survivingMsgs = new LinkedList<>(); - OperationContextImpl ctx = new OperationContextImpl(executor); - storage.setContext(ctx); + Runnable producerRunnable = new Runnable() { + @Override + public void run() { + try { + while (running.get()) { + final long[] values = new long[100]; + long tx = seqGenerator.incrementAndGet(); - for (int i = 0; i < 100; i++) { - long id = seqGenerator.incrementAndGet(); - values[i] = id; + OperationContextImpl ctx = new OperationContextImpl(executor); + storage.setContext(ctx); - ServerMessageImpl message = new ServerMessageImpl(id, 100); + for (int i = 0; i < 100; i++) { + long id = seqGenerator.incrementAndGet(); + values[i] = id; - message.getBodyBuffer().writeBytes(new byte[1024]); + ServerMessageImpl message = new ServerMessageImpl(id, 100); - storage.storeMessageTransactional(tx, message); - } - ServerMessageImpl message = new ServerMessageImpl(seqGenerator.incrementAndGet(), 100); + message.getBodyBuffer().writeBytes(new byte[1024]); - survivingMsgs.add(message.getMessageID()); - - // This one will stay here forever - storage.storeMessage(message); - - storage.commit(tx); - - ctx.executeOnCompletion(new IOCallback() { - @Override - public void onError(int errorCode, String errorMessage) { + storage.storeMessageTransactional(tx, message); } + ServerMessageImpl message = new ServerMessageImpl(seqGenerator.incrementAndGet(), 100); - @Override - public void done() { - deleteExecutor.execute(new Runnable() { - @Override - public void run() { - try { - for (long messageID : values) { - storage.deleteMessage(messageID); + survivingMsgs.add(message.getMessageID()); + + // This one will stay here forever + storage.storeMessage(message); + + storage.commit(tx); + + ctx.executeOnCompletion(new IOCallback() { + @Override + public void onError(int errorCode, String errorMessage) { + } + + @Override + public void done() { + deleteExecutor.execute(new Runnable() { + @Override + public void run() { + try { + for (long messageID : values) { + storage.deleteMessage(messageID); + } + } catch (Exception e) { + e.printStackTrace(); + errors.incrementAndGet(); } - } catch (Exception e) { - e.printStackTrace(); - errors.incrementAndGet(); + } + }); + } + }); - } - }); - } - }); - + } + } catch (Throwable e) { + e.printStackTrace(); + errors.incrementAndGet(); } - } catch (Throwable e) { - e.printStackTrace(); - errors.incrementAndGet(); } - } - }; + }; - Runnable compressRunnable = new Runnable() { - @Override - public void run() { - try { - while (running.get()) { - Thread.sleep(500); - System.out.println("Compacting"); - ((JournalImpl) storage.getMessageJournal()).testCompact(); - ((JournalImpl) storage.getMessageJournal()).checkReclaimStatus(); + Runnable compressRunnable = new Runnable() { + @Override + public void run() { + try { + while (running.get()) { + Thread.sleep(500); + System.out.println("Compacting"); + ((JournalImpl) storage.getMessageJournal()).testCompact(); + ((JournalImpl) storage.getMessageJournal()).checkReclaimStatus(); + } + } catch (Throwable e) { + e.printStackTrace(); + errors.incrementAndGet(); } - } catch (Throwable e) { - e.printStackTrace(); - errors.incrementAndGet(); + } + }; + Thread producerThread = new Thread(producerRunnable); + producerThread.start(); + + Thread compactorThread = new Thread(compressRunnable); + compactorThread.start(); + + Thread.sleep(1000); + + running.set(false); + + producerThread.join(); + + compactorThread.join(); + + deleteExecutor.shutdown(); + + assertTrue("delete executor terminated", deleteExecutor.awaitTermination(30, TimeUnit.SECONDS)); + + executor.shutdown(); + + assertTrue("executor terminated", executor.awaitTermination(10, TimeUnit.SECONDS)); + + } catch (Throwable e) { + e.printStackTrace(); + throw e; + } finally { + try { + storage.stop(); + } catch (Exception e) { + e.printStackTrace(); } - }; - Thread producerThread = new Thread(producerRunnable); - producerThread.start(); + executor.shutdownNow(); + deleteExecutor.shutdownNow(); + } - Thread compactorThread = new Thread(compressRunnable); - compactorThread.start(); - - Thread.sleep(1000); - - running.set(false); - - producerThread.join(); - - compactorThread.join(); - - storage.stop(); - - executor.shutdown(); - - assertTrue("executor terminated", executor.awaitTermination(10, TimeUnit.SECONDS)); - - deleteExecutor.shutdown(); - - assertTrue("delete executor terminated", deleteExecutor.awaitTermination(30, TimeUnit.SECONDS)); } @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ValidateTransactionHealthTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ValidateTransactionHealthTest.java index 1972863b12..2d3df3e616 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ValidateTransactionHealthTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ValidateTransactionHealthTest.java @@ -144,18 +144,21 @@ public class ValidateTransactionHealthTest extends ActiveMQTestBase { JournalImpl journal = ValidateTransactionHealthTest.createJournal(type, journalDir); journal.start(); - Loader loadTest = new Loader(numberOfRecords); - journal.load(loadTest); - Assert.assertEquals(numberOfRecords * numberOfThreads, loadTest.numberOfAdds); - Assert.assertEquals(0, loadTest.numberOfPreparedTransactions); - Assert.assertEquals(0, loadTest.numberOfUpdates); - Assert.assertEquals(0, loadTest.numberOfDeletes); + try { + Loader loadTest = new Loader(numberOfRecords); + journal.load(loadTest); + Assert.assertEquals(numberOfRecords * numberOfThreads, loadTest.numberOfAdds); + Assert.assertEquals(0, loadTest.numberOfPreparedTransactions); + Assert.assertEquals(0, loadTest.numberOfUpdates); + Assert.assertEquals(0, loadTest.numberOfDeletes); - journal.stop(); - - if (loadTest.ex != null) { - throw loadTest.ex; + if (loadTest.ex != null) { + throw loadTest.ex; + } + } finally { + journal.stop(); } + } // Inner classes ------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java index 6f0bdc1a00..00c0bdf2bd 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java @@ -3335,7 +3335,7 @@ public class PagingTest extends ActiveMQTestBase { ClientMessage message = null; for (int i = 0; i < numberOfMessages; i++) { - byte[] body = new byte[1024]; + byte[] body = new byte[2048]; message = session.createMessage(true); message.getBodyBuffer().writeBytes(body); @@ -3360,7 +3360,7 @@ public class PagingTest extends ActiveMQTestBase { Assert.assertEquals(0, server.getPagingManager().getPageStore(PagingTest.ADDRESS).getAddressSize()); for (int i = 0; i < numberOfMessages; i++) { - byte[] body = new byte[1024]; + byte[] body = new byte[2048]; message = session.createMessage(true); message.getBodyBuffer().writeBytes(body); @@ -3385,7 +3385,7 @@ public class PagingTest extends ActiveMQTestBase { producer = session.createProducer(PagingTest.ADDRESS); for (int i = 0; i < numberOfMessages; i++) { - byte[] body = new byte[1024]; + byte[] body = new byte[2048]; message = session.createMessage(true); message.getBodyBuffer().writeBytes(body); @@ -3841,7 +3841,7 @@ public class PagingTest extends ActiveMQTestBase { Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false).setJournalFileSize(10 * 1024 * 1024); - server = createServer(true, config, 512 * 1024, 1024 * 1024); + server = createServer(true, config, 100 * 1024, 1024 * 1024 / 2); server.start(); @@ -4745,7 +4745,7 @@ public class PagingTest extends ActiveMQTestBase { ClientMessage message = session.createMessage(true); - int biggerMessageSize = 1024; + int biggerMessageSize = 2048; byte[] body = new byte[biggerMessageSize]; ByteBuffer bb = ByteBuffer.wrap(body); for (int j = 1; j <= biggerMessageSize; j++) { @@ -4817,7 +4817,7 @@ public class PagingTest extends ActiveMQTestBase { ClientMessage message = session.createMessage(true); - int biggerMessageSize = 1024; + int biggerMessageSize = 2048; byte[] body = new byte[biggerMessageSize]; ByteBuffer bb = ByteBuffer.wrap(body); for (int j = 1; j <= biggerMessageSize; j++) {