This closes #291

This commit is contained in:
Clebert Suconic 2016-01-05 09:14:57 -05:00
commit 86934c91e6
51 changed files with 821 additions and 574 deletions

8
.gitignore vendored
View File

@ -11,10 +11,10 @@ ratReport.txt
.factorypath .factorypath
# for native build # for native build
**/CMakeCache.txt CMakeCache.txt
**/CMakeFiles/ CMakeFiles/
**/Makefile Makefile
**/cmake_install.cmake cmake_install.cmake
# this file is generated # this file is generated
artemis-native/src/main/c/org_apache_activemq_artemis_jlibaio_LibaioContext.h artemis-native/src/main/c/org_apache_activemq_artemis_jlibaio_LibaioContext.h

View File

@ -19,9 +19,12 @@ package org.apache.activemq.artemis.utils;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.buffer.UnpooledByteBufAllocator;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
public class ByteUtil { public class ByteUtil {
public static final String NON_ASCII_STRING = "@@@@@";
private static final char[] hexArray = "0123456789ABCDEF".toCharArray(); private static final char[] hexArray = "0123456789ABCDEF".toCharArray();
public static String maxString(String value, int size) { public static String maxString(String value, int size) {
@ -34,22 +37,30 @@ public class ByteUtil {
} }
public static String bytesToHex(byte[] bytes, int groupSize) { public static String bytesToHex(byte[] bytes, int groupSize) {
if (bytes == null) { char[] hexChars = new char[bytes.length * 2 + numberOfGroups(bytes, groupSize)];
return "null"; int outPos = 0;
} for (int j = 0; j < bytes.length; j++) {
else { if (j > 0 && j % groupSize == 0) {
char[] hexChars = new char[bytes.length * 2 + numberOfGroups(bytes, groupSize)]; hexChars[outPos++] = ' ';
int outPos = 0;
for (int j = 0; j < bytes.length; j++) {
if (j > 0 && j % groupSize == 0) {
hexChars[outPos++] = ' ';
}
int v = bytes[j] & 0xFF;
hexChars[outPos++] = hexArray[v >>> 4];
hexChars[outPos++] = hexArray[v & 0x0F];
} }
return new String(hexChars); int v = bytes[j] & 0xFF;
hexChars[outPos++] = hexArray[v >>> 4];
hexChars[outPos++] = hexArray[v & 0x0F];
} }
return new String(hexChars);
}
public static String toSimpleString(byte[] bytes) {
SimpleString simpleString = new SimpleString(bytes);
String value = simpleString.toString();
for (char c : value.toCharArray()) {
if (c < ' ' || c > 127) {
return NON_ASCII_STRING;
}
}
return value;
} }
private static int numberOfGroups(byte[] bytes, int groupSize) { private static int numberOfGroups(byte[] bytes, int groupSize) {

View File

@ -30,6 +30,14 @@ public class ByteUtilTest {
testEquals("000102 03", ByteUtil.bytesToHex(byteArray, 3)); testEquals("000102 03", ByteUtil.bytesToHex(byteArray, 3));
} }
@Test
public void testNonASCII() {
Assert.assertEquals("aA", ByteUtil.toSimpleString(new byte[]{97, 0, 65, 0}));
Assert.assertEquals(ByteUtil.NON_ASCII_STRING, ByteUtil.toSimpleString(new byte[]{0, 97, 0, 65}));
System.out.println(ByteUtil.toSimpleString(new byte[]{0, 97, 0, 65}));
}
@Test @Test
public void testMaxString() { public void testMaxString() {
byte[] byteArray = new byte[20 * 1024]; byte[] byteArray = new byte[20 * 1024];

View File

@ -536,6 +536,10 @@ public final class ChannelImpl implements Channel {
if (resendCache != null && packet.isRequiresConfirmations()) { if (resendCache != null && packet.isRequiresConfirmations()) {
lastConfirmedCommandID.incrementAndGet(); lastConfirmedCommandID.incrementAndGet();
if (isTrace) {
ActiveMQClientLogger.LOGGER.trace("ChannelImpl::confirming packet " + packet + " last commandID=" + lastConfirmedCommandID);
}
receivedBytes += packet.getPacketSize(); receivedBytes += packet.getPacketSize();
if (receivedBytes >= confWindowSize) { if (receivedBytes >= confWindowSize) {

View File

@ -56,6 +56,21 @@
<exclude>artemis_doap.rdf</exclude> <exclude>artemis_doap.rdf</exclude>
<exclude>artemis-native/bin/</exclude> <exclude>artemis-native/bin/</exclude>
<!-- Files generated from automake -->
<exclude>CMakeCache.txt</exclude>
<exclude>CMakeFiles/</exclude>
<exclude>Makefile</exclude>
<exclude>artemis-native/CMakeCache.txt</exclude>
<exclude>artemis-native/CMakeFiles/</exclude>
<exclude>artemis-native/Makefile</exclude>
<exclude>artemis-native/cmake_install.cmake</exclude>
<exclude>artemis-native/src/main/c/CMakeFiles/</exclude>
<exclude>artemis-native/src/main/c/Makefile</exclude>
<exclude>artemis-native/src/main/c/cmake_install.cmake</exclude>
<exclude>artemis-native/src/main/c/org_apache_activemq_artemis_jlibaio_LibaioContext.h</exclude>
<exclude>cmake_install.cmake</exclude>
<!-- build output --> <!-- build output -->
<exclude> <exclude>
%regex[(?!((?!${project.build.directory}/)[^/]+/)*src/).*${project.build.directory}.*] %regex[(?!((?!${project.build.directory}/)[^/]+/)*src/).*${project.build.directory}.*]

View File

@ -21,5 +21,5 @@ package org.apache.activemq.artemis.core.io;
*/ */
public interface IOCriticalErrorListener { public interface IOCriticalErrorListener {
void onIOException(Exception code, String message, SequentialFile file); void onIOException(Throwable code, String message, SequentialFile file);
} }

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.openwire.amq;
import java.util.List; import java.util.List;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.postoffice.QueueBinding;
@ -125,24 +126,30 @@ public class AMQServerConsumer extends ServerConsumerImpl {
} }
public void amqPutBackToDeliveringList(final List<MessageReference> refs) { public void amqPutBackToDeliveringList(final List<MessageReference> refs) {
synchronized (this.deliveringRefs) { try {
for (MessageReference ref : refs) { synchronized (this.deliveringRefs) {
ref.incrementDeliveryCount(); for (MessageReference ref : refs) {
deliveringRefs.add(ref); ref.incrementDeliveryCount();
} deliveringRefs.add(ref);
//adjust the order. Suppose deliveringRefs has 2 existing }
//refs m1, m2, and refs has 3 m3, m4, m5 //adjust the order. Suppose deliveringRefs has 2 existing
//new order must be m3, m4, m5, m1, m2 //refs m1, m2, and refs has 3 m3, m4, m5
if (refs.size() > 0) { //new order must be m3, m4, m5, m1, m2
long first = refs.get(0).getMessage().getMessageID(); if (refs.size() > 0) {
MessageReference m = deliveringRefs.peek(); long first = refs.get(0).getMessage().getMessageID();
while (m.getMessage().getMessageID() != first) { MessageReference m = deliveringRefs.peek();
deliveringRefs.poll(); while (m.getMessage().getMessageID() != first) {
deliveringRefs.add(m); deliveringRefs.poll();
m = deliveringRefs.peek(); deliveringRefs.add(m);
m = deliveringRefs.peek();
}
} }
} }
} }
catch (ActiveMQException e) {
// TODO: what to do here?
throw new IllegalStateException(e.getMessage(), e);
}
} }
public void moveToDeadLetterAddress(long mid, Throwable cause) throws Exception { public void moveToDeadLetterAddress(long mid, Throwable cause) throws Exception {

View File

@ -339,7 +339,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
* @param refs * @param refs
* @return * @return
*/ */
private Map<String, Object>[] convertMessagesToMaps(List<MessageReference> refs) { private Map<String, Object>[] convertMessagesToMaps(List<MessageReference> refs) throws ActiveMQException {
Map<String, Object>[] messages = new Map[refs.size()]; Map<String, Object>[] messages = new Map[refs.size()];
int i = 0; int i = 0;
for (MessageReference ref : refs) { for (MessageReference ref : refs) {
@ -350,7 +350,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
} }
@Override @Override
public Map<String, Map<String, Object>[]> listDeliveringMessages() { public Map<String, Map<String, Object>[]> listDeliveringMessages() throws ActiveMQException {
checkStarted(); checkStarted();
clearIO(); clearIO();

View File

@ -47,6 +47,8 @@ public interface PagingStore extends ActiveMQComponent {
int getNumberOfPages(); int getNumberOfPages();
void criticalException(Throwable e);
/** /**
* Returns the page id of the current page in which the system is writing files. * Returns the page id of the current page in which the system is writing files.
*/ */

View File

@ -38,4 +38,6 @@ public interface PagingStoreFactory {
SequentialFileFactory newFileFactory(SimpleString address) throws Exception; SequentialFileFactory newFileFactory(SimpleString address) throws Exception;
void criticalException(Throwable e);
} }

View File

@ -41,17 +41,6 @@ public interface PageCache extends SoftValueHashMap.ValueCache {
*/ */
PagedMessage getMessage(int messageNumber); PagedMessage getMessage(int messageNumber);
/**
* When the cache is being created,
* We need to first read the files before other threads can get messages from this.
*/
void lock();
/**
* You have to call this method within the same thread you called lock
*/
void unlock();
void close(); void close();
} }

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.activemq.artemis.core.paging.cursor; package org.apache.activemq.artemis.core.paging.cursor;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.PagedMessage;
@ -24,7 +25,7 @@ import org.apache.activemq.artemis.core.paging.PagedMessage;
*/ */
public interface PageCursorProvider { public interface PageCursorProvider {
PageCache getPageCache(long pageNr); PageCache getPageCache(long pageNr) throws ActiveMQException;
PagedReference newReference(final PagePosition pos, final PagedMessage msg, PageSubscription sub); PagedReference newReference(final PagePosition pos, final PagedMessage msg, PageSubscription sub);
@ -38,7 +39,7 @@ public interface PageCursorProvider {
PageSubscription createSubscription(long queueId, Filter filter, boolean durable); PageSubscription createSubscription(long queueId, Filter filter, boolean durable);
PagedMessage getMessage(PagePosition pos); PagedMessage getMessage(PagePosition pos) throws ActiveMQException;
void processReload() throws Exception; void processReload() throws Exception;

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.paging.cursor;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.impl.Page; import org.apache.activemq.artemis.core.paging.impl.Page;
@ -95,7 +96,7 @@ public interface PageSubscription {
void reloadPageCompletion(PagePosition position); void reloadPageCompletion(PagePosition position);
void reloadPageInfo(long pageNr); void reloadPageInfo(long pageNr) throws ActiveMQException;
/** /**
* To be called when the cursor decided to ignore a position. * To be called when the cursor decided to ignore a position.
@ -147,7 +148,7 @@ public interface PageSubscription {
* @param pos * @param pos
* @return * @return
*/ */
PagedMessage queryMessage(PagePosition pos); PagedMessage queryMessage(PagePosition pos) throws ActiveMQException;
/** /**
* @return executor used by the PageSubscription * @return executor used by the PageSubscription

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.activemq.artemis.core.paging.cursor; package org.apache.activemq.artemis.core.paging.cursor;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.MessageReference;
@ -23,5 +24,5 @@ public interface PagedReference extends MessageReference {
PagePosition getPosition(); PagePosition getPosition();
PagedMessage getPagedMessage(); PagedMessage getPagedMessage() throws ActiveMQException;
} }

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.paging.cursor;
import java.lang.ref.WeakReference; import java.lang.ref.WeakReference;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
@ -49,12 +50,12 @@ public class PagedReferenceImpl implements PagedReference {
private boolean alreadyAcked; private boolean alreadyAcked;
@Override @Override
public ServerMessage getMessage() { public ServerMessage getMessage() throws ActiveMQException {
return getPagedMessage().getMessage(); return getPagedMessage().getMessage();
} }
@Override @Override
public synchronized PagedMessage getPagedMessage() { public synchronized PagedMessage getPagedMessage() throws ActiveMQException {
PagedMessage returnMessage = message != null ? message.get() : null; PagedMessage returnMessage = message != null ? message.get() : null;
// We only keep a few references on the Queue from paging... // We only keep a few references on the Queue from paging...
@ -107,25 +108,42 @@ public class PagedReferenceImpl implements PagedReference {
@Override @Override
public int getMessageMemoryEstimate() { public int getMessageMemoryEstimate() {
if (messageEstimate < 0) { if (messageEstimate < 0) {
messageEstimate = getMessage().getMemoryEstimate(); try {
messageEstimate = getMessage().getMemoryEstimate();
}
catch (ActiveMQException e) {
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
}
} }
return messageEstimate; return messageEstimate;
} }
@Override @Override
public MessageReference copy(final Queue queue) { public MessageReference copy(final Queue queue) {
return new PagedReferenceImpl(this.position, this.getPagedMessage(), this.subscription); try {
return new PagedReferenceImpl(this.position, this.getPagedMessage(), this.subscription);
}
catch (ActiveMQException e) {
ActiveMQServerLogger.LOGGER.warn(e);
return this;
}
} }
@Override @Override
public long getScheduledDeliveryTime() { public long getScheduledDeliveryTime() {
if (deliveryTime == null) { if (deliveryTime == null) {
ServerMessage msg = getMessage(); try {
if (msg.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME)) { ServerMessage msg = getMessage();
deliveryTime = getMessage().getLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME); if (msg.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME)) {
deliveryTime = getMessage().getLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
}
else {
deliveryTime = 0L;
}
} }
else { catch (ActiveMQException e) {
deliveryTime = 0L; ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
return 0L;
} }
} }
return deliveryTime; return deliveryTime;

View File

@ -67,16 +67,6 @@ public class LivePageCacheImpl implements LivePageCache {
} }
} }
@Override
public void lock() {
// nothing to be done on live cache
}
@Override
public void unlock() {
// nothing to be done on live cache
}
@Override @Override
public synchronized boolean isLive() { public synchronized boolean isLive() {
return isLive; return isLive;

View File

@ -16,9 +16,6 @@
*/ */
package org.apache.activemq.artemis.core.paging.cursor.impl; package org.apache.activemq.artemis.core.paging.cursor.impl;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.cursor.PageCache; import org.apache.activemq.artemis.core.paging.cursor.PageCache;
import org.apache.activemq.artemis.core.paging.impl.Page; import org.apache.activemq.artemis.core.paging.impl.Page;
@ -32,8 +29,6 @@ class PageCacheImpl implements PageCache {
// Attributes ---------------------------------------------------- // Attributes ----------------------------------------------------
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private PagedMessage[] messages; private PagedMessage[] messages;
private final Page page; private final Page page;
@ -50,17 +45,11 @@ class PageCacheImpl implements PageCache {
@Override @Override
public PagedMessage getMessage(final int messageNumber) { public PagedMessage getMessage(final int messageNumber) {
lock.readLock().lock(); if (messageNumber < messages.length) {
try { return messages[messageNumber];
if (messageNumber < messages.length) {
return messages[messageNumber];
}
else {
return null;
}
} }
finally { else {
lock.readLock().unlock(); return null;
} }
} }
@ -69,16 +58,6 @@ class PageCacheImpl implements PageCache {
return page.getPageId(); return page.getPageId();
} }
@Override
public void lock() {
lock.writeLock().lock();
}
@Override
public void unlock() {
lock.writeLock().unlock();
}
@Override @Override
public void setMessages(final PagedMessage[] messages) { public void setMessages(final PagedMessage[] messages) {
this.messages = messages; this.messages = messages;
@ -86,13 +65,7 @@ class PageCacheImpl implements PageCache {
@Override @Override
public int getNumberOfMessages() { public int getNumberOfMessages() {
lock.readLock().lock(); return messages.length;
try {
return messages.length;
}
finally {
lock.readLock().unlock();
}
} }
@Override @Override

View File

@ -24,6 +24,8 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.paging.PagingStore;
@ -109,7 +111,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
} }
@Override @Override
public PagedMessage getMessage(final PagePosition pos) { public PagedMessage getMessage(final PagePosition pos) throws ActiveMQException {
PageCache cache = getPageCache(pos.getPageNr()); PageCache cache = getPageCache(pos.getPageNr());
if (cache == null || pos.getMessageNr() >= cache.getNumberOfMessages()) { if (cache == null || pos.getMessageNr() >= cache.getNumberOfMessages()) {
@ -128,10 +130,9 @@ public class PageCursorProviderImpl implements PageCursorProvider {
} }
@Override @Override
public PageCache getPageCache(final long pageId) { public PageCache getPageCache(final long pageId) throws ActiveMQException {
try { try {
boolean needToRead = false; PageCache cache;
PageCache cache = null;
synchronized (softCache) { synchronized (softCache) {
if (pageId > pagingStore.getCurrentWritingPage()) { if (pageId > pagingStore.getCurrentWritingPage()) {
return null; return null;
@ -144,47 +145,43 @@ public class PageCursorProviderImpl implements PageCursorProvider {
} }
cache = createPageCache(pageId); cache = createPageCache(pageId);
needToRead = true;
// anyone reading from this cache will have to wait reading to finish first // anyone reading from this cache will have to wait reading to finish first
// we also want only one thread reading this cache // we also want only one thread reading this cache
cache.lock();
if (isTrace) { if (isTrace) {
ActiveMQServerLogger.LOGGER.trace("adding " + pageId + " into cursor = " + this.pagingStore.getAddress()); ActiveMQServerLogger.LOGGER.trace("adding " + pageId + " into cursor = " + this.pagingStore.getAddress());
} }
readPage((int) pageId, cache);
softCache.put(pageId, cache); softCache.put(pageId, cache);
} }
} }
// Reading is done outside of the synchronized block, however
// the page stays locked until the entire reading is finished
if (needToRead) {
Page page = null;
try {
page = pagingStore.createPage((int) pageId);
storageManager.beforePageRead();
page.open();
List<PagedMessage> pgdMessages = page.read(storageManager);
cache.setMessages(pgdMessages.toArray(new PagedMessage[pgdMessages.size()]));
}
finally {
try {
if (page != null) {
page.close();
}
}
catch (Throwable ignored) {
}
storageManager.afterPageRead();
cache.unlock();
}
}
return cache; return cache;
} }
catch (Exception e) { catch (Throwable e) {
throw new RuntimeException("Couldn't complete paging due to an IO Exception on Paging - " + e.getMessage(), e); throw new ActiveMQIOErrorException("Couldn't complete paging due to an IO Exception on Paging - " + e.getMessage(), e);
}
}
private void readPage(int pageId, PageCache cache) throws Exception {
Page page = null;
try {
page = pagingStore.createPage(pageId);
storageManager.beforePageRead();
page.open();
List<PagedMessage> pgdMessages = page.read(storageManager);
cache.setMessages(pgdMessages.toArray(new PagedMessage[pgdMessages.size()]));
}
finally {
try {
if (page != null) {
page.close();
}
}
catch (Throwable ignored) {
}
storageManager.afterPageRead();
} }
} }

View File

@ -32,8 +32,9 @@ import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.paging.PageTransactionInfo; import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.paging.PagingStore;
@ -332,7 +333,7 @@ final class PageSubscriptionImpl implements PageSubscription {
return "PageSubscriptionImpl [cursorId=" + cursorId + ", queue=" + queue + ", filter = " + filter + "]"; return "PageSubscriptionImpl [cursorId=" + cursorId + ", queue=" + queue + ", filter = " + filter + "]";
} }
private PagedReference getReference(PagePosition pos) { private PagedReference getReference(PagePosition pos) throws ActiveMQException {
return cursorProvider.newReference(pos, cursorProvider.getMessage(pos), this); return cursorProvider.newReference(pos, cursorProvider.getMessage(pos), this);
} }
@ -341,7 +342,7 @@ final class PageSubscriptionImpl implements PageSubscription {
return new CursorIterator(); return new CursorIterator();
} }
private PagedReference internalGetNext(final PagePosition pos) { private PagedReference internalGetNext(final PagePosition pos) throws ActiveMQException {
PagePosition retPos = pos.nextMessage(); PagePosition retPos = pos.nextMessage();
PageCache cache = cursorProvider.getPageCache(pos.getPageNr()); PageCache cache = cursorProvider.getPageCache(pos.getPageNr());
@ -470,11 +471,17 @@ final class PageSubscriptionImpl implements PageSubscription {
public void onError(final int errorCode, final String errorMessage) { public void onError(final int errorCode, final String errorMessage) {
error = " errorCode=" + errorCode + ", msg=" + errorMessage; error = " errorCode=" + errorCode + ", msg=" + errorMessage;
ActiveMQServerLogger.LOGGER.pageSubscriptionError(this, error); ActiveMQServerLogger.LOGGER.pageSubscriptionError(this, error);
getPagingStore().criticalException(new ActiveMQException(errorMessage));
} }
@Override @Override
public void done() { public void done() {
processACK(position); try {
processACK(position);
}
catch (ActiveMQException e) {
getPagingStore().criticalException(e);
}
} }
@Override @Override
@ -504,7 +511,12 @@ final class PageSubscriptionImpl implements PageSubscription {
@Override @Override
public void addPendingDelivery(final PagePosition position) { public void addPendingDelivery(final PagePosition position) {
getPageInfo(position).incrementPendingTX(); try {
getPageInfo(position).incrementPendingTX();
}
catch (Exception e) {
getPagingStore().criticalException(e);
}
} }
@Override @Override
@ -523,13 +535,8 @@ final class PageSubscriptionImpl implements PageSubscription {
} }
@Override @Override
public PagedMessage queryMessage(PagePosition pos) { public PagedMessage queryMessage(PagePosition pos) throws ActiveMQException {
try { return cursorProvider.getMessage(pos);
return cursorProvider.getMessage(pos);
}
catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
} }
/** /**
@ -547,18 +554,32 @@ final class PageSubscriptionImpl implements PageSubscription {
@Override @Override
public void reloadPreparedACK(final Transaction tx, final PagePosition position) { public void reloadPreparedACK(final Transaction tx, final PagePosition position) {
deliveredCount.incrementAndGet(); deliveredCount.incrementAndGet();
installTXCallback(tx, position); try {
installTXCallback(tx, position);
}
catch (Exception e) {
getPagingStore().criticalException(e);
}
} }
@Override @Override
public void positionIgnored(final PagePosition position) { public void positionIgnored(final PagePosition position) {
processACK(position); try {
processACK(position);
}
catch (Exception e) {
getPagingStore().criticalException(e);
}
} }
@Override
public void lateDeliveryRollback(PagePosition position) { public void lateDeliveryRollback(PagePosition position) {
PageCursorInfo cursorInfo = processACK(position); try {
cursorInfo.decrementPendingTX(); PageCursorInfo cursorInfo = processACK(position);
cursorInfo.decrementPendingTX();
}
catch (ActiveMQException e) {
getPagingStore().criticalException(e);
}
} }
@Override @Override
@ -729,15 +750,15 @@ final class PageSubscriptionImpl implements PageSubscription {
} }
@Override @Override
public void reloadPageInfo(long pageNr) { public void reloadPageInfo(long pageNr) throws ActiveMQException {
getPageInfo(pageNr, true); getPageInfo(pageNr, true);
} }
private PageCursorInfo getPageInfo(final PagePosition pos) { private PageCursorInfo getPageInfo(final PagePosition pos) throws ActiveMQException {
return getPageInfo(pos.getPageNr(), true); return getPageInfo(pos.getPageNr(), true);
} }
private PageCursorInfo getPageInfo(final long pageNr, boolean create) { private PageCursorInfo getPageInfo(final long pageNr, boolean create) throws ActiveMQException {
synchronized (consumedPages) { synchronized (consumedPages) {
PageCursorInfo pageInfo = consumedPages.get(pageNr); PageCursorInfo pageInfo = consumedPages.get(pageNr);
@ -771,7 +792,7 @@ final class PageSubscriptionImpl implements PageSubscription {
// To be called only after the ACK has been processed and guaranteed to be on storage // To be called only after the ACK has been processed and guaranteed to be on storage
// The only exception is on non storage events such as not matching messages // The only exception is on non storage events such as not matching messages
private PageCursorInfo processACK(final PagePosition pos) { private PageCursorInfo processACK(final PagePosition pos) throws ActiveMQException {
if (lastAckedPosition == null || pos.compareTo(lastAckedPosition) > 0) { if (lastAckedPosition == null || pos.compareTo(lastAckedPosition) > 0) {
if (isTrace) { if (isTrace) {
ActiveMQServerLogger.LOGGER.trace("a new position is being processed as ACK"); ActiveMQServerLogger.LOGGER.trace("a new position is being processed as ACK");
@ -807,7 +828,7 @@ final class PageSubscriptionImpl implements PageSubscription {
* @param tx * @param tx
* @param position * @param position
*/ */
private void installTXCallback(final Transaction tx, final PagePosition position) { private void installTXCallback(final Transaction tx, final PagePosition position) throws ActiveMQException {
if (position.getRecordID() >= 0) { if (position.getRecordID() >= 0) {
// It needs to persist, otherwise the cursor will return to the fist page position // It needs to persist, otherwise the cursor will return to the fist page position
tx.setContainsPersistent(); tx.setContainsPersistent();
@ -827,7 +848,7 @@ final class PageSubscriptionImpl implements PageSubscription {
} }
private PageTransactionInfo getPageTransaction(final PagedReference reference) { private PageTransactionInfo getPageTransaction(final PagedReference reference) throws ActiveMQException {
if (reference.getPagedMessage().getTransactionID() >= 0) { if (reference.getPagedMessage().getTransactionID() >= 0) {
return pageStore.getPagingManager().getTransaction(reference.getPagedMessage().getTransactionID()); return pageStore.getPagingManager().getTransaction(reference.getPagedMessage().getTransactionID());
} }
@ -895,13 +916,24 @@ final class PageSubscriptionImpl implements PageSubscription {
@Override @Override
public String toString() { public String toString() {
return "PageCursorInfo::PageID=" + pageId + try {
" numberOfMessage = " + return "PageCursorInfo::PageID=" + pageId +
numberOfMessages + " numberOfMessage = " +
", confirmed = " + numberOfMessages +
confirmed + ", confirmed = " +
", isDone=" + confirmed +
this.isDone(); ", isDone=" +
this.isDone();
}
catch (Exception e) {
return "PageCursorInfo::PageID=" + pageId +
" numberOfMessage = " +
numberOfMessages +
", confirmed = " +
confirmed +
", isDone=" +
e.toString();
}
} }
public PageCursorInfo(final long pageId, final int numberOfMessages, final PageCache cache) { public PageCursorInfo(final long pageId, final int numberOfMessages, final PageCache cache) {
@ -928,7 +960,13 @@ final class PageSubscriptionImpl implements PageSubscription {
} }
public boolean isDone() { public boolean isDone() {
return completePage != null || (getNumberOfMessages() == confirmed.get() && pendingTX.get() == 0); try {
return completePage != null || (getNumberOfMessages() == confirmed.get() && pendingTX.get() == 0);
}
catch (ActiveMQException e) {
getPagingStore().criticalException(e);
throw new RuntimeException(e.getMessage(), e);
}
} }
public boolean isPendingDelete() { public boolean isPendingDelete() {
@ -966,12 +1004,17 @@ final class PageSubscriptionImpl implements PageSubscription {
public void addACK(final PagePosition posACK) { public void addACK(final PagePosition posACK) {
if (isTrace) { if (isTrace) {
ActiveMQServerLogger.LOGGER.trace("numberOfMessages = " + getNumberOfMessages() + try {
" confirmed = " + ActiveMQServerLogger.LOGGER.trace("numberOfMessages = " + getNumberOfMessages() +
(confirmed.get() + 1) + " confirmed = " +
" pendingTX = " + pendingTX + (confirmed.get() + 1) +
", page = " + " pendingTX = " + pendingTX +
pageId + " posACK = " + posACK); ", page = " +
pageId + " posACK = " + posACK);
}
catch (Throwable ignored) {
ActiveMQServerLogger.LOGGER.debug(ignored.getMessage(), ignored);
}
} }
boolean added = internalAddACK(posACK); boolean added = internalAddACK(posACK);
@ -1004,7 +1047,7 @@ final class PageSubscriptionImpl implements PageSubscription {
} }
} }
private int getNumberOfMessages() { private int getNumberOfMessages() throws ActiveMQException {
if (wasLive) { if (wasLive) {
// if the page was live at any point, we need to // if the page was live at any point, we need to
// get the number of messages from the page-cache // get the number of messages from the page-cache
@ -1023,7 +1066,7 @@ final class PageSubscriptionImpl implements PageSubscription {
} }
private static final class PageCursorTX extends TransactionOperationAbstract { private final class PageCursorTX extends TransactionOperationAbstract {
private final Map<PageSubscriptionImpl, List<PagePosition>> pendingPositions = new HashMap<>(); private final Map<PageSubscriptionImpl, List<PagePosition>> pendingPositions = new HashMap<>();
@ -1046,7 +1089,12 @@ final class PageSubscriptionImpl implements PageSubscription {
List<PagePosition> positions = entry.getValue(); List<PagePosition> positions = entry.getValue();
for (PagePosition confirmed : positions) { for (PagePosition confirmed : positions) {
cursor.processACK(confirmed); try {
cursor.processACK(confirmed);
}
catch (ActiveMQException e) {
getPagingStore().criticalException(e);
}
cursor.deliveredCount.decrementAndGet(); cursor.deliveredCount.decrementAndGet();
} }
@ -1125,13 +1173,13 @@ final class PageSubscriptionImpl implements PageSubscription {
currentDelivery = moveNext(); currentDelivery = moveNext();
return currentDelivery; return currentDelivery;
} }
catch (RuntimeException e) { catch (ActiveMQException e) {
e.printStackTrace(); getPagingStore().criticalException(e);
throw e; throw new IllegalStateException(e.getMessage(), e);
} }
} }
private PagedReference moveNext() { private PagedReference moveNext() throws ActiveMQException {
synchronized (PageSubscriptionImpl.this) { synchronized (PageSubscriptionImpl.this) {
boolean match = false; boolean match = false;
@ -1261,9 +1309,14 @@ final class PageSubscriptionImpl implements PageSubscription {
deliveredCount.incrementAndGet(); deliveredCount.incrementAndGet();
PagedReference delivery = currentDelivery; PagedReference delivery = currentDelivery;
if (delivery != null) { if (delivery != null) {
PageCursorInfo info = PageSubscriptionImpl.this.getPageInfo(delivery.getPosition()); try {
if (info != null) { PageCursorInfo info = PageSubscriptionImpl.this.getPageInfo(currentDelivery.getPosition());
info.remove(delivery.getPosition()); if (info != null) {
info.remove(currentDelivery.getPosition());
}
}
catch (ActiveMQException e) {
getPagingStore().criticalException(e);
} }
} }
} }

View File

@ -87,6 +87,10 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory {
// Public -------------------------------------------------------- // Public --------------------------------------------------------
public void criticalException(Throwable e) {
critialErrorListener.onIOException(e, e.getMessage(), null);
}
@Override @Override
public void stop() { public void stop() {
} }

View File

@ -176,6 +176,12 @@ public class PagingStoreImpl implements PagingStore {
} }
@Override
public void criticalException(Throwable e) {
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
storeFactory.criticalException(e);
}
/** /**
* @param addressSettings * @param addressSettings
*/ */

View File

@ -64,6 +64,8 @@ import org.apache.activemq.artemis.utils.IDGenerator;
*/ */
public interface StorageManager extends IDGenerator, ActiveMQComponent { public interface StorageManager extends IDGenerator, ActiveMQComponent {
void criticalError(Throwable error);
/** /**
* Get the context associated with the thread for later reuse * Get the context associated with the thread for later reuse
*/ */

View File

@ -54,22 +54,22 @@ import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.journal.Journal; import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.JournalLoadInformation; import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo; import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.journal.TransactionFailureCallback; import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
import org.apache.activemq.artemis.core.journal.impl.JournalFile; import org.apache.activemq.artemis.core.journal.impl.JournalFile;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl; import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.message.impl.MessageInternal; import org.apache.activemq.artemis.core.message.impl.MessageInternal;
import org.apache.activemq.artemis.core.paging.PageTransactionInfo; import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.PagedMessage;
@ -111,11 +111,11 @@ import org.apache.activemq.artemis.core.transaction.TransactionOperation;
import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract; import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes; import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.Base64; import org.apache.activemq.artemis.utils.Base64;
import org.apache.activemq.artemis.utils.ByteUtil; import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.DataConstants; import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.activemq.artemis.utils.ExecutorFactory; import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.IDGenerator; import org.apache.activemq.artemis.utils.IDGenerator;
import org.apache.activemq.artemis.utils.UUID; import org.apache.activemq.artemis.utils.UUID;
import org.apache.activemq.artemis.utils.XidCodecSupport; import org.apache.activemq.artemis.utils.XidCodecSupport;
@ -204,6 +204,8 @@ public class JournalStorageManager implements StorageManager {
private boolean journalLoaded = false; private boolean journalLoaded = false;
private final IOCriticalErrorListener ioCriticalErrorListener;
private final Configuration config; private final Configuration config;
// Persisted core configuration // Persisted core configuration
@ -222,6 +224,8 @@ public class JournalStorageManager implements StorageManager {
final IOCriticalErrorListener criticalErrorListener) { final IOCriticalErrorListener criticalErrorListener) {
this.executorFactory = executorFactory; this.executorFactory = executorFactory;
this.ioCriticalErrorListener = criticalErrorListener;
this.config = config; this.config = config;
executor = executorFactory.getExecutor(); executor = executorFactory.getExecutor();
@ -275,6 +279,11 @@ public class JournalStorageManager implements StorageManager {
} }
} }
@Override
public void criticalError(Throwable error) {
ioCriticalErrorListener.onIOException(error, error.getMessage(), null);
}
@Override @Override
public void clearContext() { public void clearContext() {
OperationContextImpl.clearContext(); OperationContextImpl.clearContext();
@ -3031,7 +3040,7 @@ public class JournalStorageManager implements StorageManager {
bridgeRepresentation + "]"; bridgeRepresentation + "]";
} }
else { else {
return "DuplicateIDEncoding [address=" + address + ", duplID=" + ByteUtil.bytesToHex(duplID, 2) + "]"; return "DuplicateIDEncoding [address=" + address + ",str=" + ByteUtil.toSimpleString(duplID) + ", duplID=" + ByteUtil.bytesToHex(duplID, 2) + "]";
} }
} }
} }

View File

@ -28,9 +28,10 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.journal.Journal; import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.JournalLoadInformation; import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.message.impl.MessageInternal; import org.apache.activemq.artemis.core.message.impl.MessageInternal;
import org.apache.activemq.artemis.core.paging.PageTransactionInfo; import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.PagedMessage;
@ -62,6 +63,26 @@ public class NullStorageManager implements StorageManager {
private volatile boolean started; private volatile boolean started;
private final IOCriticalErrorListener ioCriticalErrorListener;
public NullStorageManager(IOCriticalErrorListener ioCriticalErrorListener) {
this.ioCriticalErrorListener = ioCriticalErrorListener;
}
public NullStorageManager() {
this(new IOCriticalErrorListener() {
@Override
public void onIOException(Throwable code, String message, SequentialFile file) {
code.printStackTrace();
}
});
}
@Override
public void criticalError(Throwable error) {
}
private static final OperationContext dummyContext = new OperationContext() { private static final OperationContext dummyContext = new OperationContext() {
@Override @Override

View File

@ -25,6 +25,8 @@ public interface DuplicateIDCache {
boolean contains(byte[] duplicateID); boolean contains(byte[] duplicateID);
boolean atomicVerify(final byte[] duplID, final Transaction tx) throws Exception;
void addToCache(byte[] duplicateID) throws Exception; void addToCache(byte[] duplicateID) throws Exception;
void addToCache(byte[] duplicateID, Transaction tx) throws Exception; void addToCache(byte[] duplicateID, Transaction tx) throws Exception;

View File

@ -44,7 +44,7 @@ public interface PostOffice extends ActiveMQComponent {
void addBinding(Binding binding) throws Exception; void addBinding(Binding binding) throws Exception;
Binding removeBinding(SimpleString uniqueName, Transaction tx) throws Exception; Binding removeBinding(SimpleString uniqueName, Transaction tx, boolean deleteData) throws Exception;
/** /**
* It will lookup the Binding without creating an item on the Queue if non-existent * It will lookup the Binding without creating an item on the Queue if non-existent

View File

@ -21,6 +21,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException;
import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.StorageManager;
@ -29,6 +30,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract; import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
import org.apache.activemq.artemis.utils.ByteUtil;
/** /**
* A DuplicateIDCacheImpl * A DuplicateIDCacheImpl
@ -37,6 +39,8 @@ import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract
*/ */
public class DuplicateIDCacheImpl implements DuplicateIDCache { public class DuplicateIDCacheImpl implements DuplicateIDCache {
private final boolean isTrace = ActiveMQServerLogger.LOGGER.isTraceEnabled();
// ByteHolder, position // ByteHolder, position
private final Map<ByteArrayHolder, Integer> cache = new ConcurrentHashMap<>(); private final Map<ByteArrayHolder, Integer> cache = new ConcurrentHashMap<>();
@ -71,12 +75,27 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache {
@Override @Override
public void load(final List<Pair<byte[], Long>> theIds) throws Exception { public void load(final List<Pair<byte[], Long>> theIds) throws Exception {
int count = 0;
long txID = -1; long txID = -1;
// If we have more IDs than cache size, we shrink the first ones
int deleteCount = theIds.size() - cacheSize;
if (deleteCount < 0) {
deleteCount = 0;
}
for (Pair<byte[], Long> id : theIds) { for (Pair<byte[], Long> id : theIds) {
if (count < cacheSize) { if (deleteCount > 0) {
if (txID == -1) {
txID = storageManager.generateID();
}
if (isTrace) {
ActiveMQServerLogger.LOGGER.trace("DuplicateIDCacheImpl::load deleting id=" + describeID(id.getA(), id.getB()));
}
storageManager.deleteDuplicateIDTransactional(txID, id.getB());
deleteCount--;
}
else {
ByteArrayHolder bah = new ByteArrayHolder(id.getA()); ByteArrayHolder bah = new ByteArrayHolder(id.getA());
Pair<ByteArrayHolder, Long> pair = new Pair<>(bah, id.getB()); Pair<ByteArrayHolder, Long> pair = new Pair<>(bah, id.getB());
@ -84,17 +103,11 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache {
cache.put(bah, ids.size()); cache.put(bah, ids.size());
ids.add(pair); ids.add(pair);
} if (isTrace) {
else { ActiveMQServerLogger.LOGGER.trace("DuplicateIDCacheImpl::load loading id=" + describeID(id.getA(), id.getB()));
// cache size has been reduced in config - delete the extra records
if (txID == -1) {
txID = storageManager.generateID();
} }
storageManager.deleteDuplicateIDTransactional(txID, id.getB());
} }
count++;
} }
if (txID != -1) { if (txID != -1) {
@ -111,6 +124,10 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache {
@Override @Override
public void deleteFromCache(byte[] duplicateID) throws Exception { public void deleteFromCache(byte[] duplicateID) throws Exception {
if (isTrace) {
ActiveMQServerLogger.LOGGER.trace("DuplicateIDCacheImpl::deleteFromCache deleting id=" + describeID(duplicateID, 0));
}
ByteArrayHolder bah = new ByteArrayHolder(duplicateID); ByteArrayHolder bah = new ByteArrayHolder(duplicateID);
Integer posUsed = cache.remove(bah); Integer posUsed = cache.remove(bah);
@ -124,6 +141,9 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache {
if (id.getA().equals(bah)) { if (id.getA().equals(bah)) {
id.setA(null); id.setA(null);
storageManager.deleteDuplicateID(id.getB()); storageManager.deleteDuplicateID(id.getB());
if (isTrace) {
ActiveMQServerLogger.LOGGER.trace("DuplicateIDCacheImpl(" + this.address + ")::deleteFromCache deleting id=" + describeID(duplicateID, id.getB()));
}
id.setB(null); id.setB(null);
} }
} }
@ -131,9 +151,23 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache {
} }
private String describeID(byte[] duplicateID, long id) {
if (id != 0) {
return ByteUtil.bytesToHex(duplicateID, 4) + ", simpleString=" + ByteUtil.toSimpleString(duplicateID);
}
else {
return ByteUtil.bytesToHex(duplicateID, 4) + ", simpleString=" + ByteUtil.toSimpleString(duplicateID) + ", id=" + id;
}
}
@Override @Override
public boolean contains(final byte[] duplID) { public boolean contains(final byte[] duplID) {
return cache.get(new ByteArrayHolder(duplID)) != null; boolean contains = cache.get(new ByteArrayHolder(duplID)) != null;
if (contains) {
ActiveMQServerLogger.LOGGER.trace("DuplicateIDCacheImpl(" + this.address + ")::constains found a duplicate " + describeID(duplID, 0));
}
return contains;
} }
@Override @Override
@ -147,6 +181,21 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache {
} }
@Override @Override
public synchronized boolean atomicVerify(final byte[] duplID, final Transaction tx) throws Exception {
if (contains(duplID)) {
if (tx != null) {
tx.markAsRollbackOnly(new ActiveMQDuplicateIdException());
}
return false;
}
else {
addToCache(duplID, tx, true);
return true;
}
}
public synchronized void addToCache(final byte[] duplID, final Transaction tx, boolean instantAdd) throws Exception { public synchronized void addToCache(final byte[] duplID, final Transaction tx, boolean instantAdd) throws Exception {
long recordID = -1; long recordID = -1;
@ -170,6 +219,9 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache {
addToCacheInMemory(duplID, recordID); addToCacheInMemory(duplID, recordID);
} }
else { else {
if (isTrace) {
ActiveMQServerLogger.LOGGER.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCache Adding duplicateID TX operation for " + describeID(duplID, recordID));
}
// For a tx, it's important that the entry is not added to the cache until commit // For a tx, it's important that the entry is not added to the cache until commit
// since if the client fails then resends them tx we don't want it to get rejected // since if the client fails then resends them tx we don't want it to get rejected
tx.addOperation(new AddDuplicateIDOperation(duplID, recordID)); tx.addOperation(new AddDuplicateIDOperation(duplID, recordID));
@ -183,6 +235,10 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache {
} }
private synchronized void addToCacheInMemory(final byte[] duplID, final long recordID) { private synchronized void addToCacheInMemory(final byte[] duplID, final long recordID) {
if (isTrace) {
ActiveMQServerLogger.LOGGER.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCacheInMemory Adding " + describeID(duplID, recordID));
}
ByteArrayHolder holder = new ByteArrayHolder(duplID); ByteArrayHolder holder = new ByteArrayHolder(duplID);
cache.put(holder, pos); cache.put(holder, pos);
@ -195,6 +251,10 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache {
// The id here might be null if it was explicit deleted // The id here might be null if it was explicit deleted
if (id.getA() != null) { if (id.getA() != null) {
if (isTrace) {
ActiveMQServerLogger.LOGGER.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCacheInMemory removing excess duplicateDetection " + describeID(id.getA().bytes, id.getB()));
}
cache.remove(id.getA()); cache.remove(id.getA());
// Record already exists - we delete the old one and add the new one // Record already exists - we delete the old one and add the new one
@ -217,11 +277,19 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache {
// -1 would mean null on this case // -1 would mean null on this case
id.setB(recordID >= 0 ? recordID : null); id.setB(recordID >= 0 ? recordID : null);
if (isTrace) {
ActiveMQServerLogger.LOGGER.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCacheInMemory replacing old duplicateID by " + describeID(id.getA().bytes, id.getB()));
}
holder.pos = pos; holder.pos = pos;
} }
else { else {
id = new Pair<>(holder, recordID >= 0 ? recordID : null); id = new Pair<>(holder, recordID >= 0 ? recordID : null);
if (isTrace) {
ActiveMQServerLogger.LOGGER.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCacheInMemory Adding new duplicateID " + describeID(id.getA().bytes, id.getB()));
}
ids.add(id); ids.add(id);
holder.pos = pos; holder.pos = pos;
@ -234,6 +302,7 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache {
@Override @Override
public void clear() throws Exception { public void clear() throws Exception {
ActiveMQServerLogger.LOGGER.debug("DuplicateIDCacheImpl(" + this.address + ")::clear removing duplicate ID data");
synchronized (this) { synchronized (this) {
if (ids.size() > 0) { if (ids.size() > 0) {
long tx = storageManager.generateID(); long tx = storageManager.generateID();

View File

@ -41,8 +41,8 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType; import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.api.core.management.ManagementHelper; import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.api.core.management.NotificationType; import org.apache.activemq.artemis.api.core.management.NotificationType;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.message.impl.MessageImpl; import org.apache.activemq.artemis.core.message.impl.MessageImpl;
import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.paging.PagingStore;
@ -463,7 +463,9 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
} }
@Override @Override
public synchronized Binding removeBinding(final SimpleString uniqueName, Transaction tx) throws Exception { public synchronized Binding removeBinding(final SimpleString uniqueName,
Transaction tx,
boolean deleteData) throws Exception {
addressSettingsRepository.clearCache(); addressSettingsRepository.clearCache();
@ -473,7 +475,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
throw new ActiveMQNonExistentQueueException(); throw new ActiveMQNonExistentQueueException();
} }
if (addressManager.getBindingsForRoutingAddress(binding.getAddress()) == null) { if (deleteData && addressManager.getBindingsForRoutingAddress(binding.getAddress()) == null) {
pagingManager.deletePageStore(binding.getAddress()); pagingManager.deletePageStore(binding.getAddress());
managementService.unregisterAddress(binding.getAddress()); managementService.unregisterAddress(binding.getAddress());
@ -1159,31 +1161,19 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
DuplicateIDCache cacheBridge = getDuplicateIDCache(BRIDGE_CACHE_STR.concat(message.getAddress())); DuplicateIDCache cacheBridge = getDuplicateIDCache(BRIDGE_CACHE_STR.concat(message.getAddress()));
if (cacheBridge.contains(bridgeDupBytes)) { if (context.getTransaction() == null) {
ActiveMQServerLogger.LOGGER.duplicateMessageDetectedThruBridge(message); context.setTransaction(new TransactionImpl(storageManager));
startedTX.set(true);
if (context.getTransaction() != null) { }
context.getTransaction().markAsRollbackOnly(new ActiveMQDuplicateIdException());
}
if (!cacheBridge.atomicVerify(bridgeDupBytes, context.getTransaction())) {
context.getTransaction().rollback();
startedTX.set(false);
message.decrementRefCount(); message.decrementRefCount();
return false; return false;
} }
else {
if (context.getTransaction() == null) {
context.setTransaction(new TransactionImpl(storageManager));
startedTX.set(true);
}
}
// on the bridge case there is a case where the bridge reconnects
// and the send hasn't finished yet (think of CPU outages).
// for that reason we add the cache right away
cacheBridge.addToCache(bridgeDupBytes, context.getTransaction(), true);
message.removeProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID); message.removeProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID);
} }
else { else {
// if used BridgeDuplicate, it's not going to use the regular duplicate // if used BridgeDuplicate, it's not going to use the regular duplicate
@ -1222,7 +1212,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
startedTX.set(true); startedTX.set(true);
} }
cache.addToCache(duplicateIDBytes, context.getTransaction()); cache.addToCache(duplicateIDBytes, context.getTransaction(), false);
} }
} }

View File

@ -16,55 +16,21 @@
*/ */
package org.apache.activemq.artemis.core.protocol.core; package org.apache.activemq.artemis.core.protocol.core;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_QUEUE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_SHARED_QUEUE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.DELETE_QUEUE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_ACKNOWLEDGE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_CLOSE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_COMMIT;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_CONSUMER_CLOSE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_CREATECONSUMER;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_EXPIRED;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_FLOWTOKEN;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_FORCE_CONSUMER_DELIVERY;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_INDIVIDUAL_ACKNOWLEDGE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_QUEUEQUERY;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_ROLLBACK;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_SEND;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_SEND_CONTINUATION;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_SEND_LARGE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_START;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_STOP;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_COMMIT;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_END;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_FORGET;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_GET_TIMEOUT;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_INDOUBT_XIDS;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_JOIN;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_PREPARE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_RESUME;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_ROLLBACK;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_SET_TIMEOUT;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_START;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_FAILED;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_SUSPEND;
import java.util.List;
import javax.transaction.xa.XAResource; import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid; import javax.transaction.xa.Xid;
import java.util.List;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException; import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.exception.ActiveMQXAException; import org.apache.activemq.artemis.core.exception.ActiveMQXAException;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.RollbackMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.RollbackMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage;
@ -105,14 +71,48 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAS
import org.apache.activemq.artemis.core.remoting.CloseListener; import org.apache.activemq.artemis.core.remoting.CloseListener;
import org.apache.activemq.artemis.core.remoting.FailureListener; import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.BindingQueryResult; import org.apache.activemq.artemis.core.server.BindingQueryResult;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.QueueQueryResult; import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.Connection;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_QUEUE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_SHARED_QUEUE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.DELETE_QUEUE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_ACKNOWLEDGE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_CLOSE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_COMMIT;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_CONSUMER_CLOSE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_CREATECONSUMER;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_EXPIRED;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_FLOWTOKEN;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_FORCE_CONSUMER_DELIVERY;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_INDIVIDUAL_ACKNOWLEDGE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_QUEUEQUERY;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_ROLLBACK;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_SEND;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_SEND_CONTINUATION;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_SEND_LARGE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_START;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_STOP;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_COMMIT;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_END;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_FAILED;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_FORGET;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_GET_TIMEOUT;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_INDOUBT_XIDS;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_JOIN;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_PREPARE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_RESUME;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_ROLLBACK;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_SET_TIMEOUT;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_START;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_SUSPEND;
public class ServerSessionPacketHandler implements ChannelHandler { public class ServerSessionPacketHandler implements ChannelHandler {
private final boolean isTrace = ActiveMQServerLogger.LOGGER.isTraceEnabled(); private final boolean isTrace = ActiveMQServerLogger.LOGGER.isTraceEnabled();
@ -483,6 +483,16 @@ public class ServerSessionPacketHandler implements ChannelHandler {
} }
} }
} }
catch (ActiveMQIOErrorException e) {
getSession().markTXFailed(e);
if (requiresResponse) {
ActiveMQServerLogger.LOGGER.debug("Sending exception to client", e);
response = new ActiveMQExceptionMessage(e);
}
else {
ActiveMQServerLogger.LOGGER.caughtException(e);
}
}
catch (ActiveMQXAException e) { catch (ActiveMQXAException e) {
if (requiresResponse) { if (requiresResponse) {
ActiveMQServerLogger.LOGGER.debug("Sending exception to client", e); ActiveMQServerLogger.LOGGER.debug("Sending exception to client", e);
@ -507,6 +517,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
} }
} }
catch (Throwable t) { catch (Throwable t) {
getSession().markTXFailed(t);
if (requiresResponse) { if (requiresResponse) {
ActiveMQServerLogger.LOGGER.warn("Sending unexpected exception to the client", t); ActiveMQServerLogger.LOGGER.warn("Sending unexpected exception to the client", t);
ActiveMQException activeMQInternalErrorException = new ActiveMQInternalErrorException(); ActiveMQException activeMQInternalErrorException = new ActiveMQInternalErrorException();
@ -611,7 +622,6 @@ public class ServerSessionPacketHandler implements ChannelHandler {
newConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence()); newConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence());
Connection oldTransportConnection = remotingConnection.getTransportConnection(); Connection oldTransportConnection = remotingConnection.getTransportConnection();
remotingConnection = newConnection; remotingConnection = newConnection;

View File

@ -16,6 +16,8 @@
*/ */
package org.apache.activemq.artemis.core.server; package org.apache.activemq.artemis.core.server;
import org.apache.activemq.artemis.api.core.ActiveMQException;
/** /**
* A reference to a message. * A reference to a message.
* *
@ -25,7 +27,7 @@ public interface MessageReference {
boolean isPaged(); boolean isPaged();
ServerMessage getMessage(); ServerMessage getMessage() throws ActiveMQException;
/** /**
* We define this method aggregation here because on paging we need to hold the original estimate, * We define this method aggregation here because on paging we need to hold the original estimate,

View File

@ -21,6 +21,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
@ -122,7 +123,7 @@ public interface Queue extends Bindable {
MessageReference removeReferenceWithID(long id) throws Exception; MessageReference removeReferenceWithID(long id) throws Exception;
MessageReference getReference(long id); MessageReference getReference(long id) throws ActiveMQException;
int deleteAllReferences() throws Exception; int deleteAllReferences() throws Exception;
@ -236,9 +237,9 @@ public interface Queue extends Bindable {
/** /**
* cancels scheduled messages and send them to the head of the queue. * cancels scheduled messages and send them to the head of the queue.
*/ */
void deliverScheduledMessages(); void deliverScheduledMessages() throws ActiveMQException;
void postAcknowledge(MessageReference ref); void postAcknowledge(MessageReference ref) throws ActiveMQException;
float getRate(); float getRate();

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.activemq.artemis.core.server; package org.apache.activemq.artemis.core.server;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.filter.Filter;
import java.util.List; import java.util.List;
@ -28,7 +29,7 @@ public interface ScheduledDeliveryHandler {
List<MessageReference> getScheduledReferences(); List<MessageReference> getScheduledReferences();
List<MessageReference> cancel(Filter filter); List<MessageReference> cancel(Filter filter) throws ActiveMQException;
MessageReference removeReferenceWithID(long id); MessageReference removeReferenceWithID(long id) throws ActiveMQException;
} }

View File

@ -73,6 +73,8 @@ public interface ServerSession extends SecurityAuth {
void xaSuspend() throws Exception; void xaSuspend() throws Exception;
void markTXFailed(Throwable e);
QueueCreator getQueueCreator(); QueueCreator getQueueCreator();
List<Xid> xaGetInDoubtXids(); List<Xid> xaGetInDoubtXids();

View File

@ -1244,7 +1244,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
throw new IllegalStateException("Cannot find binding for queue " + clusterName); throw new IllegalStateException("Cannot find binding for queue " + clusterName);
} }
postOffice.removeBinding(binding.getUniqueName(), null); postOffice.removeBinding(binding.getUniqueName(), null, false);
} }
private synchronized void resetBinding(final SimpleString clusterName) throws Exception { private synchronized void resetBinding(final SimpleString clusterName) throws Exception {

View File

@ -415,7 +415,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
backupActivationThread.start(); backupActivationThread.start();
} }
else { else {
ActiveMQServerLogger.LOGGER.serverStarted(getVersion().getFullVersion(), configuration.getName(), nodeManager.getNodeId(), identity != null ? identity : "" ); ActiveMQServerLogger.LOGGER.serverStarted(getVersion().getFullVersion(), configuration.getName(), nodeManager.getNodeId(), identity != null ? identity : "");
} }
// start connector service // start connector service
connectorsService = new ConnectorsService(configuration, storageManager, scheduledPool, postOffice, serviceRegistry); connectorsService = new ConnectorsService(configuration, storageManager, scheduledPool, postOffice, serviceRegistry);
@ -508,18 +508,19 @@ public class ActiveMQServerImpl implements ActiveMQServer {
* Stops the server in a different thread. * Stops the server in a different thread.
*/ */
public final void stopTheServer(final boolean criticalIOError) { public final void stopTheServer(final boolean criticalIOError) {
ExecutorService executor = Executors.newSingleThreadExecutor(); Thread thread = new Thread() {
executor.submit(new Runnable() {
@Override @Override
public void run() { public void run() {
try { try {
stop(false, criticalIOError, false); ActiveMQServerImpl.this.stop(false, criticalIOError, false);
} }
catch (Exception e) { catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorStoppingServer(e); ActiveMQServerLogger.LOGGER.errorStoppingServer(e);
} }
} }
}); };
thread.start();
} }
@Override @Override
@ -722,7 +723,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
} }
} }
pagingManager = null; pagingManager = null;
securityStore = null; securityStore = null;
resourceManager = null; resourceManager = null;
@ -1016,7 +1016,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
if (securityStore != null) { if (securityStore != null) {
X509Certificate[] certificates = null; X509Certificate[] certificates = null;
if (connection.getTransportConnection() instanceof NettyConnection) { if (connection.getTransportConnection() instanceof NettyConnection) {
certificates = CertificateUtil.getCertsFromChannel(((NettyConnection)connection.getTransportConnection()).getChannel()); certificates = CertificateUtil.getCertsFromChannel(((NettyConnection) connection.getTransportConnection()).getChannel());
} }
securityStore.authenticate(username, password, certificates); securityStore.authenticate(username, password, certificates);
} }
@ -1428,7 +1428,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
throw ActiveMQMessageBundle.BUNDLE.bindingNotDivert(name); throw ActiveMQMessageBundle.BUNDLE.bindingNotDivert(name);
} }
postOffice.removeBinding(name, null); postOffice.removeBinding(name, null, true);
} }
@Override @Override
@ -1954,11 +1954,16 @@ public class ActiveMQServerImpl implements ActiveMQServer {
boolean failedAlready = false; boolean failedAlready = false;
@Override @Override
public synchronized void onIOException(Exception cause, String message, SequentialFile file) { public synchronized void onIOException(Throwable cause, String message, SequentialFile file) {
if (!failedAlready) { if (!failedAlready) {
failedAlready = true; failedAlready = true;
ActiveMQServerLogger.LOGGER.ioCriticalIOError(message, file.toString(), cause); if (file == null) {
ActiveMQServerLogger.LOGGER.ioCriticalIOError(message, "NULL", cause);
}
else {
ActiveMQServerLogger.LOGGER.ioCriticalIOError(message, file.toString(), cause);
}
stopTheServer(true); stopTheServer(true);
} }
@ -2021,10 +2026,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
* move any older data away and log a warning about it. * move any older data away and log a warning about it.
*/ */
void moveServerData() { void moveServerData() {
File[] dataDirs = new File[]{configuration.getBindingsLocation(), File[] dataDirs = new File[]{configuration.getBindingsLocation(), configuration.getJournalLocation(), configuration.getPagingLocation(), configuration.getLargeMessagesLocation()};
configuration.getJournalLocation(),
configuration.getPagingLocation(),
configuration.getLargeMessagesLocation()};
boolean allEmpty = true; boolean allEmpty = true;
int lowestSuffixForMovedData = 1; int lowestSuffixForMovedData = 1;

View File

@ -21,6 +21,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.filter.Filter;
@ -66,7 +67,15 @@ public class LastValueQueue extends QueueImpl {
@Override @Override
public synchronized void addTail(final MessageReference ref, final boolean direct) { public synchronized void addTail(final MessageReference ref, final boolean direct) {
SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME); SimpleString prop;
try {
prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME);
}
catch (ActiveMQException e) {
criticalError(e);
throw new IllegalStateException(e);
}
if (prop != null) { if (prop != null) {
HolderReference hr = map.get(prop); HolderReference hr = map.get(prop);
@ -103,45 +112,59 @@ public class LastValueQueue extends QueueImpl {
@Override @Override
public synchronized void addHead(final MessageReference ref) { public synchronized void addHead(final MessageReference ref) {
SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME); try {
SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME);
if (prop != null) { if (prop != null) {
HolderReference hr = map.get(prop); HolderReference hr = map.get(prop);
if (hr != null) { if (hr != null) {
// We keep the current ref and ack the one we are returning // We keep the current ref and ack the one we are returning
super.referenceHandled(); super.referenceHandled();
try { try {
super.acknowledge(ref); super.acknowledge(ref);
}
catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorAckingOldReference(e);
}
} }
catch (Exception e) { else {
ActiveMQServerLogger.LOGGER.errorAckingOldReference(e); map.put(prop, (HolderReference) ref);
super.addHead(ref);
} }
} }
else { else {
map.put(prop, (HolderReference) ref);
super.addHead(ref); super.addHead(ref);
} }
} }
else { catch (ActiveMQException e) {
super.addHead(ref); criticalError(e);
throw new IllegalStateException(e);
} }
} }
@Override @Override
protected void refRemoved(MessageReference ref) { protected void refRemoved(MessageReference ref) {
synchronized (this) { try {
SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME);
if (prop != null) { synchronized (this) {
map.remove(prop); SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME);
if (prop != null) {
map.remove(prop);
}
} }
super.refRemoved(ref);
}
catch (ActiveMQException e) {
criticalError(e);
throw new IllegalStateException(e);
} }
super.refRemoved(ref);
} }
private class HolderReference implements MessageReference { private class HolderReference implements MessageReference {
@ -200,7 +223,13 @@ public class LastValueQueue extends QueueImpl {
@Override @Override
public ServerMessage getMessage() { public ServerMessage getMessage() {
return ref.getMessage(); try {
return ref.getMessage();
}
catch (ActiveMQException e) {
criticalError(e);
throw new IllegalStateException(e);
}
} }
@Override @Override
@ -256,7 +285,13 @@ public class LastValueQueue extends QueueImpl {
*/ */
@Override @Override
public int getMessageMemoryEstimate() { public int getMessageMemoryEstimate() {
return ref.getMessage().getMemoryEstimate(); try {
return ref.getMessage().getMemoryEstimate();
}
catch (ActiveMQException e) {
criticalError(e);
throw new IllegalStateException(e);
}
} }
/* (non-Javadoc) /* (non-Javadoc)

View File

@ -208,7 +208,7 @@ public class MessageReferenceImpl implements MessageReference {
} }
if (other instanceof MessageReferenceImpl) { if (other instanceof MessageReferenceImpl) {
MessageReference reference = (MessageReferenceImpl) other; MessageReferenceImpl reference = (MessageReferenceImpl) other;
if (this.getMessage().equals(reference.getMessage())) if (this.getMessage().equals(reference.getMessage()))
return true; return true;

View File

@ -40,6 +40,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
@ -893,7 +894,7 @@ public class QueueImpl implements Queue {
} }
@Override @Override
public synchronized MessageReference getReference(final long id1) { public synchronized MessageReference getReference(final long id1) throws ActiveMQException {
LinkedListIterator<MessageReference> iterator = iterator(); LinkedListIterator<MessageReference> iterator = iterator();
try { try {
@ -1053,7 +1054,13 @@ public class QueueImpl implements Queue {
@Override @Override
public void cancel(final Transaction tx, final MessageReference reference, boolean ignoreRedeliveryCheck) { public void cancel(final Transaction tx, final MessageReference reference, boolean ignoreRedeliveryCheck) {
getRefsOperation(tx, ignoreRedeliveryCheck).addAck(reference); try {
getRefsOperation(tx, ignoreRedeliveryCheck).addAck(reference);
}
catch (ActiveMQException e) {
criticalError(e);
getPageSubscription().getPagingStore().criticalException(e);
}
} }
@Override @Override
@ -1102,7 +1109,7 @@ public class QueueImpl implements Queue {
} }
@Override @Override
public void deliverScheduledMessages() { public void deliverScheduledMessages() throws ActiveMQException {
List<MessageReference> scheduledMessages = scheduledDeliveryHandler.cancel(null); List<MessageReference> scheduledMessages = scheduledDeliveryHandler.cancel(null);
if (scheduledMessages != null && scheduledMessages.size() > 0) { if (scheduledMessages != null && scheduledMessages.size() > 0) {
for (MessageReference ref : scheduledMessages) { for (MessageReference ref : scheduledMessages) {
@ -1311,7 +1318,7 @@ public class QueueImpl implements Queue {
Transaction tx = new BindingsTransactionImpl(storageManager); Transaction tx = new BindingsTransactionImpl(storageManager);
try { try {
postOffice.removeBinding(name, tx); postOffice.removeBinding(name, tx, true);
deleteAllReferences(); deleteAllReferences();
@ -1770,7 +1777,12 @@ public class QueueImpl implements Queue {
private synchronized void internalAddTail(final MessageReference ref) { private synchronized void internalAddTail(final MessageReference ref) {
refAdded(ref); refAdded(ref);
messageReferences.addTail(ref, ref.getMessage().getPriority()); try {
messageReferences.addTail(ref, ref.getMessage().getPriority());
}
catch (ActiveMQException e) {
criticalError(e);
}
} }
/** /**
@ -1781,9 +1793,18 @@ public class QueueImpl implements Queue {
* @param ref * @param ref
*/ */
private void internalAddHead(final MessageReference ref) { private void internalAddHead(final MessageReference ref) {
queueMemorySize.addAndGet(ref.getMessageMemoryEstimate()); try {
refAdded(ref); queueMemorySize.addAndGet(ref.getMessageMemoryEstimate());
messageReferences.addHead(ref, ref.getMessage().getPriority()); refAdded(ref);
messageReferences.addHead(ref, ref.getMessage().getPriority());
}
catch (ActiveMQException e) {
criticalError(e);
}
}
void criticalError(ActiveMQException e) {
storageManager.criticalError(e);
} }
private synchronized void doInternalPoll() { private synchronized void doInternalPoll() {
@ -2011,14 +2032,17 @@ public class QueueImpl implements Queue {
return null; return null;
} }
else { else {
// But we don't use the groupID on internal queues (clustered queues) otherwise the group map would leak forever try {
return ref.getMessage().getSimpleStringProperty(Message.HDR_GROUP_ID); // But we don't use the groupID on internal queues (clustered queues) otherwise the group map would leak forever
return ref.getMessage().getSimpleStringProperty(Message.HDR_GROUP_ID);
}
catch (ActiveMQException e) {
criticalError(e);
throw new IllegalStateException(e);
}
} }
} }
/**
* @param ref
*/
protected void refRemoved(MessageReference ref) { protected void refRemoved(MessageReference ref) {
queueMemorySize.addAndGet(-ref.getMessageMemoryEstimate()); queueMemorySize.addAndGet(-ref.getMessageMemoryEstimate());
if (ref.isPaged()) { if (ref.isPaged()) {
@ -2026,9 +2050,6 @@ public class QueueImpl implements Queue {
} }
} }
/**
* @param ref
*/
protected void refAdded(final MessageReference ref) { protected void refAdded(final MessageReference ref) {
if (ref.isPaged()) { if (ref.isPaged()) {
pagedReferences.incrementAndGet(); pagedReferences.incrementAndGet();
@ -2502,23 +2523,29 @@ public class QueueImpl implements Queue {
} }
private boolean checkExpired(final MessageReference reference) { private boolean checkExpired(final MessageReference reference) {
if (reference.getMessage().isExpired()) { try {
if (isTrace) { if (reference.getMessage().isExpired()) {
ActiveMQServerLogger.LOGGER.trace("Reference " + reference + " is expired"); if (isTrace) {
} ActiveMQServerLogger.LOGGER.trace("Reference " + reference + " is expired");
reference.handled(); }
reference.handled();
try { try {
expire(reference); expire(reference);
} }
catch (Exception e) { catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorExpiringRef(e); ActiveMQServerLogger.LOGGER.errorExpiringRef(e);
} }
return true; return true;
}
else {
return false;
}
} }
else { catch (ActiveMQException e) {
return false; criticalError(e);
throw new IllegalStateException(e);
} }
} }
@ -2557,7 +2584,7 @@ public class QueueImpl implements Queue {
} }
@Override @Override
public void postAcknowledge(final MessageReference ref) { public void postAcknowledge(final MessageReference ref) throws ActiveMQException {
QueueImpl queue = (QueueImpl) ref.getQueue(); QueueImpl queue = (QueueImpl) ref.getQueue();
queue.decDelivering(); queue.decDelivering();

View File

@ -16,6 +16,13 @@
*/ */
package org.apache.activemq.artemis.core.server.impl; package org.apache.activemq.artemis.core.server.impl;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.MessageReference;
@ -25,12 +32,6 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract; import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
public class RefsOperation extends TransactionOperationAbstract { public class RefsOperation extends TransactionOperationAbstract {
private final StorageManager storageManager; private final StorageManager storageManager;
@ -55,7 +56,7 @@ public class RefsOperation extends TransactionOperationAbstract {
ignoreRedeliveryCheck = true; ignoreRedeliveryCheck = true;
} }
synchronized void addAck(final MessageReference ref) { synchronized void addAck(final MessageReference ref) throws ActiveMQException {
refsToAck.add(ref); refsToAck.add(ref);
if (ref.isPaged()) { if (ref.isPaged()) {
if (pagedMessagesToPostACK == null) { if (pagedMessagesToPostACK == null) {
@ -147,7 +148,17 @@ public class RefsOperation extends TransactionOperationAbstract {
public void afterCommit(final Transaction tx) { public void afterCommit(final Transaction tx) {
for (MessageReference ref : refsToAck) { for (MessageReference ref : refsToAck) {
synchronized (ref.getQueue()) { synchronized (ref.getQueue()) {
queue.postAcknowledge(ref); try {
queue.postAcknowledge(ref);
}
catch (ActiveMQException e) {
if (queue instanceof QueueImpl) {
((QueueImpl) queue).criticalError(e);
}
else {
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
}
}
} }
} }

View File

@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.MessageReference;
@ -97,7 +98,7 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler {
} }
@Override @Override
public List<MessageReference> cancel(final Filter filter) { public List<MessageReference> cancel(final Filter filter) throws ActiveMQException {
List<MessageReference> refs = new ArrayList<>(); List<MessageReference> refs = new ArrayList<>();
synchronized (scheduledReferences) { synchronized (scheduledReferences) {
@ -115,7 +116,7 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler {
} }
@Override @Override
public MessageReference removeReferenceWithID(final long id) { public MessageReference removeReferenceWithID(final long id) throws ActiveMQException {
synchronized (scheduledReferences) { synchronized (scheduledReferences) {
Iterator<RefScheduled> iter = scheduledReferences.iterator(); Iterator<RefScheduled> iter = scheduledReferences.iterator();
while (iter.hasNext()) { while (iter.hasNext()) {

View File

@ -286,7 +286,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
// should go back into the // should go back into the
// queue for delivery later. // queue for delivery later.
// TCP-flow control has to be done first than everything else otherwise we may lose notifications // TCP-flow control has to be done first than everything else otherwise we may lose notifications
if (!callback.isWritable(this) || !started || transferring ) { if (!callback.isWritable(this) || !started || transferring) {
return HandleStatus.BUSY; return HandleStatus.BUSY;
} }
@ -733,25 +733,63 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
} }
} }
@Override public void individualAcknowledge(Transaction tx,
public void individualAcknowledge(final Transaction tx, final long messageID) throws Exception { final long messageID) throws Exception {
if (browseOnly) { if (browseOnly) {
return; return;
} }
MessageReference ref = removeReferenceByID(messageID); boolean startedTransaction = false;
if (ref == null) { if (tx == null) {
ActiveMQIllegalStateException ils = ActiveMQMessageBundle.BUNDLE.consumerNoReference(id, messageID, messageQueue.getName()); startedTransaction = true;
if (tx != null) { tx = new TransactionImpl(storageManager);
tx.markAsRollbackOnly(ils);
}
throw ils;
} }
ackReference(tx, ref); try {
MessageReference ref;
ref = removeReferenceByID(messageID);
if (ActiveMQServerLogger.LOGGER.isTraceEnabled()) {
ActiveMQServerLogger.LOGGER.trace("ACKing ref " + ref + " on tx= " + tx + ", consumer=" + this);
}
if (ref == null) {
ActiveMQIllegalStateException ils = new ActiveMQIllegalStateException("Cannot find ref to ack " + messageID);
if (tx != null) {
tx.markAsRollbackOnly(ils);
}
throw ils;
}
ackReference(tx, ref);
if (startedTransaction) {
tx.commit();
}
}
catch (ActiveMQException e) {
if (startedTransaction) {
tx.rollback();
}
else {
tx.markAsRollbackOnly(e);
}
throw e;
}
catch (Throwable e) {
ActiveMQServerLogger.LOGGER.errorAckingMessage((Exception) e);
ActiveMQIllegalStateException hqex = new ActiveMQIllegalStateException(e.getMessage());
if (startedTransaction) {
tx.rollback();
}
else {
tx.markAsRollbackOnly(hqex);
}
throw hqex;
}
acks++;
} }
@Override @Override

View File

@ -315,6 +315,21 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
return Collections.unmodifiableSet(consumersClone); return Collections.unmodifiableSet(consumersClone);
} }
@Override
public void markTXFailed(Throwable e) {
Transaction currentTX = this.tx;
if (currentTX != null) {
if (e instanceof ActiveMQException) {
currentTX.markAsRollbackOnly((ActiveMQException) e);
}
else {
ActiveMQException exception = new ActiveMQException(e.getMessage());
exception.initCause(e);
currentTX.markAsRollbackOnly(exception);
}
}
}
@Override @Override
public boolean removeConsumer(final long consumerID) throws Exception { public boolean removeConsumer(final long consumerID) throws Exception {
return consumers.remove(consumerID) != null; return consumers.remove(consumerID) != null;

View File

@ -229,7 +229,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
} }
} }
private void validateSequence(ScheduledDeliveryHandlerImpl handler) { private void validateSequence(ScheduledDeliveryHandlerImpl handler) throws Exception {
long lastSequence = -1; long lastSequence = -1;
for (MessageReference ref : handler.getScheduledReferences()) { for (MessageReference ref : handler.getScheduledReferences()) {
assertEquals(lastSequence + 1, ref.getMessage().getMessageID()); assertEquals(lastSequence + 1, ref.getMessage().getMessageID());
@ -256,7 +256,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
handler.checkAndSchedule(refImpl, tail); handler.checkAndSchedule(refImpl, tail);
} }
private void debugList(boolean fail, ScheduledDeliveryHandlerImpl handler, long numberOfExpectedMessages) { private void debugList(boolean fail, ScheduledDeliveryHandlerImpl handler, long numberOfExpectedMessages) throws Exception {
List<MessageReference> refs = handler.getScheduledReferences(); List<MessageReference> refs = handler.getScheduledReferences();
HashSet<Long> messages = new HashSet<>(); HashSet<Long> messages = new HashSet<>();

View File

@ -207,6 +207,11 @@ public class TransactionImplTest extends ActiveMQTestBase {
} }
@Override
public void criticalError(Throwable error) {
error.printStackTrace();
}
@Override @Override
public OperationContext newContext(Executor executor) { public OperationContext newContext(Executor executor) {
return null; return null;

View File

@ -1096,6 +1096,14 @@
<exclude>**/node/**</exclude> <exclude>**/node/**</exclude>
<exclude>**/node_modules/**</exclude> <exclude>**/node_modules/**</exclude>
<exclude>**/package.json</exclude> <exclude>**/package.json</exclude>
<!-- things from cmake on the native build -->
<exclude>**/CMakeCache.txt</exclude>
<exclude>**/CMakeFiles/</exclude>
<exclude>**/Makefile</exclude>
<exclude>**/cmake_install.cmake</exclude>
<exclude>artemis-native/src/main/c/org_apache_activemq_artemis_jlibaio_LibaioContext.h</exclude>
</excludes> </excludes>
</configuration> </configuration>
<executions> <executions>

View File

@ -16,10 +16,16 @@
*/ */
package org.apache.activemq.artemis.tests.integration; package org.apache.activemq.artemis.tests.integration;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException; import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer; import org.apache.activemq.artemis.api.core.client.ClientProducer;
@ -37,10 +43,6 @@ import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
public class DuplicateDetectionTest extends ActiveMQTestBase { public class DuplicateDetectionTest extends ActiveMQTestBase {
private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER; private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
@ -213,6 +215,75 @@ public class DuplicateDetectionTest extends ActiveMQTestBase {
Assert.assertEquals(0, ((PostOfficeImpl) server.getPostOffice()).getDuplicateIDCaches().size()); Assert.assertEquals(0, ((PostOfficeImpl) server.getPostOffice()).getDuplicateIDCaches().size());
} }
// It is important to test the shrink with this rule
// because we could have this after crashes
// we would eventually have a higher number of caches while we couldn't have time to clear previous ones
@Test
public void testShrinkCache() throws Exception {
server.stop();
server.getConfiguration().setIDCacheSize(150);
server.start();
final int TEST_SIZE = 200;
ServerLocator locator = ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
locator.setBlockOnNonDurableSend(true);
ClientSessionFactory sf = createSessionFactory(locator);
ClientSession session = sf.createSession(false, true, true);
session.start();
final SimpleString queueName = new SimpleString("DuplicateDetectionTestQueue");
session.createQueue(queueName, queueName, null, true);
ClientProducer producer = session.createProducer(queueName);
for (int i = 0; i < TEST_SIZE; i++) {
ClientMessage message = session.createMessage(true);
message.putStringProperty(Message.HDR_DUPLICATE_DETECTION_ID, SimpleString.toSimpleString("DUPL-" + i));
producer.send(message);
}
session.commit();
sf.close();
session.close();
locator.close();
server.stop();
server.getConfiguration().setIDCacheSize(100);
server.start();
locator = ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
locator.setBlockOnNonDurableSend(true);
sf = createSessionFactory(locator);
session = sf.createSession(false, false, false);
session.start();
producer = session.createProducer(queueName);
// will send the last 50 again
for (int i = TEST_SIZE - 50; i < TEST_SIZE; i++) {
ClientMessage message = session.createMessage(true);
message.putStringProperty(Message.HDR_DUPLICATE_DETECTION_ID, SimpleString.toSimpleString("DUPL-" + i));
producer.send(message);
}
try {
session.commit();
Assert.fail("Exception expected");
}
catch (ActiveMQException expected) {
}
}
@Test @Test
public void testSimpleDuplicateDetectionWithString() throws Exception { public void testSimpleDuplicateDetectionWithString() throws Exception {
ClientSession session = sf.createSession(false, true, true); ClientSession session = sf.createSession(false, true, true);
@ -1239,176 +1310,6 @@ public class DuplicateDetectionTest extends ActiveMQTestBase {
} }
} }
@Test
public void testDuplicateCachePersistedRestartWithSmallerCache() throws Exception {
server.stop();
final int initialCacheSize = 10;
final int subsequentCacheSize = 5;
config = createDefaultInVMConfig().setIDCacheSize(initialCacheSize);
server = createServer(config);
server.start();
sf = createSessionFactory(locator);
ClientSession session = sf.createSession(false, true, true);
session.start();
final SimpleString queueName = new SimpleString("DuplicateDetectionTestQueue");
session.createQueue(queueName, queueName, null, false);
ClientProducer producer = session.createProducer(queueName);
ClientConsumer consumer = session.createConsumer(queueName);
for (int i = 0; i < initialCacheSize; i++) {
ClientMessage message = createMessage(session, i);
SimpleString dupID = new SimpleString("abcdefg" + i);
message.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
producer.send(message);
ClientMessage message2 = consumer.receive(1000);
Assert.assertEquals(i, message2.getObjectProperty(propKey));
}
session.close();
sf.close();
server.stop();
waitForServerToStop(server);
config.setIDCacheSize(subsequentCacheSize);
server = createServer(config);
server.start();
sf = createSessionFactory(locator);
session = sf.createSession(false, true, true);
session.start();
session.createQueue(queueName, queueName, null, false);
producer = session.createProducer(queueName);
consumer = session.createConsumer(queueName);
for (int i = 0; i < initialCacheSize; i++) {
ClientMessage message = createMessage(session, i);
SimpleString dupID = new SimpleString("abcdefg" + i);
message.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
producer.send(message);
if (i >= subsequentCacheSize) {
// Message should get through
ClientMessage message2 = consumer.receive(1000);
Assert.assertEquals(i, message2.getObjectProperty(propKey));
}
else {
ClientMessage message2 = consumer.receiveImmediate();
Assert.assertNull(message2);
}
}
}
@Test
public void testDuplicateCachePersistedRestartWithSmallerCacheEnsureDeleted() throws Exception {
server.stop();
final int initialCacheSize = 10;
final int subsequentCacheSize = 5;
config = createDefaultInVMConfig().setIDCacheSize(initialCacheSize);
server = createServer(config);
server.start();
sf = createSessionFactory(locator);
ClientSession session = sf.createSession(false, true, true);
session.start();
final SimpleString queueName = new SimpleString("DuplicateDetectionTestQueue");
session.createQueue(queueName, queueName, null, false);
ClientProducer producer = session.createProducer(queueName);
ClientConsumer consumer = session.createConsumer(queueName);
for (int i = 0; i < initialCacheSize; i++) {
ClientMessage message = createMessage(session, i);
SimpleString dupID = new SimpleString("abcdefg" + i);
message.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
producer.send(message);
ClientMessage message2 = consumer.receive(1000);
Assert.assertEquals(i, message2.getObjectProperty(propKey));
}
session.close();
sf.close();
server.stop();
waitForServerToStop(server);
config.setIDCacheSize(subsequentCacheSize);
server = createServer(config);
server.start();
// Now stop and set back to original cache size and restart
server.stop();
waitForServerToStop(server);
config.setIDCacheSize(initialCacheSize);
server = createServer(config);
server.start();
sf = createSessionFactory(locator);
session = sf.createSession(false, true, true);
session.start();
session.createQueue(queueName, queueName, null, false);
producer = session.createProducer(queueName);
consumer = session.createConsumer(queueName);
for (int i = 0; i < initialCacheSize; i++) {
ClientMessage message = createMessage(session, i);
SimpleString dupID = new SimpleString("abcdefg" + i);
message.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
producer.send(message);
if (i >= subsequentCacheSize) {
// Message should get through
ClientMessage message2 = consumer.receive(1000);
Assert.assertEquals(i, message2.getObjectProperty(propKey));
}
else {
ClientMessage message2 = consumer.receiveImmediate();
Assert.assertNull(message2);
}
}
}
@Test @Test
public void testNoPersist() throws Exception { public void testNoPersist() throws Exception {
server.stop(); server.stop();

View File

@ -256,6 +256,10 @@ public class PersistMultiThreadTest extends ActiveMQTestBase {
return null; return null;
} }
@Override
public void criticalException(Throwable e) {
}
@Override @Override
public int getNumberOfPages() { public int getNumberOfPages() {
return 0; return 0;

View File

@ -782,6 +782,10 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
static final class FakeStoreFactory implements PagingStoreFactory { static final class FakeStoreFactory implements PagingStoreFactory {
@Override
public void criticalException(Throwable e) {
}
final SequentialFileFactory factory; final SequentialFileFactory factory;
public FakeStoreFactory() { public FakeStoreFactory() {

View File

@ -106,7 +106,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase {
DuplicateIDCacheImpl cacheID = new DuplicateIDCacheImpl(ADDRESS, 10, journal, true); DuplicateIDCacheImpl cacheID = new DuplicateIDCacheImpl(ADDRESS, 10, journal, true);
for (int i = 0; i < 100; i++) { for (int i = 0; i < 100; i++) {
cacheID.addToCache(RandomUtil.randomBytes(), null); cacheID.addToCache(RandomUtil.randomBytes());
} }
journal.stop(); journal.stop();

View File

@ -93,41 +93,47 @@ public class FakeConsumer implements Consumer {
@Override @Override
public synchronized HandleStatus handle(final MessageReference reference) { public synchronized HandleStatus handle(final MessageReference reference) {
if (statusToReturn == HandleStatus.BUSY) { try {
return HandleStatus.BUSY; if (statusToReturn == HandleStatus.BUSY) {
} return HandleStatus.BUSY;
}
if (filter != null) { if (filter != null) {
if (filter.match(reference.getMessage())) { if (filter.match(reference.getMessage())) {
references.addLast(reference); references.addLast(reference);
reference.getQueue().referenceHandled();
notify();
return HandleStatus.HANDLED;
}
else {
return HandleStatus.NO_MATCH;
}
}
if (newStatus != null) {
if (delayCountdown == 0) {
statusToReturn = newStatus;
newStatus = null;
}
else {
delayCountdown--;
}
}
if (statusToReturn == HandleStatus.HANDLED) {
reference.getQueue().referenceHandled(); reference.getQueue().referenceHandled();
references.addLast(reference);
notify(); notify();
}
return HandleStatus.HANDLED; return statusToReturn;
}
else {
return HandleStatus.NO_MATCH;
}
} }
catch (Exception e) {
if (newStatus != null) { e.printStackTrace();
if (delayCountdown == 0) { throw new IllegalStateException(e.getMessage(), e);
statusToReturn = newStatus;
newStatus = null;
}
else {
delayCountdown--;
}
} }
if (statusToReturn == HandleStatus.HANDLED) {
reference.getQueue().referenceHandled();
references.addLast(reference);
notify();
}
return statusToReturn;
} }
@Override @Override

View File

@ -113,8 +113,7 @@ public class FakePostOffice implements PostOffice {
} }
@Override @Override
public Binding removeBinding(final SimpleString uniqueName, final Transaction tx) throws Exception { public Binding removeBinding(SimpleString uniqueName, Transaction tx, boolean deleteData) throws Exception {
return null; return null;
} }