This closes #422
This commit is contained in:
commit
ab98eb9620
|
@ -0,0 +1,27 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.utils;
|
||||
|
||||
/** This is similar to a Runnable, except that we throw exceptions.
|
||||
* In certain places we need to complete tasks after deliveries,
|
||||
* and this will take care of those situations. */
|
||||
public abstract class PendingTask {
|
||||
|
||||
public abstract void run() throws Exception;
|
||||
|
||||
}
|
|
@ -100,7 +100,7 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>, Noti
|
|||
}
|
||||
|
||||
AMQPServerConnectionContext amqpConnection = ProtonServerConnectionContextFactory.getFactory().
|
||||
createConnection(connectionCallback, (int) ttl, DEFAULT_MAX_FRAME_SIZE, DEFAULT_CHANNEL_MAX, server.getScheduledPool());
|
||||
createConnection(connectionCallback, (int) ttl, DEFAULT_MAX_FRAME_SIZE, DEFAULT_CHANNEL_MAX, server.getExecutorFactory().getExecutor(), server.getScheduledPool());
|
||||
|
||||
Executor executor = server.getExecutorFactory().getExecutor();
|
||||
|
||||
|
|
|
@ -115,6 +115,11 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
|
|||
(String) null, this, null, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterDelivery() throws Exception {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
|
||||
|
@ -214,38 +219,57 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
|
|||
|
||||
@Override
|
||||
public void commitCurrentTX() throws Exception {
|
||||
serverSession.commit();
|
||||
recoverContext();
|
||||
try {
|
||||
serverSession.commit();
|
||||
}
|
||||
finally {
|
||||
resetContext();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void rollbackCurrentTX() throws Exception {
|
||||
serverSession.rollback(false);
|
||||
recoverContext();
|
||||
try {
|
||||
serverSession.rollback(false);
|
||||
}
|
||||
finally {
|
||||
resetContext();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws Exception {
|
||||
closeExecutor.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
serverSession.close(false);
|
||||
}
|
||||
catch (Exception e) {
|
||||
// TODO Logger
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
});
|
||||
recoverContext();
|
||||
try {
|
||||
serverSession.close(false);
|
||||
}
|
||||
finally {
|
||||
resetContext();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void ack(Object brokerConsumer, Object message) throws Exception {
|
||||
((ServerConsumer) brokerConsumer).individualAcknowledge(null, ((ServerMessage) message).getMessageID());
|
||||
recoverContext();
|
||||
try {
|
||||
((ServerConsumer) brokerConsumer).individualAcknowledge(null, ((ServerMessage) message).getMessageID());
|
||||
}
|
||||
finally {
|
||||
resetContext();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel(Object brokerConsumer, Object message, boolean updateCounts) throws Exception {
|
||||
((ServerConsumer) brokerConsumer).individualCancel(((ServerMessage) message).getMessageID(), updateCounts);
|
||||
recoverContext();
|
||||
try {
|
||||
((ServerConsumer) brokerConsumer).individualCancel(((ServerMessage) message).getMessageID(), updateCounts);
|
||||
}
|
||||
finally {
|
||||
resetContext();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -267,25 +291,40 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
|
|||
message.setAddress(new SimpleString(address));
|
||||
}
|
||||
|
||||
serverSession.send(message, false);
|
||||
recoverContext();
|
||||
|
||||
manager.getServer().getStorageManager().afterCompleteOperations(new IOCallback() {
|
||||
@Override
|
||||
public void done() {
|
||||
synchronized (connection.getLock()) {
|
||||
delivery.settle();
|
||||
connection.flush();
|
||||
}
|
||||
}
|
||||
try {
|
||||
serverSession.send(message, false);
|
||||
|
||||
@Override
|
||||
public void onError(int errorCode, String errorMessage) {
|
||||
synchronized (connection.getLock()) {
|
||||
receiver.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, errorCode + ":" + errorMessage));
|
||||
connection.flush();
|
||||
manager.getServer().getStorageManager().afterCompleteOperations(new IOCallback() {
|
||||
@Override
|
||||
public void done() {
|
||||
synchronized (connection.getLock()) {
|
||||
delivery.settle();
|
||||
connection.flush();
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
@Override
|
||||
public void onError(int errorCode, String errorMessage) {
|
||||
synchronized (connection.getLock()) {
|
||||
receiver.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, errorCode + ":" + errorMessage));
|
||||
connection.flush();
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
finally {
|
||||
resetContext();
|
||||
}
|
||||
}
|
||||
|
||||
private void resetContext() {
|
||||
manager.getServer().getStorageManager().setContext(null);
|
||||
}
|
||||
|
||||
private void recoverContext() {
|
||||
manager.getServer().getStorageManager().setContext(serverSession.getSessionContext());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -76,6 +76,12 @@ public class MQTTSessionCallback implements SessionCallback {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterDelivery() throws Exception {
|
||||
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public boolean hasCredits(ServerConsumer consumerID) {
|
||||
return true;
|
||||
|
|
|
@ -22,6 +22,7 @@ import org.apache.activemq.artemis.core.filter.Filter;
|
|||
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
||||
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||
import org.apache.activemq.artemis.core.server.HandleStatus;
|
||||
import org.apache.activemq.artemis.core.server.MessageReference;
|
||||
|
@ -45,8 +46,9 @@ public class AMQServerConsumer extends ServerConsumerImpl {
|
|||
boolean strictUpdateDeliveryCount,
|
||||
ManagementService managementService,
|
||||
boolean supportLargeMessage,
|
||||
Integer credits) throws Exception {
|
||||
super(consumerID, serverSession, binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits);
|
||||
Integer credits,
|
||||
final ActiveMQServer server) throws Exception {
|
||||
super(consumerID, serverSession, binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits, server);
|
||||
}
|
||||
|
||||
public void setBrowserListener(BrowserListener listener) {
|
||||
|
|
|
@ -355,7 +355,7 @@ public class AMQServerSession extends ServerSessionImpl {
|
|||
ManagementService managementService2,
|
||||
boolean supportLargeMessage,
|
||||
Integer credits) throws Exception {
|
||||
return new AMQServerConsumer(consumerID, this, binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits);
|
||||
return new AMQServerConsumer(consumerID, this, binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits, this.server);
|
||||
}
|
||||
|
||||
public AMQServerConsumer getConsumer(long nativeId) {
|
||||
|
|
|
@ -146,6 +146,11 @@ public class AMQSession implements SessionCallback {
|
|||
started.set(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterDelivery() throws Exception {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isWritable(ReadyListener callback) {
|
||||
return connection.isWritable(callback);
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
package org.proton.plug;
|
||||
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
|
||||
public abstract class AMQPConnectionContextFactory {
|
||||
|
@ -27,10 +28,11 @@ public abstract class AMQPConnectionContextFactory {
|
|||
int idleTimeout,
|
||||
int maxFrameSize,
|
||||
int channelMax,
|
||||
Executor dispatchExecutor,
|
||||
ScheduledExecutorService scheduledPool);
|
||||
|
||||
/**
|
||||
* @return
|
||||
*/
|
||||
public abstract AMQPConnectionContext createConnection(AMQPConnectionCallback connectionCallback, ScheduledExecutorService scheduledPool);
|
||||
public abstract AMQPConnectionContext createConnection(AMQPConnectionCallback connectionCallback, Executor dispatchExecutor, ScheduledExecutorService scheduledPool);
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@ package org.proton.plug.context;
|
|||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
@ -45,7 +46,7 @@ public abstract class AbstractConnectionContext extends ProtonInitializable impl
|
|||
|
||||
public static final Symbol CONNECTION_OPEN_FAILED = Symbol.valueOf("amqp:connection-establishment-failed");
|
||||
|
||||
protected ProtonHandler handler = ProtonHandler.Factory.create();
|
||||
protected final ProtonHandler handler;
|
||||
|
||||
protected AMQPConnectionCallback connectionCallback;
|
||||
private final ScheduledExecutorService scheduledPool;
|
||||
|
@ -54,18 +55,20 @@ public abstract class AbstractConnectionContext extends ProtonInitializable impl
|
|||
|
||||
protected LocalListener listener = new LocalListener();
|
||||
|
||||
public AbstractConnectionContext(AMQPConnectionCallback connectionCallback, ScheduledExecutorService scheduledPool) {
|
||||
this(connectionCallback, DEFAULT_IDLE_TIMEOUT, DEFAULT_MAX_FRAME_SIZE, DEFAULT_CHANNEL_MAX, scheduledPool);
|
||||
public AbstractConnectionContext(AMQPConnectionCallback connectionCallback, Executor dispatchExecutor, ScheduledExecutorService scheduledPool) {
|
||||
this(connectionCallback, DEFAULT_IDLE_TIMEOUT, DEFAULT_MAX_FRAME_SIZE, DEFAULT_CHANNEL_MAX, dispatchExecutor, scheduledPool);
|
||||
}
|
||||
|
||||
public AbstractConnectionContext(AMQPConnectionCallback connectionCallback,
|
||||
int idleTimeout,
|
||||
int maxFrameSize,
|
||||
int channelMax,
|
||||
Executor dispatchExecutor,
|
||||
ScheduledExecutorService scheduledPool) {
|
||||
this.connectionCallback = connectionCallback;
|
||||
this.scheduledPool = scheduledPool;
|
||||
connectionCallback.setConnection(this);
|
||||
this.handler = ProtonHandler.Factory.create(dispatchExecutor);
|
||||
Transport transport = handler.getTransport();
|
||||
if (idleTimeout > 0) {
|
||||
transport.setIdleTimeout(idleTimeout);
|
||||
|
@ -182,7 +185,7 @@ public abstract class AbstractConnectionContext extends ProtonInitializable impl
|
|||
if (!connection.getRemoteProperties().containsKey(CONNECTION_OPEN_FAILED)) {
|
||||
long nextKeepAliveTime = handler.tick(true);
|
||||
flushBytes();
|
||||
if (nextKeepAliveTime > 0) {
|
||||
if (nextKeepAliveTime > 0 && scheduledPool != null) {
|
||||
scheduledPool.schedule(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
|
|
|
@ -29,20 +29,22 @@ import org.proton.plug.exceptions.ActiveMQAMQPException;
|
|||
import org.proton.plug.context.ProtonInitializable;
|
||||
import org.proton.plug.util.FutureRunnable;
|
||||
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
|
||||
public class ProtonClientConnectionContext extends AbstractConnectionContext implements AMQPClientConnectionContext {
|
||||
|
||||
public ProtonClientConnectionContext(AMQPConnectionCallback connectionCallback, ScheduledExecutorService scheduledPool) {
|
||||
super(connectionCallback, scheduledPool);
|
||||
public ProtonClientConnectionContext(AMQPConnectionCallback connectionCallback, Executor dispatchExecutor, ScheduledExecutorService scheduledPool) {
|
||||
super(connectionCallback, dispatchExecutor, scheduledPool);
|
||||
}
|
||||
|
||||
public ProtonClientConnectionContext(AMQPConnectionCallback connectionCallback,
|
||||
int idleTimeout,
|
||||
int maxFrameSize,
|
||||
int channelMax,
|
||||
Executor dispatchExecutor,
|
||||
ScheduledExecutorService scheduledPool) {
|
||||
super(connectionCallback, idleTimeout, maxFrameSize, channelMax, scheduledPool);
|
||||
super(connectionCallback, idleTimeout, maxFrameSize, channelMax, dispatchExecutor, scheduledPool);
|
||||
}
|
||||
|
||||
// Maybe a client interface?
|
||||
|
|
|
@ -20,6 +20,7 @@ import org.proton.plug.AMQPConnectionContext;
|
|||
import org.proton.plug.AMQPConnectionContextFactory;
|
||||
import org.proton.plug.AMQPConnectionCallback;
|
||||
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
|
||||
public class ProtonClientConnectionContextFactory extends AMQPConnectionContextFactory {
|
||||
|
@ -31,8 +32,8 @@ public class ProtonClientConnectionContextFactory extends AMQPConnectionContextF
|
|||
}
|
||||
|
||||
@Override
|
||||
public AMQPConnectionContext createConnection(AMQPConnectionCallback connectionCallback, ScheduledExecutorService scheduledPool) {
|
||||
return new ProtonClientConnectionContext(connectionCallback, scheduledPool);
|
||||
public AMQPConnectionContext createConnection(AMQPConnectionCallback connectionCallback, Executor dispatchExecutor, ScheduledExecutorService scheduledPool) {
|
||||
return new ProtonClientConnectionContext(connectionCallback, dispatchExecutor, scheduledPool);
|
||||
}
|
||||
|
||||
|
||||
|
@ -41,7 +42,8 @@ public class ProtonClientConnectionContextFactory extends AMQPConnectionContextF
|
|||
int idleTimeout,
|
||||
int maxFrameSize,
|
||||
int channelMax,
|
||||
Executor dispatchExecutor,
|
||||
ScheduledExecutorService scheduledPool) {
|
||||
return new ProtonClientConnectionContext(connectionCallback, idleTimeout, maxFrameSize, channelMax, scheduledPool);
|
||||
return new ProtonClientConnectionContext(connectionCallback, idleTimeout, maxFrameSize, channelMax, dispatchExecutor, scheduledPool);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,20 +28,22 @@ import org.proton.plug.context.AbstractConnectionContext;
|
|||
import org.proton.plug.context.AbstractProtonSessionContext;
|
||||
import org.proton.plug.exceptions.ActiveMQAMQPException;
|
||||
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
|
||||
public class ProtonServerConnectionContext extends AbstractConnectionContext implements AMQPServerConnectionContext {
|
||||
|
||||
public ProtonServerConnectionContext(AMQPConnectionCallback connectionSP, ScheduledExecutorService scheduledPool) {
|
||||
super(connectionSP, scheduledPool);
|
||||
public ProtonServerConnectionContext(AMQPConnectionCallback connectionSP, Executor dispatchExecutor, ScheduledExecutorService scheduledPool) {
|
||||
super(connectionSP, dispatchExecutor, scheduledPool);
|
||||
}
|
||||
|
||||
public ProtonServerConnectionContext(AMQPConnectionCallback connectionSP,
|
||||
int idleTimeout,
|
||||
int maxFrameSize,
|
||||
int channelMax,
|
||||
Executor dispatchExecutor,
|
||||
ScheduledExecutorService scheduledPool) {
|
||||
super(connectionSP, idleTimeout, maxFrameSize, channelMax, scheduledPool);
|
||||
super(connectionSP, idleTimeout, maxFrameSize, channelMax, dispatchExecutor, scheduledPool);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -20,6 +20,7 @@ import org.proton.plug.AMQPConnectionContextFactory;
|
|||
import org.proton.plug.AMQPConnectionCallback;
|
||||
import org.proton.plug.AMQPServerConnectionContext;
|
||||
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
|
||||
import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_IDLE_TIMEOUT;
|
||||
|
@ -35,8 +36,8 @@ public class ProtonServerConnectionContextFactory extends AMQPConnectionContextF
|
|||
}
|
||||
|
||||
@Override
|
||||
public AMQPServerConnectionContext createConnection(AMQPConnectionCallback connectionCallback, ScheduledExecutorService scheduledPool) {
|
||||
return createConnection(connectionCallback, DEFAULT_IDLE_TIMEOUT, DEFAULT_MAX_FRAME_SIZE, DEFAULT_CHANNEL_MAX, scheduledPool);
|
||||
public AMQPServerConnectionContext createConnection(AMQPConnectionCallback connectionCallback, Executor dispatchExecutor, ScheduledExecutorService scheduledPool) {
|
||||
return createConnection(connectionCallback, DEFAULT_IDLE_TIMEOUT, DEFAULT_MAX_FRAME_SIZE, DEFAULT_CHANNEL_MAX, dispatchExecutor, scheduledPool);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -44,7 +45,8 @@ public class ProtonServerConnectionContextFactory extends AMQPConnectionContextF
|
|||
int idleTimeout,
|
||||
int maxFrameSize,
|
||||
int channelMax,
|
||||
Executor dispatchExecutor,
|
||||
ScheduledExecutorService scheduledPool) {
|
||||
return new ProtonServerConnectionContext(connectionCallback, idleTimeout, maxFrameSize, channelMax, scheduledPool);
|
||||
return new ProtonServerConnectionContext(connectionCallback, idleTimeout, maxFrameSize, channelMax, dispatchExecutor, scheduledPool);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,6 +16,8 @@
|
|||
*/
|
||||
package org.proton.plug.handler;
|
||||
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import org.apache.qpid.proton.engine.Connection;
|
||||
import org.apache.qpid.proton.engine.Transport;
|
||||
|
@ -31,10 +33,9 @@ public interface ProtonHandler {
|
|||
|
||||
long tick(boolean firstTick);
|
||||
|
||||
public static final class Factory {
|
||||
|
||||
public static ProtonHandler create() {
|
||||
return new ProtonHandlerImpl();
|
||||
final class Factory {
|
||||
public static ProtonHandler create(Executor dispatchExecutor) {
|
||||
return new ProtonHandlerImpl(dispatchExecutor);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -20,6 +20,7 @@ import java.nio.ByteBuffer;
|
|||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
|
@ -54,6 +55,14 @@ public class ProtonHandlerImpl extends ProtonInitializable implements ProtonHand
|
|||
|
||||
private final Collector collector = Proton.collector();
|
||||
|
||||
private final Executor dispatchExecutor;
|
||||
|
||||
private final Runnable dispatchRunnable = new Runnable() {
|
||||
public void run() {
|
||||
dispatch();
|
||||
}
|
||||
};
|
||||
|
||||
private ArrayList<EventHandler> handlers = new ArrayList<>();
|
||||
|
||||
private Sasl serverSasl;
|
||||
|
@ -68,18 +77,14 @@ public class ProtonHandlerImpl extends ProtonInitializable implements ProtonHand
|
|||
|
||||
private SASLResult saslResult;
|
||||
|
||||
/**
|
||||
* If dispatching a dispatch call is ignored to avoid infinite stack loop
|
||||
*/
|
||||
private boolean dispatching = false;
|
||||
|
||||
protected volatile boolean dataReceived;
|
||||
|
||||
protected boolean receivedFirstPacket = false;
|
||||
|
||||
private int offset = 0;
|
||||
|
||||
public ProtonHandlerImpl() {
|
||||
public ProtonHandlerImpl(Executor dispatchExecutor) {
|
||||
this.dispatchExecutor = dispatchExecutor;
|
||||
this.creationTime = System.currentTimeMillis();
|
||||
transport.bind(connection);
|
||||
connection.collect(collector);
|
||||
|
@ -271,20 +276,9 @@ public class ProtonHandlerImpl extends ProtonInitializable implements ProtonHand
|
|||
|
||||
checkServerSASL();
|
||||
|
||||
if (dispatching) {
|
||||
return;
|
||||
}
|
||||
|
||||
dispatching = true;
|
||||
|
||||
}
|
||||
|
||||
try {
|
||||
dispatch();
|
||||
}
|
||||
finally {
|
||||
dispatching = false;
|
||||
}
|
||||
dispatchExecutor.execute(dispatchRunnable);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -16,6 +16,8 @@
|
|||
*/
|
||||
package org.proton.plug.context;
|
||||
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
|
||||
import org.apache.qpid.proton.engine.Connection;
|
||||
|
@ -48,7 +50,7 @@ public class AbstractConnectionContextTest {
|
|||
private class TestConnectionContext extends AbstractConnectionContext {
|
||||
|
||||
public TestConnectionContext(AMQPConnectionCallback connectionCallback) {
|
||||
super(connectionCallback, null);
|
||||
super(connectionCallback, Executors.newSingleThreadExecutor(), null);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -16,6 +16,8 @@
|
|||
*/
|
||||
package org.proton.plug.test.invm;
|
||||
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
import org.proton.plug.AMQPClientConnectionContext;
|
||||
import org.proton.plug.context.client.ProtonClientConnectionContext;
|
||||
import org.proton.plug.test.minimalclient.Connector;
|
||||
|
@ -32,6 +34,6 @@ public class InVMTestConnector implements Connector {
|
|||
|
||||
@Override
|
||||
public AMQPClientConnectionContext connect(String host, int port) throws Exception {
|
||||
return new ProtonClientConnectionContext(new ProtonINVMSPI(), null);
|
||||
return new ProtonClientConnectionContext(new ProtonINVMSPI(), Executors.newSingleThreadExecutor(), null);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -35,7 +35,7 @@ public class ProtonINVMSPI implements AMQPConnectionCallback {
|
|||
|
||||
AMQPConnectionContext returningConnection;
|
||||
|
||||
ProtonServerConnectionContext serverConnection = new ProtonServerConnectionContext(new ReturnSPI(), null);
|
||||
ProtonServerConnectionContext serverConnection = new ProtonServerConnectionContext(new ReturnSPI(), Executors.newSingleThreadExecutor(), null);
|
||||
|
||||
final ExecutorService mainExecutor = Executors.newSingleThreadExecutor();
|
||||
|
||||
|
|
|
@ -18,6 +18,7 @@ package org.proton.plug.test.minimalclient;
|
|||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketAddress;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
import io.netty.bootstrap.Bootstrap;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
|
@ -59,7 +60,7 @@ public class SimpleAMQPConnector implements Connector {
|
|||
|
||||
AMQPClientSPI clientConnectionSPI = new AMQPClientSPI(future.channel());
|
||||
|
||||
final AMQPClientConnectionContext connection = (AMQPClientConnectionContext) ProtonClientConnectionContextFactory.getFactory().createConnection(clientConnectionSPI, null);
|
||||
final AMQPClientConnectionContext connection = (AMQPClientConnectionContext) ProtonClientConnectionContextFactory.getFactory().createConnection(clientConnectionSPI, Executors.newSingleThreadExecutor(), null);
|
||||
|
||||
future.channel().pipeline().addLast(new ChannelDuplexHandler() {
|
||||
@Override
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.proton.plug.test.minimalserver;
|
|||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketAddress;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
import io.netty.bootstrap.ServerBootstrap;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
|
@ -124,7 +125,7 @@ public class MinimalServer {
|
|||
@Override
|
||||
public void channelActive(ChannelHandlerContext ctx) throws Exception {
|
||||
super.channelActive(ctx);
|
||||
connection = ProtonServerConnectionContextFactory.getFactory().createConnection(new MinimalConnectionSPI(ctx.channel()), null);
|
||||
connection = ProtonServerConnectionContextFactory.getFactory().createConnection(new MinimalConnectionSPI(ctx.channel()), Executors.newSingleThreadExecutor(), null);
|
||||
//ctx.read();
|
||||
}
|
||||
|
||||
|
|
|
@ -19,7 +19,9 @@ package org.apache.activemq.artemis.core.protocol.stomp;
|
|||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.BlockingDeque;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.LinkedBlockingDeque;
|
||||
import java.util.zip.Inflater;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
|
@ -43,6 +45,7 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
|||
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
|
||||
import org.apache.activemq.artemis.utils.ConfigurationHelper;
|
||||
import org.apache.activemq.artemis.utils.PendingTask;
|
||||
import org.apache.activemq.artemis.utils.UUIDGenerator;
|
||||
|
||||
import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE;
|
||||
|
@ -57,6 +60,8 @@ public class StompSession implements SessionCallback {
|
|||
|
||||
private final OperationContext sessionContext;
|
||||
|
||||
private final BlockingDeque<PendingTask> afterDeliveryTasks = new LinkedBlockingDeque<>();
|
||||
|
||||
private final Map<Long, StompSubscription> subscriptions = new ConcurrentHashMap<>();
|
||||
|
||||
// key = message ID, value = consumer ID
|
||||
|
@ -100,7 +105,15 @@ public class StompSession implements SessionCallback {
|
|||
}
|
||||
|
||||
@Override
|
||||
public int sendMessage(ServerMessage serverMessage, ServerConsumer consumer, int deliveryCount) {
|
||||
public void afterDelivery() throws Exception {
|
||||
PendingTask task;
|
||||
while ((task = afterDeliveryTasks.poll()) != null) {
|
||||
task.run();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int sendMessage(ServerMessage serverMessage, final ServerConsumer consumer, int deliveryCount) {
|
||||
LargeServerMessageImpl largeMessage = null;
|
||||
ServerMessage newServerMessage = serverMessage;
|
||||
try {
|
||||
|
@ -144,9 +157,20 @@ public class StompSession implements SessionCallback {
|
|||
|
||||
if (subscription.getAck().equals(Stomp.Headers.Subscribe.AckModeValues.AUTO)) {
|
||||
if (manager.send(connection, frame)) {
|
||||
//we ack and commit only if the send is successful
|
||||
session.acknowledge(consumer.getID(), newServerMessage.getMessageID());
|
||||
session.commit();
|
||||
final long messageID = newServerMessage.getMessageID();
|
||||
final long consumerID = consumer.getID();
|
||||
|
||||
// this will be called after the delivery is complete
|
||||
// we can't call sesison.ack within the delivery
|
||||
// as it could dead lock.
|
||||
afterDeliveryTasks.offer(new PendingTask() {
|
||||
@Override
|
||||
public void run() throws Exception {
|
||||
//we ack and commit only if the send is successful
|
||||
session.acknowledge(consumerID, messageID);
|
||||
session.commit();
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
else {
|
||||
|
|
|
@ -98,6 +98,12 @@ public final class CoreSessionCallback implements SessionCallback {
|
|||
channel.send(packet);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void afterDelivery() throws Exception {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendProducerCreditsFailMessage(int credits, SimpleString address) {
|
||||
Packet packet = new SessionProducerCreditsFailMessage(credits, address);
|
||||
|
|
|
@ -756,9 +756,10 @@ public interface ActiveMQServerLogger extends BasicLogger {
|
|||
format = Message.Format.MESSAGE_FORMAT)
|
||||
void broadcastGroupClosed(@Cause Exception e);
|
||||
|
||||
|
||||
@LogMessage(level = Logger.Level.WARN)
|
||||
@Message(id = 222109, value = "NodeID={0} is not available on the topology. Retrying the connection to that node now", format = Message.Format.MESSAGE_FORMAT)
|
||||
void nodeNotAvailable(String targetNodeID);
|
||||
@Message(id = 222109, value = "Timed out waiting for write lock on consumer. Check the Thread dump", format = Message.Format.MESSAGE_FORMAT)
|
||||
void timeoutLockingConsumer();
|
||||
|
||||
@LogMessage(level = Logger.Level.WARN)
|
||||
@Message(id = 222110, value = "no queue IDs defined!, originalMessage = {0}, copiedMessage = {1}, props={2}",
|
||||
|
|
|
@ -21,6 +21,7 @@ import java.util.Iterator;
|
|||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
|
@ -40,6 +41,7 @@ import org.apache.activemq.artemis.core.persistence.StorageManager;
|
|||
import org.apache.activemq.artemis.core.postoffice.Binding;
|
||||
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||
import org.apache.activemq.artemis.core.server.HandleStatus;
|
||||
import org.apache.activemq.artemis.core.server.LargeServerMessage;
|
||||
|
@ -86,6 +88,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
|
|||
|
||||
private Object protocolContext;
|
||||
|
||||
private final ActiveMQServer server;
|
||||
|
||||
/**
|
||||
* We get a readLock when a message is handled, and return the readLock when the message is finally delivered
|
||||
* When stopping the consumer we need to get a writeLock to make sure we had all delivery finished
|
||||
|
@ -148,8 +152,9 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
|
|||
final SessionCallback callback,
|
||||
final boolean preAcknowledge,
|
||||
final boolean strictUpdateDeliveryCount,
|
||||
final ManagementService managementService) throws Exception {
|
||||
this(id, session, binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, true, null);
|
||||
final ManagementService managementService,
|
||||
final ActiveMQServer server) throws Exception {
|
||||
this(id, session, binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, true, null, server);
|
||||
}
|
||||
|
||||
public ServerConsumerImpl(final long id,
|
||||
|
@ -164,7 +169,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
|
|||
final boolean strictUpdateDeliveryCount,
|
||||
final ManagementService managementService,
|
||||
final boolean supportLargeMessage,
|
||||
final Integer credits) throws Exception {
|
||||
final Integer credits,
|
||||
final ActiveMQServer server) throws Exception {
|
||||
this.id = id;
|
||||
|
||||
this.filter = filter;
|
||||
|
@ -209,6 +215,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
|
|||
availableCredits.set(credits);
|
||||
}
|
||||
}
|
||||
|
||||
this.server = server;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -378,7 +386,9 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
|
|||
}
|
||||
finally {
|
||||
lockDelivery.readLock().unlock();
|
||||
callback.afterDelivery();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -559,12 +569,19 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
|
|||
@Override
|
||||
public void setStarted(final boolean started) {
|
||||
synchronized (lock) {
|
||||
lockDelivery.writeLock().lock();
|
||||
boolean locked = lockDelivery();
|
||||
|
||||
// This is to make sure nothing would sneak to the client while started = false
|
||||
// the client will stop the session and perform a rollback in certain cases.
|
||||
// in case something sneaks to the client you could get to messaging delivering forever until
|
||||
// you restart the server
|
||||
try {
|
||||
this.started = browseOnly || started;
|
||||
}
|
||||
finally {
|
||||
lockDelivery.writeLock().unlock();
|
||||
if (locked) {
|
||||
lockDelivery.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -574,22 +591,39 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
|
|||
}
|
||||
}
|
||||
|
||||
private boolean lockDelivery() {
|
||||
try {
|
||||
if (!lockDelivery.writeLock().tryLock(30, TimeUnit.SECONDS)) {
|
||||
ActiveMQServerLogger.LOGGER.timeoutLockingConsumer();
|
||||
if (server != null) {
|
||||
server.threadDump();
|
||||
}
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
catch (Exception e) {
|
||||
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setTransferring(final boolean transferring) {
|
||||
synchronized (lock) {
|
||||
this.transferring = transferring;
|
||||
// This is to make sure that the delivery process has finished any pending delivery
|
||||
// otherwise a message may sneak in on the client while we are trying to stop the consumer
|
||||
boolean locked = lockDelivery();
|
||||
try {
|
||||
this.transferring = transferring;
|
||||
}
|
||||
finally {
|
||||
if (locked) {
|
||||
lockDelivery.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// This is to make sure that the delivery process has finished any pending delivery
|
||||
// otherwise a message may sneak in on the client while we are trying to stop the consumer
|
||||
try {
|
||||
lockDelivery.writeLock().lock();
|
||||
}
|
||||
finally {
|
||||
lockDelivery.writeLock().unlock();
|
||||
}
|
||||
|
||||
|
||||
// Outside the lock
|
||||
if (transferring) {
|
||||
// And we must wait for any force delivery to be executed - this is executed async so we add a future to the
|
||||
|
|
|
@ -479,7 +479,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
|
|||
ManagementService managementService2,
|
||||
boolean supportLargeMessage,
|
||||
Integer credits) throws Exception {
|
||||
return new ServerConsumerImpl(consumerID, this, binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits);
|
||||
return new ServerConsumerImpl(consumerID, this, binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits, server);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -28,6 +28,10 @@ public interface SessionCallback {
|
|||
*/
|
||||
boolean hasCredits(ServerConsumer consumerID);
|
||||
|
||||
/** This can be used to complete certain operations outside of the lock,
|
||||
* like acks or other operations. */
|
||||
void afterDelivery() throws Exception;
|
||||
|
||||
void sendProducerCreditsMessage(int credits, SimpleString address);
|
||||
|
||||
void sendProducerCreditsFailMessage(int credits, SimpleString address);
|
||||
|
|
|
@ -488,6 +488,11 @@ public class HangConsumerTest extends ActiveMQTestBase {
|
|||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterDelivery() throws Exception {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendProducerCreditsFailMessage(int credits, SimpleString address) {
|
||||
targetCallback.sendProducerCreditsFailMessage(credits, address);
|
||||
|
|
|
@ -136,11 +136,6 @@ public class ProtonTest extends ActiveMQTestBase {
|
|||
connection.close();
|
||||
}
|
||||
|
||||
for (long timeout = System.currentTimeMillis() + 1000; timeout > System.currentTimeMillis() && server.getRemotingService().getConnections().size() != 0; ) {
|
||||
Thread.sleep(1);
|
||||
}
|
||||
|
||||
Assert.assertEquals("The remoting connection wasn't removed after connection.close()", 0, server.getRemotingService().getConnections().size());
|
||||
server.stop();
|
||||
}
|
||||
finally {
|
||||
|
@ -633,6 +628,7 @@ public class ProtonTest extends ActiveMQTestBase {
|
|||
for (int i = 0; i < numMessages; i++) {
|
||||
System.out.println("Sending " + i);
|
||||
TextMessage message = session.createTextMessage("text" + i);
|
||||
message.setStringProperty("text", "text" + i);
|
||||
p.send(message);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue