ARTEMIS-829 Removing messages re-encoding

https://issues.apache.org/jira/browse/ARTEMIS-829
This commit is contained in:
Clebert Suconic 2016-10-24 18:20:20 -04:00
parent 4b5cbb86aa
commit e0021252ee
12 changed files with 145 additions and 152 deletions

View File

@ -284,8 +284,6 @@ public class ClientProducerImpl implements ClientProducerInternal {
theCredits.acquireCredits(creditSize); theCredits.acquireCredits(creditSize);
session.checkDefaultAddress(sendingAddress);
sessionContext.sendFullMessage(msgI, sendBlocking, handler, address); sessionContext.sendFullMessage(msgI, sendBlocking, handler, address);
} }

View File

@ -135,8 +135,6 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
private volatile boolean mayAttemptToFailover = true; private volatile boolean mayAttemptToFailover = true;
private volatile SimpleString defaultAddress;
/** /**
* Current XID. this will be used in case of failover * Current XID. this will be used in case of failover
*/ */
@ -957,7 +955,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
// want // want
// to recreate the session, we just want to unblock the blocking call // to recreate the session, we just want to unblock the blocking call
if (!inClose && mayAttemptToFailover) { if (!inClose && mayAttemptToFailover) {
sessionContext.recreateSession(username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, defaultAddress); sessionContext.recreateSession(username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge);
for (Map.Entry<ConsumerContext, ClientConsumerInternal> entryx : consumers.entrySet()) { for (Map.Entry<ConsumerContext, ClientConsumerInternal> entryx : consumers.entrySet()) {
@ -1036,27 +1034,9 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
@Override @Override
public void setAddress(final Message message, final SimpleString address) { public void setAddress(final Message message, final SimpleString address) {
if (defaultAddress == null) { logger.tracef("setAddress() Setting default address as %s", address);
logger.tracef("setAddress() Setting default address as %s", address);
message.setAddress(address); message.setAddress(address);
} else {
if (!address.equals(defaultAddress)) {
logger.tracef("setAddress() setting non default address %s on message", address);
message.setAddress(address);
} else {
logger.trace("setAddress() being set as null");
message.setAddress(null);
}
}
}
@Override
public void checkDefaultAddress(SimpleString address) {
if (defaultAddress == null) {
logger.tracef("checkDefaultAddress(%s)", address);
defaultAddress = address;
}
} }
@Override @Override

View File

@ -93,8 +93,6 @@ public interface ClientSessionInternal extends ClientSession {
*/ */
void setAddress(Message message, SimpleString address); void setAddress(Message message, SimpleString address);
void checkDefaultAddress(SimpleString address);
void setPacketSize(int packetSize); void setPacketSize(int packetSize);
void resetIfNeeded() throws ActiveMQException; void resetIfNeeded() throws ActiveMQException;

View File

@ -629,9 +629,8 @@ public class ActiveMQSessionContext extends SessionContext {
final boolean xa, final boolean xa,
final boolean autoCommitSends, final boolean autoCommitSends,
final boolean autoCommitAcks, final boolean autoCommitAcks,
final boolean preAcknowledge, final boolean preAcknowledge) throws ActiveMQException {
final SimpleString defaultAddress) throws ActiveMQException { Packet createRequest = newCreateSession(username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge);
Packet createRequest = newCreateSession(username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, defaultAddress);
boolean retry; boolean retry;
do { do {
try { try {
@ -662,9 +661,8 @@ public class ActiveMQSessionContext extends SessionContext {
boolean xa, boolean xa,
boolean autoCommitSends, boolean autoCommitSends,
boolean autoCommitAcks, boolean autoCommitAcks,
boolean preAcknowledge, boolean preAcknowledge) {
SimpleString defaultAddress) { return new CreateSessionMessage(name, sessionChannel.getID(), VersionLoader.getVersion().getIncrementingVersion(), username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, confirmationWindow, null);
return new CreateSessionMessage(name, sessionChannel.getID(), VersionLoader.getVersion().getIncrementingVersion(), username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, confirmationWindow, defaultAddress == null ? null : defaultAddress.toString());
} }
@Override @Override

View File

@ -250,8 +250,7 @@ public abstract class SessionContext {
final boolean xa, final boolean xa,
final boolean autoCommitSends, final boolean autoCommitSends,
final boolean autoCommitAcks, final boolean autoCommitAcks,
final boolean preAcknowledge, final boolean preAcknowledge) throws ActiveMQException;
final SimpleString defaultAddress) throws ActiveMQException;
public abstract void recreateConsumerOnServer(ClientConsumerInternal consumerInternal) throws ActiveMQException; public abstract void recreateConsumerOnServer(ClientConsumerInternal consumerInternal) throws ActiveMQException;

View File

@ -63,9 +63,8 @@ public class HornetQClientSessionContext extends ActiveMQSessionContext {
boolean xa, boolean xa,
boolean autoCommitSends, boolean autoCommitSends,
boolean autoCommitAcks, boolean autoCommitAcks,
boolean preAcknowledge, boolean preAcknowledge) {
SimpleString defaultAddress) { return new CreateSessionMessage(getName(), getSessionChannel().getID(), 123, username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, getConfirmationWindow(), null);
return new CreateSessionMessage(getName(), getSessionChannel().getID(), 123, username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, getConfirmationWindow(), defaultAddress == null ? null : defaultAddress.toString());
} }
@Override @Override

View File

@ -34,6 +34,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.jboss.byteman.contrib.bmunit.BMRule; import org.jboss.byteman.contrib.bmunit.BMRule;
import org.jboss.byteman.contrib.bmunit.BMRules; import org.jboss.byteman.contrib.bmunit.BMRules;
import org.jboss.byteman.contrib.bmunit.BMUnitRunner; import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
@ -92,10 +93,13 @@ public class PagingLeakTest extends ActiveMQTestBase {
positions.clear(); positions.clear();
timeout = System.currentTimeMillis() + 5000; Wait.waitFor(new Wait.Condition() {
while (pagePosInstances.get() != 0 && timeout > System.currentTimeMillis()) { @Override
forceGC(); public boolean isSatisfied() throws Exception {
} forceGC();
return pagePosInstances.get() == 0;
}
}, 5000, 100);
// This is just to validate the rules are correctly applied on byteman // This is just to validate the rules are correctly applied on byteman
assertEquals("You have changed something on PagePositionImpl in such way that these byteman rules are no longer working", 0, pagePosInstances.get()); assertEquals("You have changed something on PagePositionImpl in such way that these byteman rules are no longer working", 0, pagePosInstances.get());
@ -110,7 +114,7 @@ public class PagingLeakTest extends ActiveMQTestBase {
server.start(); server.start();
AddressSettings settings = new AddressSettings().setPageSizeBytes(2 * 1024).setMaxSizeBytes(20 * 1024).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE); AddressSettings settings = new AddressSettings().setPageSizeBytes(2 * 1024).setMaxSizeBytes(10 * 1024).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
server.getAddressSettingsRepository().addMatch("#", settings); server.getAddressSettingsRepository().addMatch("#", settings);

View File

@ -104,12 +104,12 @@ public class ProducerTest extends ActiveMQTestBase {
ClientProducer producer = session.createProducer(); ClientProducer producer = session.createProducer();
for (int i = 0; i < 62; i++) { for (int i = 0; i < 62; i++) {
if (i == 61) { if (i == 30) {
// the point where the send would block // the point where the send would block
latch.countDown(); latch.countDown();
} }
ClientMessage msg = session.createMessage(false); ClientMessage msg = session.createMessage(false);
msg.getBodyBuffer().writeBytes(new byte[1024]); msg.getBodyBuffer().writeBytes(new byte[2048]);
producer.send(QUEUE, msg); producer.send(QUEUE, msg);
} }
} catch (Exception e) { } catch (Exception e) {
@ -119,7 +119,7 @@ public class ProducerTest extends ActiveMQTestBase {
}; };
t.start(); t.start();
assertTrue(latch.await(5, TimeUnit.SECONDS)); assertTrue(latch.await(10, TimeUnit.SECONDS));
session.close(); session.close();
t.join(5000); t.join(5000);

View File

@ -93,7 +93,7 @@ public class BackupSyncJournalTest extends FailoverTestBase {
@Test @Test
public void testReserveFileIdValuesOnBackup() throws Exception { public void testReserveFileIdValuesOnBackup() throws Exception {
final int totalRounds = 50; final int totalRounds = 5;
createProducerSendSomeMessages(); createProducerSendSomeMessages();
JournalImpl messageJournal = getMessageJournalFromServer(liveServer); JournalImpl messageJournal = getMessageJournalFromServer(liveServer);
for (int i = 0; i < totalRounds; i++) { for (int i = 0; i < totalRounds; i++) {

View File

@ -730,7 +730,6 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
@Test @Test
public void testCompactAddAndUpdateFollowedByADelete() throws Exception { public void testCompactAddAndUpdateFollowedByADelete() throws Exception {
setup(2, 60 * 1024, false); setup(2, 60 * 1024, false);
SimpleIDGenerator idGen = new SimpleIDGenerator(1000); SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
@ -779,7 +778,6 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
createJournal(); createJournal();
startJournal(); startJournal();
loadAndCheck(); loadAndCheck();
} }
@Test @Test
@ -1610,8 +1608,9 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
} }
@Test @Test
public void testStressDeletesNoSync() throws Exception { public void testStressDeletesNoSync() throws Throwable {
Configuration config = createBasicConfig().setJournalFileSize(100 * 1024).setJournalSyncNonTransactional(false).setJournalSyncTransactional(false).setJournalCompactMinFiles(0).setJournalCompactPercentage(0); Configuration config = createBasicConfig().setJournalFileSize(100 * 1024).setJournalSyncNonTransactional(false).setJournalSyncTransactional(false).setJournalCompactMinFiles(0).setJournalCompactPercentage(0);
final AtomicInteger errors = new AtomicInteger(0); final AtomicInteger errors = new AtomicInteger(0);
@ -1629,114 +1628,129 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
final JournalStorageManager storage = new JournalStorageManager(config, factory); final JournalStorageManager storage = new JournalStorageManager(config, factory);
storage.start(); storage.start();
storage.loadInternalOnly();
((JournalImpl) storage.getMessageJournal()).setAutoReclaim(false); try {
final LinkedList<Long> survivingMsgs = new LinkedList<>(); storage.loadInternalOnly();
Runnable producerRunnable = new Runnable() { ((JournalImpl) storage.getMessageJournal()).setAutoReclaim(false);
@Override final LinkedList<Long> survivingMsgs = new LinkedList<>();
public void run() {
try {
while (running.get()) {
final long[] values = new long[100];
long tx = seqGenerator.incrementAndGet();
OperationContextImpl ctx = new OperationContextImpl(executor); Runnable producerRunnable = new Runnable() {
storage.setContext(ctx); @Override
public void run() {
try {
while (running.get()) {
final long[] values = new long[100];
long tx = seqGenerator.incrementAndGet();
for (int i = 0; i < 100; i++) { OperationContextImpl ctx = new OperationContextImpl(executor);
long id = seqGenerator.incrementAndGet(); storage.setContext(ctx);
values[i] = id;
ServerMessageImpl message = new ServerMessageImpl(id, 100); for (int i = 0; i < 100; i++) {
long id = seqGenerator.incrementAndGet();
values[i] = id;
message.getBodyBuffer().writeBytes(new byte[1024]); ServerMessageImpl message = new ServerMessageImpl(id, 100);
storage.storeMessageTransactional(tx, message); message.getBodyBuffer().writeBytes(new byte[1024]);
}
ServerMessageImpl message = new ServerMessageImpl(seqGenerator.incrementAndGet(), 100);
survivingMsgs.add(message.getMessageID()); storage.storeMessageTransactional(tx, message);
// This one will stay here forever
storage.storeMessage(message);
storage.commit(tx);
ctx.executeOnCompletion(new IOCallback() {
@Override
public void onError(int errorCode, String errorMessage) {
} }
ServerMessageImpl message = new ServerMessageImpl(seqGenerator.incrementAndGet(), 100);
@Override survivingMsgs.add(message.getMessageID());
public void done() {
deleteExecutor.execute(new Runnable() { // This one will stay here forever
@Override storage.storeMessage(message);
public void run() {
try { storage.commit(tx);
for (long messageID : values) {
storage.deleteMessage(messageID); ctx.executeOnCompletion(new IOCallback() {
@Override
public void onError(int errorCode, String errorMessage) {
}
@Override
public void done() {
deleteExecutor.execute(new Runnable() {
@Override
public void run() {
try {
for (long messageID : values) {
storage.deleteMessage(messageID);
}
} catch (Exception e) {
e.printStackTrace();
errors.incrementAndGet();
} }
} catch (Exception e) {
e.printStackTrace();
errors.incrementAndGet();
} }
});
}
});
} }
}); } catch (Throwable e) {
} e.printStackTrace();
}); errors.incrementAndGet();
} }
} catch (Throwable e) {
e.printStackTrace();
errors.incrementAndGet();
} }
} };
};
Runnable compressRunnable = new Runnable() { Runnable compressRunnable = new Runnable() {
@Override @Override
public void run() { public void run() {
try { try {
while (running.get()) { while (running.get()) {
Thread.sleep(500); Thread.sleep(500);
System.out.println("Compacting"); System.out.println("Compacting");
((JournalImpl) storage.getMessageJournal()).testCompact(); ((JournalImpl) storage.getMessageJournal()).testCompact();
((JournalImpl) storage.getMessageJournal()).checkReclaimStatus(); ((JournalImpl) storage.getMessageJournal()).checkReclaimStatus();
}
} catch (Throwable e) {
e.printStackTrace();
errors.incrementAndGet();
} }
} catch (Throwable e) {
e.printStackTrace();
errors.incrementAndGet();
} }
};
Thread producerThread = new Thread(producerRunnable);
producerThread.start();
Thread compactorThread = new Thread(compressRunnable);
compactorThread.start();
Thread.sleep(1000);
running.set(false);
producerThread.join();
compactorThread.join();
deleteExecutor.shutdown();
assertTrue("delete executor terminated", deleteExecutor.awaitTermination(30, TimeUnit.SECONDS));
executor.shutdown();
assertTrue("executor terminated", executor.awaitTermination(10, TimeUnit.SECONDS));
} catch (Throwable e) {
e.printStackTrace();
throw e;
} finally {
try {
storage.stop();
} catch (Exception e) {
e.printStackTrace();
} }
};
Thread producerThread = new Thread(producerRunnable); executor.shutdownNow();
producerThread.start(); deleteExecutor.shutdownNow();
}
Thread compactorThread = new Thread(compressRunnable);
compactorThread.start();
Thread.sleep(1000);
running.set(false);
producerThread.join();
compactorThread.join();
storage.stop();
executor.shutdown();
assertTrue("executor terminated", executor.awaitTermination(10, TimeUnit.SECONDS));
deleteExecutor.shutdown();
assertTrue("delete executor terminated", deleteExecutor.awaitTermination(30, TimeUnit.SECONDS));
} }
@Override @Override

View File

@ -144,18 +144,21 @@ public class ValidateTransactionHealthTest extends ActiveMQTestBase {
JournalImpl journal = ValidateTransactionHealthTest.createJournal(type, journalDir); JournalImpl journal = ValidateTransactionHealthTest.createJournal(type, journalDir);
journal.start(); journal.start();
Loader loadTest = new Loader(numberOfRecords); try {
journal.load(loadTest); Loader loadTest = new Loader(numberOfRecords);
Assert.assertEquals(numberOfRecords * numberOfThreads, loadTest.numberOfAdds); journal.load(loadTest);
Assert.assertEquals(0, loadTest.numberOfPreparedTransactions); Assert.assertEquals(numberOfRecords * numberOfThreads, loadTest.numberOfAdds);
Assert.assertEquals(0, loadTest.numberOfUpdates); Assert.assertEquals(0, loadTest.numberOfPreparedTransactions);
Assert.assertEquals(0, loadTest.numberOfDeletes); Assert.assertEquals(0, loadTest.numberOfUpdates);
Assert.assertEquals(0, loadTest.numberOfDeletes);
journal.stop(); if (loadTest.ex != null) {
throw loadTest.ex;
if (loadTest.ex != null) { }
throw loadTest.ex; } finally {
journal.stop();
} }
} }
// Inner classes ------------------------------------------------- // Inner classes -------------------------------------------------

View File

@ -3335,7 +3335,7 @@ public class PagingTest extends ActiveMQTestBase {
ClientMessage message = null; ClientMessage message = null;
for (int i = 0; i < numberOfMessages; i++) { for (int i = 0; i < numberOfMessages; i++) {
byte[] body = new byte[1024]; byte[] body = new byte[2048];
message = session.createMessage(true); message = session.createMessage(true);
message.getBodyBuffer().writeBytes(body); message.getBodyBuffer().writeBytes(body);
@ -3360,7 +3360,7 @@ public class PagingTest extends ActiveMQTestBase {
Assert.assertEquals(0, server.getPagingManager().getPageStore(PagingTest.ADDRESS).getAddressSize()); Assert.assertEquals(0, server.getPagingManager().getPageStore(PagingTest.ADDRESS).getAddressSize());
for (int i = 0; i < numberOfMessages; i++) { for (int i = 0; i < numberOfMessages; i++) {
byte[] body = new byte[1024]; byte[] body = new byte[2048];
message = session.createMessage(true); message = session.createMessage(true);
message.getBodyBuffer().writeBytes(body); message.getBodyBuffer().writeBytes(body);
@ -3385,7 +3385,7 @@ public class PagingTest extends ActiveMQTestBase {
producer = session.createProducer(PagingTest.ADDRESS); producer = session.createProducer(PagingTest.ADDRESS);
for (int i = 0; i < numberOfMessages; i++) { for (int i = 0; i < numberOfMessages; i++) {
byte[] body = new byte[1024]; byte[] body = new byte[2048];
message = session.createMessage(true); message = session.createMessage(true);
message.getBodyBuffer().writeBytes(body); message.getBodyBuffer().writeBytes(body);
@ -3841,7 +3841,7 @@ public class PagingTest extends ActiveMQTestBase {
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false).setJournalFileSize(10 * 1024 * 1024); Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false).setJournalFileSize(10 * 1024 * 1024);
server = createServer(true, config, 512 * 1024, 1024 * 1024); server = createServer(true, config, 100 * 1024, 1024 * 1024 / 2);
server.start(); server.start();
@ -4745,7 +4745,7 @@ public class PagingTest extends ActiveMQTestBase {
ClientMessage message = session.createMessage(true); ClientMessage message = session.createMessage(true);
int biggerMessageSize = 1024; int biggerMessageSize = 2048;
byte[] body = new byte[biggerMessageSize]; byte[] body = new byte[biggerMessageSize];
ByteBuffer bb = ByteBuffer.wrap(body); ByteBuffer bb = ByteBuffer.wrap(body);
for (int j = 1; j <= biggerMessageSize; j++) { for (int j = 1; j <= biggerMessageSize; j++) {
@ -4817,7 +4817,7 @@ public class PagingTest extends ActiveMQTestBase {
ClientMessage message = session.createMessage(true); ClientMessage message = session.createMessage(true);
int biggerMessageSize = 1024; int biggerMessageSize = 2048;
byte[] body = new byte[biggerMessageSize]; byte[] body = new byte[biggerMessageSize];
ByteBuffer bb = ByteBuffer.wrap(body); ByteBuffer bb = ByteBuffer.wrap(body);
for (int j = 1; j <= biggerMessageSize; j++) { for (int j = 1; j <= biggerMessageSize; j++) {