This closes #1207
This commit is contained in:
commit
1f82c783a7
|
@ -0,0 +1,22 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.utils;
|
||||
|
||||
public interface RunnableEx {
|
||||
void run() throws Exception;
|
||||
}
|
|
@ -18,6 +18,8 @@ package org.apache.activemq.artemis.core.io.aio;
|
|||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.lang.management.ManagementFactory;
|
||||
import java.lang.management.ThreadInfo;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.PriorityQueue;
|
||||
import java.util.concurrent.Executor;
|
||||
|
@ -100,16 +102,34 @@ public class AIOSequentialFile extends AbstractSequentialFile {
|
|||
|
||||
super.close();
|
||||
|
||||
if (!pendingCallbacks.await(10, TimeUnit.SECONDS)) {
|
||||
final String fileName = this.getFileName();
|
||||
try {
|
||||
int waitCount = 0;
|
||||
while (!pendingCallbacks.await(10, TimeUnit.SECONDS)) {
|
||||
waitCount++;
|
||||
if (waitCount == 1) {
|
||||
final ThreadInfo[] threads = ManagementFactory.getThreadMXBean().dumpAllThreads(true, true);
|
||||
for (ThreadInfo threadInfo : threads) {
|
||||
ActiveMQJournalLogger.LOGGER.warn(threadInfo.toString());
|
||||
}
|
||||
factory.onIOError(new IOException("Timeout on close"), "Timeout on close", this);
|
||||
}
|
||||
ActiveMQJournalLogger.LOGGER.warn("waiting pending callbacks on " + fileName + " from " + (waitCount * 10) + " seconds!");
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
ActiveMQJournalLogger.LOGGER.warn("interrupted while waiting pending callbacks on " + fileName, e);
|
||||
throw e;
|
||||
} finally {
|
||||
|
||||
opened = false;
|
||||
|
||||
timedBuffer = null;
|
||||
|
||||
aioFile.close();
|
||||
|
||||
aioFile = null;
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -90,6 +90,7 @@ public class JournalFilesRepository {
|
|||
pushOpenedFile();
|
||||
} catch (Exception e) {
|
||||
ActiveMQJournalLogger.LOGGER.errorPushingFile(e);
|
||||
fileFactory.onIOError(e, "unable to open ", null);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
@ -412,21 +413,35 @@ public class JournalFilesRepository {
|
|||
logger.trace("enqueueOpenFile with openedFiles.size=" + openedFiles.size());
|
||||
}
|
||||
|
||||
if (openFilesExecutor == null) {
|
||||
pushOpenRunnable.run();
|
||||
} else {
|
||||
openFilesExecutor.execute(pushOpenRunnable);
|
||||
// First try to get an open file, that's prepared and already open
|
||||
JournalFile nextFile = openedFiles.poll();
|
||||
|
||||
if (nextFile == null) {
|
||||
// if there's none, push to open
|
||||
|
||||
pushOpen();
|
||||
|
||||
nextFile = openedFiles.poll(5, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
if (openedFiles.isEmpty()) {
|
||||
// if empty, push to open one.
|
||||
pushOpen();
|
||||
}
|
||||
|
||||
JournalFile nextFile = openedFiles.poll(5, TimeUnit.SECONDS);
|
||||
if (nextFile == null) {
|
||||
fileFactory.onIOError(ActiveMQJournalBundle.BUNDLE.fileNotOpened(), "unable to open ", null);
|
||||
|
||||
logger.debug("Could not get a file in 5 seconds, it will retry directly, without an executor");
|
||||
try {
|
||||
nextFile = takeFile(true, true, true, false);
|
||||
} catch (Exception e) {
|
||||
fileFactory.onIOError(e, "unable to open ", null);
|
||||
// We need to reconnect the current file with the timed buffer as we were not able to roll the file forward
|
||||
// If you don't do this you will get a NPE in TimedBuffer::checkSize where it uses the bufferobserver
|
||||
fileFactory.activateBuffer(journal.getCurrentFile().getFile());
|
||||
throw ActiveMQJournalBundle.BUNDLE.fileNotOpened();
|
||||
}
|
||||
|
||||
}
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Returning file " + nextFile);
|
||||
}
|
||||
|
@ -434,6 +449,14 @@ public class JournalFilesRepository {
|
|||
return nextFile;
|
||||
}
|
||||
|
||||
private void pushOpen() {
|
||||
if (openFilesExecutor == null) {
|
||||
pushOpenRunnable.run();
|
||||
} else {
|
||||
openFilesExecutor.execute(pushOpenRunnable);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Open a file and place it into the openedFiles queue
|
||||
*/
|
||||
|
|
|
@ -47,6 +47,7 @@ import org.apache.activemq.artemis.protocol.amqp.sasl.PlainSASL;
|
|||
import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
|
||||
import org.apache.activemq.artemis.protocol.amqp.sasl.ServerSASL;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
|
||||
import org.apache.activemq.artemis.utils.UUIDGenerator;
|
||||
import org.apache.qpid.proton.amqp.Binary;
|
||||
import org.apache.qpid.proton.amqp.Symbol;
|
||||
|
@ -156,6 +157,11 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener {
|
|||
connection.write(new ChannelBufferWrapper(byteBuf, true));
|
||||
}
|
||||
|
||||
public boolean isWritable(ReadyListener readyListener) {
|
||||
return connection.isWritable(readyListener);
|
||||
}
|
||||
|
||||
|
||||
public AMQPSessionCallback createSessionCallback(AMQPConnectionContext connection) {
|
||||
return new AMQPSessionCallback(this, manager, connection, this.connection, closeExecutor, server.newOperationContext());
|
||||
}
|
||||
|
|
|
@ -633,7 +633,8 @@ public class AMQPMessage extends RefCountMessage {
|
|||
|
||||
private synchronized void checkBuffer() {
|
||||
if (!bufferValid) {
|
||||
ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1500);
|
||||
int estimated = Math.max(1500, data != null ? data.capacity() + 1000 : 0);
|
||||
ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(estimated);
|
||||
try {
|
||||
getProtonMessage().encode(new NettyWritable(buffer));
|
||||
byte[] bytes = new byte[buffer.writerIndex()];
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
|
|||
import org.apache.activemq.artemis.core.io.IOCallback;
|
||||
import org.apache.activemq.artemis.core.paging.PagingStore;
|
||||
import org.apache.activemq.artemis.core.persistence.OperationContext;
|
||||
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
||||
import org.apache.activemq.artemis.core.server.AddressQueryResult;
|
||||
import org.apache.activemq.artemis.core.server.BindingQueryResult;
|
||||
import org.apache.activemq.artemis.core.server.MessageReference;
|
||||
|
@ -53,6 +54,7 @@ import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
|
|||
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
|
||||
import org.apache.activemq.artemis.utils.IDGenerator;
|
||||
import org.apache.activemq.artemis.utils.RunnableEx;
|
||||
import org.apache.activemq.artemis.utils.SelectorTranslator;
|
||||
import org.apache.activemq.artemis.utils.SimpleIDGenerator;
|
||||
import org.apache.activemq.artemis.utils.UUIDGenerator;
|
||||
|
@ -78,6 +80,8 @@ public class AMQPSessionCallback implements SessionCallback {
|
|||
|
||||
private final ProtonProtocolManager manager;
|
||||
|
||||
private final StorageManager storageManager;
|
||||
|
||||
private final AMQPConnectionContext connection;
|
||||
|
||||
private final Connection transportConnection;
|
||||
|
@ -100,6 +104,7 @@ public class AMQPSessionCallback implements SessionCallback {
|
|||
OperationContext operationContext) {
|
||||
this.protonSPI = protonSPI;
|
||||
this.manager = manager;
|
||||
this.storageManager = manager.getServer().getStorageManager();
|
||||
this.connection = connection;
|
||||
this.transportConnection = transportConnection;
|
||||
this.closeExecutor = executor;
|
||||
|
@ -134,6 +139,24 @@ public class AMQPSessionCallback implements SessionCallback {
|
|||
}
|
||||
}
|
||||
|
||||
public void withinContext(RunnableEx run) throws Exception {
|
||||
OperationContext context = recoverContext();
|
||||
try {
|
||||
run.run();
|
||||
} finally {
|
||||
resetContext(context);
|
||||
}
|
||||
}
|
||||
|
||||
public void afterIO(IOCallback ioCallback) {
|
||||
OperationContext context = recoverContext();
|
||||
try {
|
||||
manager.getServer().getStorageManager().afterCompleteOperations(ioCallback);
|
||||
} finally {
|
||||
resetContext(context);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void browserFinished(ServerConsumer consumer) {
|
||||
|
||||
|
@ -315,11 +338,11 @@ public class AMQPSessionCallback implements SessionCallback {
|
|||
public void close() throws Exception {
|
||||
//need to check here as this can be called if init fails
|
||||
if (serverSession != null) {
|
||||
recoverContext();
|
||||
OperationContext context = recoverContext();
|
||||
try {
|
||||
serverSession.close(false);
|
||||
} finally {
|
||||
resetContext();
|
||||
resetContext(context);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -328,30 +351,30 @@ public class AMQPSessionCallback implements SessionCallback {
|
|||
if (transaction == null) {
|
||||
transaction = serverSession.getCurrentTransaction();
|
||||
}
|
||||
recoverContext();
|
||||
OperationContext oldContext = recoverContext();
|
||||
try {
|
||||
((ServerConsumer) brokerConsumer).individualAcknowledge(transaction, message.getMessageID());
|
||||
} finally {
|
||||
resetContext();
|
||||
resetContext(oldContext);
|
||||
}
|
||||
}
|
||||
|
||||
public void cancel(Object brokerConsumer, Message message, boolean updateCounts) throws Exception {
|
||||
recoverContext();
|
||||
OperationContext oldContext = recoverContext();
|
||||
try {
|
||||
((ServerConsumer) brokerConsumer).individualCancel(message.getMessageID(), updateCounts);
|
||||
((ServerConsumer) brokerConsumer).getQueue().forceDelivery();
|
||||
} finally {
|
||||
resetContext();
|
||||
resetContext(oldContext);
|
||||
}
|
||||
}
|
||||
|
||||
public void reject(Object brokerConsumer, Message message) throws Exception {
|
||||
recoverContext();
|
||||
OperationContext oldContext = recoverContext();
|
||||
try {
|
||||
((ServerConsumer) brokerConsumer).reject(message.getMessageID());
|
||||
} finally {
|
||||
resetContext();
|
||||
resetContext(oldContext);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -380,8 +403,9 @@ public class AMQPSessionCallback implements SessionCallback {
|
|||
}
|
||||
}
|
||||
|
||||
recoverContext();
|
||||
OperationContext oldcontext = recoverContext();
|
||||
|
||||
try {
|
||||
PagingStore store = manager.getServer().getPagingManager().getPageStore(message.getAddressSimpleString());
|
||||
if (store.isRejectingMessages()) {
|
||||
// We drop pre-settled messages (and abort any associated Tx)
|
||||
|
@ -397,6 +421,9 @@ public class AMQPSessionCallback implements SessionCallback {
|
|||
} else {
|
||||
serverSend(transaction, message, delivery, receiver);
|
||||
}
|
||||
} finally {
|
||||
resetContext(oldcontext);
|
||||
}
|
||||
}
|
||||
|
||||
private void rejectMessage(Delivery delivery, Symbol errorCondition, String errorMessage) {
|
||||
|
@ -406,6 +433,9 @@ public class AMQPSessionCallback implements SessionCallback {
|
|||
Rejected rejected = new Rejected();
|
||||
rejected.setError(condition);
|
||||
|
||||
afterIO(new IOCallback() {
|
||||
@Override
|
||||
public void done() {
|
||||
connection.lock();
|
||||
try {
|
||||
delivery.disposition(rejected);
|
||||
|
@ -416,17 +446,23 @@ public class AMQPSessionCallback implements SessionCallback {
|
|||
connection.flush();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(int errorCode, String errorMessage) {
|
||||
|
||||
}
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
private void serverSend(final Transaction transaction,
|
||||
final Message message,
|
||||
final Delivery delivery,
|
||||
final Receiver receiver) throws Exception {
|
||||
try {
|
||||
|
||||
message.setConnectionID(receiver.getSession().getConnection().getRemoteContainer());
|
||||
|
||||
serverSession.send(transaction, message, false, false);
|
||||
|
||||
manager.getServer().getStorageManager().afterCompleteOperations(new IOCallback() {
|
||||
afterIO(new IOCallback() {
|
||||
@Override
|
||||
public void done() {
|
||||
connection.lock();
|
||||
|
@ -458,9 +494,6 @@ public class AMQPSessionCallback implements SessionCallback {
|
|||
}
|
||||
}
|
||||
});
|
||||
} finally {
|
||||
resetContext();
|
||||
}
|
||||
}
|
||||
|
||||
public void offerProducerCredit(final String address,
|
||||
|
@ -502,12 +535,15 @@ public class AMQPSessionCallback implements SessionCallback {
|
|||
manager.getServer().destroyQueue(new SimpleString(queueName));
|
||||
}
|
||||
|
||||
private void resetContext() {
|
||||
manager.getServer().getStorageManager().setContext(null);
|
||||
public void resetContext(OperationContext oldContext) {
|
||||
storageManager.setContext(oldContext);
|
||||
}
|
||||
|
||||
private void recoverContext() {
|
||||
public OperationContext recoverContext() {
|
||||
|
||||
OperationContext oldContext = storageManager.getContext();
|
||||
manager.getServer().getStorageManager().setContext(serverSession.getSessionContext());
|
||||
return oldContext;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -52,7 +52,7 @@ public class AMQPClientConnectionFactory {
|
|||
|
||||
Executor executor = server.getExecutorFactory().getExecutor();
|
||||
|
||||
AMQPConnectionContext amqpConnection = new AMQPConnectionContext(null, connectionCallback, containerId, ttl, protocolManager.getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, server.getScheduledPool());
|
||||
AMQPConnectionContext amqpConnection = new AMQPConnectionContext(protocolManager, connectionCallback, containerId, ttl, protocolManager.getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, server.getScheduledPool());
|
||||
eventHandler.ifPresent(amqpConnection::addEventHandler);
|
||||
|
||||
ActiveMQProtonRemotingConnection delegate = new ActiveMQProtonRemotingConnection(protocolManager, amqpConnection, connection, executor);
|
||||
|
|
|
@ -35,6 +35,7 @@ import org.apache.activemq.artemis.protocol.amqp.proton.handler.EventHandler;
|
|||
import org.apache.activemq.artemis.protocol.amqp.proton.handler.ExtCapability;
|
||||
import org.apache.activemq.artemis.protocol.amqp.proton.handler.ProtonHandler;
|
||||
import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
|
||||
import org.apache.activemq.artemis.utils.ByteUtil;
|
||||
import org.apache.activemq.artemis.utils.VersionLoader;
|
||||
import org.apache.qpid.proton.amqp.Symbol;
|
||||
|
@ -90,7 +91,7 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
|
|||
|
||||
this.scheduledPool = scheduledPool;
|
||||
connectionCallback.setConnection(this);
|
||||
this.handler = new ProtonHandler();
|
||||
this.handler = new ProtonHandler(protocolManager.getServer().getExecutorFactory().getExecutor());
|
||||
handler.addEventHandler(this);
|
||||
Transport transport = handler.getTransport();
|
||||
transport.setEmitFlowEventOnSend(false);
|
||||
|
@ -332,6 +333,11 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
|
|||
connectionCallback.onTransport(bytes, this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean flowControl(ReadyListener readyListener) {
|
||||
return connectionCallback.isWritable(readyListener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onRemoteOpen(Connection connection) throws Exception {
|
||||
lock();
|
||||
|
|
|
@ -29,6 +29,8 @@ import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
|
|||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.io.IOCallback;
|
||||
import org.apache.activemq.artemis.core.persistence.OperationContext;
|
||||
import org.apache.activemq.artemis.core.postoffice.impl.CompositeAddress;
|
||||
import org.apache.activemq.artemis.core.server.AddressQueryResult;
|
||||
import org.apache.activemq.artemis.core.server.Consumer;
|
||||
|
@ -486,6 +488,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
|||
return;
|
||||
}
|
||||
|
||||
OperationContext oldContext = sessionSPI.recoverContext();
|
||||
|
||||
try {
|
||||
Message message = ((MessageReference) delivery.getContext()).getMessage();
|
||||
|
||||
|
@ -590,8 +594,20 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
|||
// todo not sure if we need to do anything here
|
||||
}
|
||||
} finally {
|
||||
sessionSPI.afterIO(new IOCallback() {
|
||||
@Override
|
||||
public void done() {
|
||||
connection.flush();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(int errorCode, String errorMessage) {
|
||||
connection.flush();
|
||||
}
|
||||
});
|
||||
|
||||
sessionSPI.resetContext(oldContext);
|
||||
}
|
||||
}
|
||||
|
||||
public void settle(Delivery delivery) {
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
package org.apache.activemq.artemis.protocol.amqp.proton.handler;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
|
||||
import org.apache.qpid.proton.engine.Connection;
|
||||
import org.apache.qpid.proton.engine.Delivery;
|
||||
import org.apache.qpid.proton.engine.Link;
|
||||
|
@ -78,4 +79,6 @@ public interface EventHandler {
|
|||
|
||||
void pushBytes(ByteBuf bytes);
|
||||
|
||||
boolean flowControl(ReadyListener readyListener);
|
||||
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ import java.util.ArrayList;
|
|||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
|
@ -29,6 +30,7 @@ import io.netty.buffer.PooledByteBufAllocator;
|
|||
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonInitializable;
|
||||
import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
|
||||
import org.apache.activemq.artemis.protocol.amqp.sasl.ServerSASL;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
|
||||
import org.apache.activemq.artemis.utils.ByteUtil;
|
||||
import org.apache.qpid.proton.Proton;
|
||||
import org.apache.qpid.proton.amqp.Symbol;
|
||||
|
@ -71,14 +73,23 @@ public class ProtonHandler extends ProtonInitializable {
|
|||
|
||||
protected boolean receivedFirstPacket = false;
|
||||
|
||||
private final Executor flushExecutor;
|
||||
|
||||
protected final ReadyListener readyListener;
|
||||
|
||||
boolean inDispatch = false;
|
||||
|
||||
public ProtonHandler() {
|
||||
public ProtonHandler(Executor flushExecutor) {
|
||||
this.flushExecutor = flushExecutor;
|
||||
this.readyListener = () -> flushExecutor.execute(() -> {
|
||||
flush();
|
||||
});
|
||||
this.creationTime = System.currentTimeMillis();
|
||||
transport.bind(connection);
|
||||
connection.collect(collector);
|
||||
}
|
||||
|
||||
|
||||
public long tick(boolean firstTick) {
|
||||
lock.lock();
|
||||
try {
|
||||
|
@ -161,6 +172,13 @@ public class ProtonHandler extends ProtonInitializable {
|
|||
}
|
||||
|
||||
public void flushBytes() {
|
||||
|
||||
for (EventHandler handler : handlers) {
|
||||
if (!handler.flowControl(readyListener)) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
lock.lock();
|
||||
try {
|
||||
while (true) {
|
||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.activemq.artemis.protocol.amqp.proton.transaction;
|
|||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
import org.apache.activemq.artemis.core.io.IOCallback;
|
||||
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
|
||||
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
|
||||
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
|
||||
|
@ -118,24 +119,29 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
|
|||
ProtonTransactionImpl tx = (ProtonTransactionImpl) sessionSPI.getTransaction(txID, true);
|
||||
tx.discharge();
|
||||
|
||||
IOCallback ioAction = new IOCallback() {
|
||||
@Override
|
||||
public void done() {
|
||||
connection.lock();
|
||||
try {
|
||||
delivery.disposition(new Accepted());
|
||||
} finally {
|
||||
connection.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(int errorCode, String errorMessage) {
|
||||
|
||||
}
|
||||
};
|
||||
|
||||
if (discharge.getFail()) {
|
||||
tx.rollback();
|
||||
connection.lock();
|
||||
try {
|
||||
delivery.disposition(new Accepted());
|
||||
} finally {
|
||||
connection.unlock();
|
||||
}
|
||||
connection.flush();
|
||||
sessionSPI.withinContext(() -> tx.rollback());
|
||||
sessionSPI.afterIO(ioAction);
|
||||
} else {
|
||||
tx.commit();
|
||||
connection.lock();
|
||||
try {
|
||||
delivery.disposition(new Accepted());
|
||||
} finally {
|
||||
connection.unlock();
|
||||
}
|
||||
connection.flush();
|
||||
sessionSPI.withinContext(() -> tx.commit());
|
||||
sessionSPI.afterIO(ioAction);
|
||||
}
|
||||
}
|
||||
} catch (ActiveMQAMQPException amqpE) {
|
||||
|
@ -157,6 +163,9 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
|
|||
}
|
||||
connection.flush();
|
||||
} finally {
|
||||
sessionSPI.afterIO(new IOCallback() {
|
||||
@Override
|
||||
public void done() {
|
||||
connection.lock();
|
||||
try {
|
||||
delivery.settle();
|
||||
|
@ -165,6 +174,13 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
|
|||
}
|
||||
connection.flush();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(int errorCode, String errorMessage) {
|
||||
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -683,7 +683,7 @@ public class PagingStoreImpl implements PagingStore {
|
|||
if (pagingManager.isDiskFull()) {
|
||||
ActiveMQServerLogger.LOGGER.blockingDiskFull(address);
|
||||
} else {
|
||||
ActiveMQServerLogger.LOGGER.blockingMessageProduction(address, sizeInBytes.get(), maxSize);
|
||||
ActiveMQServerLogger.LOGGER.blockingMessageProduction(address, sizeInBytes.get(), maxSize, pagingManager.getGlobalSize());
|
||||
}
|
||||
blocking.set(true);
|
||||
}
|
||||
|
|
|
@ -944,7 +944,7 @@ public interface ActiveMQServerLogger extends BasicLogger {
|
|||
void errorExpiringReferencesNoBindings(SimpleString expiryAddress);
|
||||
|
||||
@LogMessage(level = Logger.Level.WARN)
|
||||
@Message(id = 222147, value = "Message has expired. No expiry queue configured for queue {0} so dropping it", format = Message.Format.MESSAGE_FORMAT)
|
||||
@Message(id = 222147, value = "Messages are being expired on queue{0}. However there is no expiry queue configured, hence messages will be dropped.", format = Message.Format.MESSAGE_FORMAT)
|
||||
void errorExpiringReferencesNoQueue(SimpleString name);
|
||||
|
||||
@LogMessage(level = Logger.Level.WARN)
|
||||
|
@ -1104,8 +1104,8 @@ public interface ActiveMQServerLogger extends BasicLogger {
|
|||
void missingClusterConfigForScaleDown(String scaleDownCluster);
|
||||
|
||||
@LogMessage(level = Logger.Level.WARN)
|
||||
@Message(id = 222183, value = "Blocking message production on address ''{0}''; size is currently: {1} bytes; max-size-bytes: {2}", format = Message.Format.MESSAGE_FORMAT)
|
||||
void blockingMessageProduction(SimpleString addressName, long currentSize, long maxSize);
|
||||
@Message(id = 222183, value = "Blocking message production on address ''{0}''; size is currently: {1} bytes; max-size-bytes on address: {2}, global-max-size is {3}", format = Message.Format.MESSAGE_FORMAT)
|
||||
void blockingMessageProduction(SimpleString addressName, long currentSize, long maxSize, long globalMaxSize);
|
||||
|
||||
@LogMessage(level = Logger.Level.WARN)
|
||||
@Message(id = 222184,
|
||||
|
|
|
@ -146,6 +146,8 @@ public class QueueImpl implements Queue {
|
|||
|
||||
private final LinkedListIterator<PagedReference> pageIterator;
|
||||
|
||||
private volatile boolean printErrorExpiring = false;
|
||||
|
||||
// Messages will first enter intermediateMessageReferences
|
||||
// Before they are added to messageReferences
|
||||
// This is to avoid locking the queue on the producer
|
||||
|
@ -1567,27 +1569,52 @@ public class QueueImpl implements Queue {
|
|||
if (queueDestroyed) {
|
||||
return;
|
||||
}
|
||||
logger.debug("Scanning for expires on " + QueueImpl.this.getName());
|
||||
|
||||
LinkedListIterator<MessageReference> iter = iterator();
|
||||
|
||||
try {
|
||||
boolean expired = false;
|
||||
boolean hasElements = false;
|
||||
|
||||
int elementsExpired = 0;
|
||||
try {
|
||||
Transaction tx = null;
|
||||
|
||||
while (postOffice.isStarted() && iter.hasNext()) {
|
||||
hasElements = true;
|
||||
MessageReference ref = iter.next();
|
||||
try {
|
||||
if (ref.getMessage().isExpired()) {
|
||||
if (tx == null) {
|
||||
tx = new TransactionImpl(storageManager);
|
||||
}
|
||||
incDelivering();
|
||||
expired = true;
|
||||
expire(ref);
|
||||
expire(tx, ref);
|
||||
iter.remove();
|
||||
refRemoved(ref);
|
||||
|
||||
if (++elementsExpired >= MAX_DELIVERIES_IN_LOOP) {
|
||||
logger.debug("Breaking loop of expiring");
|
||||
scannerRunning.incrementAndGet();
|
||||
getExecutor().execute(this);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
ActiveMQServerLogger.LOGGER.errorExpiringReferencesOnQueue(e, ref);
|
||||
}
|
||||
}
|
||||
|
||||
logger.debug("Expired " + elementsExpired + " references");
|
||||
|
||||
try {
|
||||
if (tx != null) {
|
||||
tx.commit();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.warn(e.getMessage(), e);
|
||||
}
|
||||
|
||||
// If empty we need to schedule depaging to make sure we would depage expired messages as well
|
||||
|
@ -1600,6 +1627,8 @@ public class QueueImpl implements Queue {
|
|||
} catch (Throwable ignored) {
|
||||
}
|
||||
scannerRunning.decrementAndGet();
|
||||
logger.debug("Scanning for expires on " + QueueImpl.this.getName() + " done");
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1912,7 +1941,6 @@ public class QueueImpl implements Queue {
|
|||
return "QueueImpl[name=" + name.toString() + ", postOffice=" + this.postOffice + ", temp=" + this.temporary + "]@" + Integer.toHexString(System.identityHashCode(this));
|
||||
}
|
||||
|
||||
|
||||
private synchronized void internalAddTail(final MessageReference ref) {
|
||||
refAdded(ref);
|
||||
messageReferences.addTail(ref, getPriority(ref));
|
||||
|
@ -2519,7 +2547,11 @@ public class QueueImpl implements Queue {
|
|||
move(expiryAddress, tx, ref, true, true);
|
||||
}
|
||||
} else {
|
||||
if (!printErrorExpiring) {
|
||||
printErrorExpiring = true;
|
||||
// print this only once
|
||||
ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoQueue(name);
|
||||
}
|
||||
|
||||
acknowledge(tx, ref);
|
||||
}
|
||||
|
|
|
@ -172,6 +172,23 @@
|
|||
<configuration>${basedir}/target/classes/servers/replicated-static1</configuration>
|
||||
</configuration>
|
||||
</execution>
|
||||
<execution>
|
||||
<phase>test-compile</phase>
|
||||
<id>create-expire</id>
|
||||
<goals>
|
||||
<goal>create</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<!-- this makes it easier in certain envs -->
|
||||
<configuration>${basedir}/target/classes/servers/expire</configuration>
|
||||
<javaOptions>-Dartemis.debug.paging.interval=1</javaOptions>
|
||||
<allowAnonymous>true</allowAnonymous>
|
||||
<user>admin</user>
|
||||
<password>admin</password>
|
||||
<instance>${basedir}/target/expire</instance>
|
||||
</configuration>
|
||||
</execution>
|
||||
|
||||
</executions>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
|
|
|
@ -0,0 +1,184 @@
|
|||
<?xml version='1.0'?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one
|
||||
or more contributor license agreements. See the NOTICE file
|
||||
distributed with this work for additional information
|
||||
regarding copyright ownership. The ASF licenses this file
|
||||
to you under the Apache License, Version 2.0 (the
|
||||
"License"); you may not use this file except in compliance
|
||||
with the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing,
|
||||
software distributed under the License is distributed on an
|
||||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
KIND, either express or implied. See the License for the
|
||||
specific language governing permissions and limitations
|
||||
under the License.
|
||||
-->
|
||||
|
||||
<configuration xmlns="urn:activemq"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
|
||||
|
||||
<core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="urn:activemq:core ">
|
||||
|
||||
<name>0.0.0.0</name>
|
||||
|
||||
<persistence-enabled>true</persistence-enabled>
|
||||
|
||||
<!-- this could be ASYNCIO or NIO
|
||||
-->
|
||||
<journal-type>NIO</journal-type>
|
||||
|
||||
<paging-directory>./data/paging</paging-directory>
|
||||
|
||||
<bindings-directory>./data/bindings</bindings-directory>
|
||||
|
||||
<journal-directory>./data/journal</journal-directory>
|
||||
|
||||
<large-messages-directory>./data/large-messages</large-messages-directory>
|
||||
|
||||
<journal-datasync>true</journal-datasync>
|
||||
|
||||
<journal-min-files>2</journal-min-files>
|
||||
|
||||
<journal-pool-files>-1</journal-pool-files>
|
||||
|
||||
<message-expiry-scan-period>1000</message-expiry-scan-period>
|
||||
|
||||
<!--
|
||||
You can specify the NIC you want to use to verify if the network
|
||||
<network-check-NIC>theNickName</network-check-NIC>
|
||||
-->
|
||||
|
||||
<!--
|
||||
Use this to use an HTTP server to validate the network
|
||||
<network-check-URL-list>http://www.apache.org</network-check-URL-list> -->
|
||||
|
||||
<!-- <network-check-period>10000</network-check-period> -->
|
||||
<!-- <network-check-timeout>1000</network-check-timeout> -->
|
||||
|
||||
<!-- this is a comma separated list, no spaces, just DNS or IPs
|
||||
it should accept IPV6
|
||||
|
||||
Warning: Make sure you understand your network topology as this is meant to validate if your network is valid.
|
||||
Using IPs that could eventually disappear or be partially visible may defeat the purpose.
|
||||
You can use a list of multiple IPs, and if any successful ping will make the server OK to continue running -->
|
||||
<!-- <network-check-list>10.0.0.1</network-check-list> -->
|
||||
|
||||
<!-- use this to customize the ping used for ipv4 addresses -->
|
||||
<!-- <network-check-ping-command>ping -c 1 -t %d %s</network-check-ping-command> -->
|
||||
|
||||
<!-- use this to customize the ping used for ipv6 addresses -->
|
||||
<!-- <network-check-ping6-command>ping6 -c 1 %2$s</network-check-ping6-command> -->
|
||||
|
||||
|
||||
|
||||
|
||||
<!-- how often we are looking for how many bytes are being used on the disk in ms -->
|
||||
<disk-scan-period>5000</disk-scan-period>
|
||||
|
||||
<!-- once the disk hits this limit the system will block, or close the connection in certain protocols
|
||||
that won't support flow control. -->
|
||||
<max-disk-usage>90</max-disk-usage>
|
||||
|
||||
<!-- the system will enter into page mode once you hit this limit.
|
||||
This is an estimate in bytes of how much the messages are using in memory
|
||||
|
||||
The system will use half of the available memory (-Xmx) by default for the global-max-size.
|
||||
You may specify a different value here if you need to customize it to your needs.
|
||||
|
||||
<global-max-size>100Mb</global-max-size>
|
||||
|
||||
-->
|
||||
|
||||
<acceptors>
|
||||
|
||||
<!-- useEpoll means: it will use Netty epoll if you are on a system (Linux) that supports it -->
|
||||
<!-- amqpCredits: The number of credits sent to AMQP producers -->
|
||||
<!-- amqpLowCredits: The server will send the # credits specified at amqpCredits at this low mark -->
|
||||
|
||||
<!-- Acceptor for every supported protocol -->
|
||||
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
|
||||
|
||||
<!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.-->
|
||||
<acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpMinCredits=300</acceptor>
|
||||
|
||||
<!-- STOMP Acceptor. -->
|
||||
<acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>
|
||||
|
||||
<!-- HornetQ Compatibility Acceptor. Enables HornetQ Core and STOMP for legacy HornetQ clients. -->
|
||||
<acceptor name="hornetq">tcp://0.0.0.0:5445?protocols=HORNETQ,STOMP;useEpoll=true</acceptor>
|
||||
|
||||
<!-- MQTT Acceptor -->
|
||||
<acceptor name="mqtt">tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true</acceptor>
|
||||
|
||||
</acceptors>
|
||||
|
||||
|
||||
<security-settings>
|
||||
<security-setting match="#">
|
||||
<permission type="createNonDurableQueue" roles="guest"/>
|
||||
<permission type="deleteNonDurableQueue" roles="guest"/>
|
||||
<permission type="createDurableQueue" roles="guest"/>
|
||||
<permission type="deleteDurableQueue" roles="guest"/>
|
||||
<permission type="createAddress" roles="guest"/>
|
||||
<permission type="deleteAddress" roles="guest"/>
|
||||
<permission type="consume" roles="guest"/>
|
||||
<permission type="browse" roles="guest"/>
|
||||
<permission type="send" roles="guest"/>
|
||||
<!-- we need this otherwise ./artemis data imp wouldn't work -->
|
||||
<permission type="manage" roles="guest"/>
|
||||
</security-setting>
|
||||
</security-settings>
|
||||
|
||||
<address-settings>
|
||||
<!-- if you define auto-create on certain queues, management has to be auto-create -->
|
||||
<address-setting match="activemq.management#">
|
||||
<dead-letter-address>DLQ</dead-letter-address>
|
||||
<expiry-address>ExpiryQueue</expiry-address>
|
||||
<redelivery-delay>0</redelivery-delay>
|
||||
<!-- with -1 only the global-max-size is in use for limiting -->
|
||||
<max-size-bytes>-1</max-size-bytes>
|
||||
<message-counter-history-day-limit>10</message-counter-history-day-limit>
|
||||
<address-full-policy>PAGE</address-full-policy>
|
||||
<auto-create-queues>true</auto-create-queues>
|
||||
<auto-create-addresses>true</auto-create-addresses>
|
||||
<auto-create-jms-queues>true</auto-create-jms-queues>
|
||||
<auto-create-jms-topics>true</auto-create-jms-topics>
|
||||
</address-setting>
|
||||
<!--default for catch all-->
|
||||
<address-setting match="#">
|
||||
<dead-letter-address>DLQ</dead-letter-address>
|
||||
<expiry-address>ExpiryQueue</expiry-address>
|
||||
<redelivery-delay>0</redelivery-delay>
|
||||
<!-- with -1 only the global-max-size is in use for limiting -->
|
||||
<max-size-bytes>-1</max-size-bytes>
|
||||
<message-counter-history-day-limit>10</message-counter-history-day-limit>
|
||||
<address-full-policy>PAGE</address-full-policy>
|
||||
<auto-create-queues>true</auto-create-queues>
|
||||
<auto-create-addresses>true</auto-create-addresses>
|
||||
<auto-create-jms-queues>true</auto-create-jms-queues>
|
||||
<auto-create-jms-topics>true</auto-create-jms-topics>
|
||||
</address-setting>
|
||||
</address-settings>
|
||||
|
||||
<addresses>
|
||||
<address name="DLQ">
|
||||
<anycast>
|
||||
<queue name="DLQ" />
|
||||
</anycast>
|
||||
</address>
|
||||
<address name="ExpiryQueue">
|
||||
<anycast>
|
||||
<queue name="ExpiryQueue" />
|
||||
</anycast>
|
||||
</address>
|
||||
|
||||
</addresses>
|
||||
|
||||
</core>
|
||||
</configuration>
|
|
@ -0,0 +1,92 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.tests.smoke.expire;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.DeliveryMode;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Queue;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
|
||||
import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
|
||||
import org.apache.qpid.jms.JmsConnectionFactory;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestSimpleExpire extends SmokeTestBase {
|
||||
|
||||
public static final String SERVER_NAME_0 = "expire";
|
||||
|
||||
@Before
|
||||
public void before() throws Exception {
|
||||
cleanupData(SERVER_NAME_0);
|
||||
disableCheckThread();
|
||||
startServer(SERVER_NAME_0, 0, 30000);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSendExpire() throws Exception {
|
||||
ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:61616");
|
||||
Connection connection = factory.createConnection();
|
||||
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||
|
||||
Queue queue = session.createQueue("q0");
|
||||
MessageProducer producer = session.createProducer(queue);
|
||||
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
|
||||
|
||||
producer.setTimeToLive(1000);
|
||||
for (int i = 0; i < 20000; i++) {
|
||||
producer.send(session.createTextMessage("expired"));
|
||||
if (i % 5000 == 0) {
|
||||
session.commit();
|
||||
System.out.println("Sent " + i + " + messages");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
session.commit();
|
||||
|
||||
Thread.sleep(5000);
|
||||
producer.setTimeToLive(0);
|
||||
for (int i = 0; i < 500; i++) {
|
||||
producer.send(session.createTextMessage("ok"));
|
||||
|
||||
}
|
||||
session.commit();
|
||||
|
||||
MessageConsumer consumer = session.createConsumer(queue);
|
||||
connection.start();
|
||||
|
||||
|
||||
for (int i = 0; i < 500; i++) {
|
||||
TextMessage txt = (TextMessage) consumer.receive(10000);
|
||||
Assert.assertNotNull(txt);
|
||||
Assert.assertEquals("ok", txt.getText());
|
||||
}
|
||||
|
||||
session.commit();
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue