This closes #522
This commit is contained in:
commit
d728fe7718
|
@ -31,7 +31,6 @@ import org.jgroups.JChannel;
|
|||
public class ChannelBroadcastEndpointFactory implements BroadcastEndpointFactory {
|
||||
|
||||
private static final Logger logger = Logger.getLogger(ChannelBroadcastEndpointFactory.class);
|
||||
private static final boolean isTrace = logger.isTraceEnabled();
|
||||
|
||||
private final JChannel channel;
|
||||
|
||||
|
@ -47,14 +46,14 @@ public class ChannelBroadcastEndpointFactory implements BroadcastEndpointFactory
|
|||
// private static JChannelManager recoverManager(JChannel channel) {
|
||||
// JChannelManager manager = managers.get(channel);
|
||||
// if (manager == null) {
|
||||
// if (isTrace) {
|
||||
// if (logger.isTraceEnabled()) {
|
||||
// logger.trace("Creating a new JChannelManager for " + channel, new Exception("trace"));
|
||||
// }
|
||||
// manager = new JChannelManager();
|
||||
// managers.put(channel, manager);
|
||||
// }
|
||||
// else {
|
||||
// if (isTrace) {
|
||||
// if (logger.isTraceEnabled()) {
|
||||
// logger.trace("Recover an already existent channelManager for " + channel, new Exception("trace"));
|
||||
// }
|
||||
//
|
||||
|
@ -69,7 +68,7 @@ public class ChannelBroadcastEndpointFactory implements BroadcastEndpointFactory
|
|||
}
|
||||
|
||||
private ChannelBroadcastEndpointFactory(JChannelManager manager, JChannel channel, String channelName) {
|
||||
if (isTrace) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("new ChannelBroadcastEndpointFactory(" + manager + ", " + channel + ", " + channelName, new Exception("trace"));
|
||||
}
|
||||
this.manager = manager;
|
||||
|
|
|
@ -30,7 +30,6 @@ public abstract class JGroupsBroadcastEndpoint implements BroadcastEndpoint {
|
|||
|
||||
private static final Logger logger = Logger.getLogger(JGroupsBroadcastEndpoint.class);
|
||||
|
||||
private static final boolean isTrace = logger.isTraceEnabled();
|
||||
private final String channelName;
|
||||
|
||||
private boolean clientOpened;
|
||||
|
@ -50,7 +49,7 @@ public abstract class JGroupsBroadcastEndpoint implements BroadcastEndpoint {
|
|||
|
||||
@Override
|
||||
public void broadcast(final byte[] data) throws Exception {
|
||||
if (isTrace) logger.trace("Broadcasting: BroadCastOpened=" + broadcastOpened + ", channelOPen=" + channel.getChannel().isOpen());
|
||||
if (logger.isTraceEnabled()) logger.trace("Broadcasting: BroadCastOpened=" + broadcastOpened + ", channelOPen=" + channel.getChannel().isOpen());
|
||||
if (broadcastOpened) {
|
||||
org.jgroups.Message msg = new org.jgroups.Message();
|
||||
|
||||
|
@ -62,7 +61,7 @@ public abstract class JGroupsBroadcastEndpoint implements BroadcastEndpoint {
|
|||
|
||||
@Override
|
||||
public byte[] receiveBroadcast() throws Exception {
|
||||
if (isTrace) logger.trace("Receiving Broadcast: clientOpened=" + clientOpened + ", channelOPen=" + channel.getChannel().isOpen());
|
||||
if (logger.isTraceEnabled()) logger.trace("Receiving Broadcast: clientOpened=" + clientOpened + ", channelOPen=" + channel.getChannel().isOpen());
|
||||
if (clientOpened) {
|
||||
return receiver.receiveBroadcast();
|
||||
}
|
||||
|
@ -73,7 +72,7 @@ public abstract class JGroupsBroadcastEndpoint implements BroadcastEndpoint {
|
|||
|
||||
@Override
|
||||
public byte[] receiveBroadcast(long time, TimeUnit unit) throws Exception {
|
||||
if (isTrace) logger.trace("Receiving Broadcast2: clientOpened=" + clientOpened + ", channelOPen=" + channel.getChannel().isOpen());
|
||||
if (logger.isTraceEnabled()) logger.trace("Receiving Broadcast2: clientOpened=" + clientOpened + ", channelOPen=" + channel.getChannel().isOpen());
|
||||
if (clientOpened) {
|
||||
return receiver.receiveBroadcast(time, unit);
|
||||
}
|
||||
|
|
|
@ -33,7 +33,6 @@ import org.jboss.logging.Logger;
|
|||
public class JChannelManager {
|
||||
|
||||
private static final Logger logger = Logger.getLogger(JChannelManager.class);
|
||||
private static final boolean isTrace = logger.isTraceEnabled();
|
||||
|
||||
private Map<String, JChannelWrapper> channels;
|
||||
|
||||
|
@ -46,11 +45,11 @@ public class JChannelManager {
|
|||
if (wrapper == null) {
|
||||
wrapper = new JChannelWrapper(this, channelName, endpoint.createChannel());
|
||||
channels.put(channelName, wrapper);
|
||||
if (isTrace)
|
||||
if (logger.isTraceEnabled())
|
||||
logger.trace("Put Channel " + channelName);
|
||||
return wrapper;
|
||||
}
|
||||
if (isTrace)
|
||||
if (logger.isTraceEnabled())
|
||||
logger.trace("Add Ref Count " + channelName);
|
||||
return wrapper.addRef();
|
||||
}
|
||||
|
|
|
@ -31,7 +31,6 @@ import org.jgroups.ReceiverAdapter;
|
|||
*/
|
||||
public class JChannelWrapper {
|
||||
private static final Logger logger = Logger.getLogger(JChannelWrapper.class);
|
||||
private static final boolean isTrace = logger.isTraceEnabled();
|
||||
|
||||
private boolean connected = false;
|
||||
int refCount = 1;
|
||||
|
@ -47,7 +46,7 @@ public class JChannelWrapper {
|
|||
this.manager = manager;
|
||||
|
||||
|
||||
if (isTrace && channel.getReceiver() != null) {
|
||||
if (logger.isTraceEnabled() && channel.getReceiver() != null) {
|
||||
logger.trace(this + "The channel already had a receiver previously!!!! == " + channel.getReceiver(), new Exception("trace"));
|
||||
}
|
||||
|
||||
|
@ -61,7 +60,7 @@ public class JChannelWrapper {
|
|||
|
||||
@Override
|
||||
public void receive(org.jgroups.Message msg) {
|
||||
if (isTrace) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(this + ":: Wrapper received " + msg + " on channel " + channelName);
|
||||
}
|
||||
synchronized (receivers) {
|
||||
|
@ -83,7 +82,7 @@ public class JChannelWrapper {
|
|||
|
||||
public synchronized void close(boolean closeWrappedChannel) {
|
||||
refCount--;
|
||||
if (isTrace) logger.trace(this + "::RefCount-- " + refCount + " on channel " + channelName, new Exception("Trace"));
|
||||
if (logger.isTraceEnabled()) logger.trace(this + "::RefCount-- " + refCount + " on channel " + channelName, new Exception("Trace"));
|
||||
if (refCount == 0) {
|
||||
if (closeWrappedChannel) {
|
||||
connected = false;
|
||||
|
@ -96,14 +95,14 @@ public class JChannelWrapper {
|
|||
}
|
||||
|
||||
public void removeReceiver(JGroupsReceiver receiver) {
|
||||
if (isTrace) logger.trace(this + "::removeReceiver: " + receiver + " on " + channelName, new Exception("Trace"));
|
||||
if (logger.isTraceEnabled()) logger.trace(this + "::removeReceiver: " + receiver + " on " + channelName, new Exception("Trace"));
|
||||
synchronized (receivers) {
|
||||
receivers.remove(receiver);
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized void connect() throws Exception {
|
||||
if (isTrace) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(this + ":: Connecting " + channelName, new Exception("Trace"));
|
||||
}
|
||||
|
||||
|
@ -121,19 +120,19 @@ public class JChannelWrapper {
|
|||
|
||||
public void addReceiver(JGroupsReceiver jGroupsReceiver) {
|
||||
synchronized (receivers) {
|
||||
if (isTrace) logger.trace(this + "::Add Receiver: " + jGroupsReceiver + " on " + channelName);
|
||||
if (logger.isTraceEnabled()) logger.trace(this + "::Add Receiver: " + jGroupsReceiver + " on " + channelName);
|
||||
receivers.add(jGroupsReceiver);
|
||||
}
|
||||
}
|
||||
|
||||
public void send(org.jgroups.Message msg) throws Exception {
|
||||
if (isTrace) logger.trace(this + "::Sending JGroups Message: Open=" + channel.isOpen() + " on channel " + channelName + " msg=" + msg);
|
||||
if (logger.isTraceEnabled()) logger.trace(this + "::Sending JGroups Message: Open=" + channel.isOpen() + " on channel " + channelName + " msg=" + msg);
|
||||
channel.send(msg);
|
||||
}
|
||||
|
||||
public JChannelWrapper addRef() {
|
||||
this.refCount++;
|
||||
if (isTrace) logger.trace(this + "::RefCount++ = " + refCount + " on channel " + channelName);
|
||||
if (logger.isTraceEnabled()) logger.trace(this + "::RefCount++ = " + refCount + " on channel " + channelName);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
|
|
@ -31,19 +31,18 @@ import org.jgroups.ReceiverAdapter;
|
|||
public class JGroupsReceiver extends ReceiverAdapter {
|
||||
|
||||
private static final Logger logger = Logger.getLogger(JGroupsReceiver.class);
|
||||
private static final boolean isTrace = logger.isTraceEnabled();
|
||||
|
||||
private final BlockingQueue<byte[]> dequeue = new LinkedBlockingDeque<>();
|
||||
|
||||
@Override
|
||||
public void receive(org.jgroups.Message msg) {
|
||||
if (isTrace) logger.trace("sending message " + msg);
|
||||
if (logger.isTraceEnabled()) logger.trace("sending message " + msg);
|
||||
dequeue.add(msg.getBuffer());
|
||||
}
|
||||
|
||||
public byte[] receiveBroadcast() throws Exception {
|
||||
byte[] bytes = dequeue.take();
|
||||
if (isTrace) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logBytes("receiveBroadcast()", bytes);
|
||||
}
|
||||
|
||||
|
@ -62,7 +61,7 @@ public class JGroupsReceiver extends ReceiverAdapter {
|
|||
public byte[] receiveBroadcast(long time, TimeUnit unit) throws Exception {
|
||||
byte[] bytes = dequeue.poll(time, unit);
|
||||
|
||||
if (isTrace) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logBytes("receiveBroadcast(long time, TimeUnit unit)", bytes);
|
||||
}
|
||||
|
||||
|
|
|
@ -43,12 +43,13 @@ import org.apache.activemq.artemis.utils.PriorityLinkedList;
|
|||
import org.apache.activemq.artemis.utils.PriorityLinkedListImpl;
|
||||
import org.apache.activemq.artemis.utils.ReusableLatch;
|
||||
import org.apache.activemq.artemis.utils.TokenBucketLimiter;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
public final class ClientConsumerImpl implements ClientConsumerInternal {
|
||||
// Constants
|
||||
// ------------------------------------------------------------------------------------
|
||||
|
||||
private static final boolean isTrace = ActiveMQClientLogger.LOGGER.isTraceEnabled();
|
||||
private static final Logger logger = Logger.getLogger(ClientConsumerImpl.class);
|
||||
|
||||
private static final long CLOSE_TIMEOUT_MILLISECONDS = 10000;
|
||||
|
||||
|
@ -170,6 +171,10 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
|
|||
this.contextClassLoader = contextClassLoader;
|
||||
|
||||
this.flowControlExecutor = flowControlExecutor;
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(this + ":: being created at", new Exception("trace"));
|
||||
}
|
||||
}
|
||||
|
||||
// ClientConsumer implementation
|
||||
|
@ -181,9 +186,16 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
|
|||
}
|
||||
|
||||
private ClientMessage receive(final long timeout, final boolean forcingDelivery) throws ActiveMQException {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(this + "::receive(" + timeout + ", " + forcingDelivery + ")");
|
||||
}
|
||||
|
||||
checkClosed();
|
||||
|
||||
if (largeMessageReceived != null) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(this + "::receive(" + timeout + ", " + forcingDelivery + ") -> discard LargeMessage body for " + largeMessageReceived);
|
||||
}
|
||||
// Check if there are pending packets to be received
|
||||
largeMessageReceived.discardBody();
|
||||
largeMessageReceived = null;
|
||||
|
@ -194,10 +206,16 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
|
|||
}
|
||||
|
||||
if (handler != null) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(this + "::receive(" + timeout + ", " + forcingDelivery + ") -> throwing messageHandlerSet");
|
||||
}
|
||||
throw ActiveMQClientMessageBundle.BUNDLE.messageHandlerSet();
|
||||
}
|
||||
|
||||
if (clientWindowSize == 0) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(this + "::receive(" + timeout + ", " + forcingDelivery + ") -> start slowConsumer");
|
||||
}
|
||||
startSlowConsumer();
|
||||
}
|
||||
|
||||
|
@ -234,6 +252,10 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
|
|||
}
|
||||
}
|
||||
|
||||
if ( m != null) {
|
||||
session.workDone();
|
||||
}
|
||||
|
||||
try {
|
||||
wait(toWait);
|
||||
}
|
||||
|
@ -255,6 +277,10 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
|
|||
|
||||
if (failedOver) {
|
||||
if (m == null) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(this + "::receive(" + timeout + ", " + forcingDelivery + ") -> m == null and failover");
|
||||
}
|
||||
|
||||
// if failed over and the buffer is null, we reset the state and try it again
|
||||
failedOver = false;
|
||||
deliveryForced = false;
|
||||
|
@ -262,13 +288,16 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
|
|||
continue;
|
||||
}
|
||||
else {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(this + "::receive(" + timeout + ", " + forcingDelivery + ") -> failedOver, but m != null, being " + m);
|
||||
}
|
||||
failedOver = false;
|
||||
}
|
||||
}
|
||||
|
||||
if (callForceDelivery) {
|
||||
if (isTrace) {
|
||||
ActiveMQClientLogger.LOGGER.trace("Forcing delivery");
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(this + "::Forcing delivery");
|
||||
}
|
||||
// JBPAPP-6030 - Calling forceDelivery outside of the lock to avoid distributed dead locks
|
||||
sessionContext.forceDelivery(this, forceDeliveryCount++);
|
||||
|
@ -289,15 +318,15 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
|
|||
// forced delivery messages are discarded, nothing has been delivered by the queue
|
||||
resetIfSlowConsumer();
|
||||
|
||||
if (isTrace) {
|
||||
ActiveMQClientLogger.LOGGER.trace("There was nothing on the queue, leaving it now:: returning null");
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(this + "::There was nothing on the queue, leaving it now:: returning null");
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
else {
|
||||
if (isTrace) {
|
||||
ActiveMQClientLogger.LOGGER.trace("Ignored force delivery answer as it belonged to another call");
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(this + "::Ignored force delivery answer as it belonged to another call");
|
||||
}
|
||||
// Ignore the message
|
||||
continue;
|
||||
|
@ -329,15 +358,15 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
|
|||
largeMessageReceived = m;
|
||||
}
|
||||
|
||||
if (isTrace) {
|
||||
ActiveMQClientLogger.LOGGER.trace("Returning " + m);
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(this + "::Returning " + m);
|
||||
}
|
||||
|
||||
return m;
|
||||
}
|
||||
else {
|
||||
if (isTrace) {
|
||||
ActiveMQClientLogger.LOGGER.trace("Returning null");
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(this + "::Returning null");
|
||||
}
|
||||
resetIfSlowConsumer();
|
||||
return null;
|
||||
|
@ -351,12 +380,23 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
|
|||
|
||||
@Override
|
||||
public ClientMessage receive(final long timeout) throws ActiveMQException {
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(this + ":: receive(" + timeout + ")");
|
||||
}
|
||||
ClientMessage msg = receive(timeout, false);
|
||||
|
||||
if (msg == null && !closed) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(this + ":: receive(" + timeout + ") -> null, trying again with receive(0)");
|
||||
}
|
||||
msg = receive(0, true);
|
||||
}
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(this + ":: returning " + msg);
|
||||
}
|
||||
|
||||
return msg;
|
||||
}
|
||||
|
||||
|
@ -470,6 +510,9 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
|
|||
|
||||
@Override
|
||||
public void clearAtFailover() {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(this + "::ClearAtFailover");
|
||||
}
|
||||
clearBuffer();
|
||||
|
||||
// failover will issue a start later
|
||||
|
@ -645,8 +688,8 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
|
|||
return;
|
||||
}
|
||||
if (currentLargeMessageController == null) {
|
||||
if (isTrace) {
|
||||
ActiveMQClientLogger.LOGGER.trace("Sending back credits for largeController = null " + flowControlSize);
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(this + "::Sending back credits for largeController = null " + flowControlSize);
|
||||
}
|
||||
flowControl(flowControlSize, false);
|
||||
}
|
||||
|
@ -721,12 +764,23 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
|
|||
individualAcknowledge(message);
|
||||
}
|
||||
else {
|
||||
|
||||
ackBytes += message.getEncodeSize();
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(this + "::acknowledge ackBytes=" + ackBytes + " and ackBatchSize=" + ackBatchSize + ", encodeSize=" + message.getEncodeSize());
|
||||
}
|
||||
|
||||
if (ackBytes >= ackBatchSize) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(this + ":: acknowledge acking " + cmi);
|
||||
}
|
||||
doAck(cmi);
|
||||
}
|
||||
else {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(this + ":: acknowledge setting lastAckedMessage = " + cmi);
|
||||
}
|
||||
lastAckedMessage = cmi;
|
||||
}
|
||||
}
|
||||
|
@ -744,6 +798,9 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
|
|||
@Override
|
||||
public void flushAcks() throws ActiveMQException {
|
||||
if (lastAckedMessage != null) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(this + "::FlushACK acking lastMessage::" + lastAckedMessage);
|
||||
}
|
||||
doAck(lastAckedMessage);
|
||||
}
|
||||
}
|
||||
|
@ -761,8 +818,8 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
|
|||
|
||||
if (creditsToSend >= clientWindowSize) {
|
||||
if (clientWindowSize == 0 && discountSlowConsumer) {
|
||||
if (isTrace) {
|
||||
ActiveMQClientLogger.LOGGER.trace("FlowControl::Sending " + creditsToSend + " -1, for slow consumer");
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(this + "::FlowControl::Sending " + creditsToSend + " -1, for slow consumer");
|
||||
}
|
||||
|
||||
// sending the credits - 1 initially send to fire the slow consumer, or the slow consumer would be
|
||||
|
@ -776,8 +833,8 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
|
|||
}
|
||||
}
|
||||
else {
|
||||
if (ActiveMQClientLogger.LOGGER.isDebugEnabled()) {
|
||||
ActiveMQClientLogger.LOGGER.debug("Sending " + messageBytes + " from flow-control");
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Sending " + messageBytes + " from flow-control");
|
||||
}
|
||||
|
||||
final int credits = creditsToSend;
|
||||
|
@ -808,8 +865,8 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
|
|||
* Sending an initial credit for slow consumers
|
||||
*/
|
||||
private void startSlowConsumer() {
|
||||
if (isTrace) {
|
||||
ActiveMQClientLogger.LOGGER.trace("Sending 1 credit to start delivering of one message to slow consumer");
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(this + "::Sending 1 credit to start delivering of one message to slow consumer");
|
||||
}
|
||||
sendCredits(1);
|
||||
try {
|
||||
|
@ -853,8 +910,8 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
|
|||
}
|
||||
|
||||
private void queueExecutor() {
|
||||
if (isTrace) {
|
||||
ActiveMQClientLogger.LOGGER.trace("Adding Runner on Executor for delivery");
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(this + "::Adding Runner on Executor for delivery");
|
||||
}
|
||||
|
||||
sessionExecutor.execute(runner);
|
||||
|
@ -944,8 +1001,8 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
|
|||
flowControlBeforeConsumption(message);
|
||||
|
||||
if (!expired) {
|
||||
if (isTrace) {
|
||||
ActiveMQClientLogger.LOGGER.trace("Calling handler.onMessage");
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(this + "::Calling handler.onMessage");
|
||||
}
|
||||
final ClassLoader originalLoader = AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() {
|
||||
@Override
|
||||
|
@ -979,8 +1036,8 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
|
|||
onMessageThread = null;
|
||||
}
|
||||
|
||||
if (isTrace) {
|
||||
ActiveMQClientLogger.LOGGER.trace("Handler.onMessage done");
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(this + "::Handler.onMessage done");
|
||||
}
|
||||
|
||||
if (message.isLargeMessage()) {
|
||||
|
@ -1064,9 +1121,21 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
|
|||
|
||||
lastAckedMessage = null;
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(this + "::Acking message " + message);
|
||||
}
|
||||
|
||||
session.acknowledge(this, message);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return super.toString() + "{" +
|
||||
"consumerContext=" + consumerContext +
|
||||
", queueName=" + queueName +
|
||||
'}';
|
||||
}
|
||||
|
||||
// Inner classes
|
||||
// --------------------------------------------------------------------------------
|
||||
|
||||
|
|
|
@ -66,17 +66,11 @@ import org.apache.activemq.artemis.utils.ConfirmationWindowWarning;
|
|||
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
||||
import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
|
||||
import org.apache.activemq.artemis.utils.UUIDGenerator;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, ClientConnectionLifeCycleListener {
|
||||
// Constants
|
||||
// ------------------------------------------------------------------------------------
|
||||
|
||||
private static final boolean isTrace = ActiveMQClientLogger.LOGGER.isTraceEnabled();
|
||||
|
||||
private static final boolean isDebug = ActiveMQClientLogger.LOGGER.isDebugEnabled();
|
||||
|
||||
// Attributes
|
||||
// -----------------------------------------------------------------------------------
|
||||
private static final Logger logger = Logger.getLogger(ClientSessionFactoryImpl.class);
|
||||
|
||||
private final ServerLocatorInternal serverLocator;
|
||||
|
||||
|
@ -270,14 +264,14 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
|
|||
}
|
||||
|
||||
if (localConnector.isEquivalent(live.getParams()) && backUp != null && !localConnector.isEquivalent(backUp.getParams())) {
|
||||
if (ClientSessionFactoryImpl.isDebug) {
|
||||
ActiveMQClientLogger.LOGGER.debug("Setting up backup config = " + backUp + " for live = " + live);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Setting up backup config = " + backUp + " for live = " + live);
|
||||
}
|
||||
backupConfig = backUp;
|
||||
}
|
||||
else {
|
||||
if (ClientSessionFactoryImpl.isDebug) {
|
||||
ActiveMQClientLogger.LOGGER.debug("ClientSessionFactoryImpl received backup update for live/backup pair = " + live +
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("ClientSessionFactoryImpl received backup update for live/backup pair = " + live +
|
||||
" / " +
|
||||
backUp +
|
||||
" but it didn't belong to " +
|
||||
|
@ -514,7 +508,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
|
|||
}
|
||||
catch (ActiveMQInterruptedException e1) {
|
||||
// this is just a debug, since an interrupt is an expected event (in case of a shutdown)
|
||||
ActiveMQClientLogger.LOGGER.debug(e1.getMessage(), e1);
|
||||
logger.debug(e1.getMessage(), e1);
|
||||
}
|
||||
catch (Throwable t) {
|
||||
//for anything else just close so clients are un blocked
|
||||
|
@ -548,8 +542,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
|
|||
return;
|
||||
}
|
||||
|
||||
if (ClientSessionFactoryImpl.isTrace) {
|
||||
ActiveMQClientLogger.LOGGER.trace("Client Connection failed, calling failure listeners and trying to reconnect, reconnectAttempts=" + reconnectAttempts);
|
||||
if (ClientSessionFactoryImpl.logger.isTraceEnabled()) {
|
||||
logger.trace("Client Connection failed, calling failure listeners and trying to reconnect, reconnectAttempts=" + reconnectAttempts);
|
||||
}
|
||||
|
||||
callFailoverListeners(FailoverEventType.FAILURE_DETECTED);
|
||||
|
@ -782,8 +776,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
|
|||
private void getConnectionWithRetry(final int reconnectAttempts) {
|
||||
if (!clientProtocolManager.isAlive())
|
||||
return;
|
||||
if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
|
||||
ActiveMQClientLogger.LOGGER.trace("getConnectionWithRetry::" + reconnectAttempts +
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("getConnectionWithRetry::" + reconnectAttempts +
|
||||
" with retryInterval = " +
|
||||
retryInterval +
|
||||
" multiplier = " +
|
||||
|
@ -795,13 +789,13 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
|
|||
int count = 0;
|
||||
|
||||
while (clientProtocolManager.isAlive()) {
|
||||
if (ClientSessionFactoryImpl.isDebug) {
|
||||
ActiveMQClientLogger.LOGGER.debug("Trying reconnection attempt " + count + "/" + reconnectAttempts);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Trying reconnection attempt " + count + "/" + reconnectAttempts);
|
||||
}
|
||||
|
||||
if (getConnection() != null) {
|
||||
if (ActiveMQClientLogger.LOGGER.isDebugEnabled()) {
|
||||
ActiveMQClientLogger.LOGGER.debug("Reconnection successful");
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Reconnection successful");
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
@ -819,7 +813,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
|
|||
return;
|
||||
}
|
||||
|
||||
if (ClientSessionFactoryImpl.isTrace) {
|
||||
if (ClientSessionFactoryImpl.logger.isTraceEnabled()) {
|
||||
ActiveMQClientLogger.LOGGER.waitingForRetry(interval, retryInterval, retryIntervalMultiplier);
|
||||
}
|
||||
|
||||
|
@ -842,7 +836,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
|
|||
interval = newInterval;
|
||||
}
|
||||
else {
|
||||
ActiveMQClientLogger.LOGGER.debug("Could not connect to any server. Didn't have reconnection configured on the ClientSessionFactory");
|
||||
logger.debug("Could not connect to any server. Didn't have reconnection configured on the ClientSessionFactory");
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
@ -929,14 +923,14 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
|
|||
|
||||
if (serverLocator.getTopology() != null) {
|
||||
if (connection != null) {
|
||||
if (ClientSessionFactoryImpl.isTrace) {
|
||||
ActiveMQClientLogger.LOGGER.trace(this + "::Subscribing Topology");
|
||||
if (ClientSessionFactoryImpl.logger.isTraceEnabled()) {
|
||||
logger.trace(this + "::Subscribing Topology");
|
||||
}
|
||||
clientProtocolManager.sendSubscribeTopology(serverLocator.isClusterConnection());
|
||||
}
|
||||
}
|
||||
else {
|
||||
ActiveMQClientLogger.LOGGER.debug("serverLocator@" + System.identityHashCode(serverLocator + " had no topology"));
|
||||
logger.debug("serverLocator@" + System.identityHashCode(serverLocator + " had no topology"));
|
||||
}
|
||||
|
||||
return connection;
|
||||
|
@ -1050,8 +1044,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
|
|||
Connection transportConnection = connector.createConnection();
|
||||
|
||||
if (transportConnection == null) {
|
||||
if (ClientSessionFactoryImpl.isDebug) {
|
||||
ActiveMQClientLogger.LOGGER.debug("Connector towards " + connector + " failed");
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Connector towards " + connector + " failed");
|
||||
}
|
||||
|
||||
try {
|
||||
|
@ -1081,8 +1075,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
|
|||
Connection transportConnection = null;
|
||||
|
||||
try {
|
||||
if (ClientSessionFactoryImpl.isDebug) {
|
||||
ActiveMQClientLogger.LOGGER.debug("Trying to connect with connector = " + connectorFactory +
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Trying to connect with connector = " + connectorFactory +
|
||||
", parameters = " +
|
||||
connectorConfig.getParams() +
|
||||
" connector = " +
|
||||
|
@ -1096,8 +1090,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
|
|||
connector = liveConnector;
|
||||
}
|
||||
else if (backupConfig != null) {
|
||||
if (ClientSessionFactoryImpl.isDebug) {
|
||||
ActiveMQClientLogger.LOGGER.debug("Trying backup config = " + backupConfig);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Trying backup config = " + backupConfig);
|
||||
}
|
||||
|
||||
ConnectorFactory backupConnectorFactory = instantiateConnectorFactory(backupConfig.getFactoryClassName());
|
||||
|
@ -1109,8 +1103,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
|
|||
if (transportConnection != null) {
|
||||
/*looks like the backup is now live, let's use that*/
|
||||
|
||||
if (ClientSessionFactoryImpl.isDebug) {
|
||||
ActiveMQClientLogger.LOGGER.debug("Connected to the backup at " + backupConfig);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Connected to the backup at " + backupConfig);
|
||||
}
|
||||
|
||||
// Switching backup as live
|
||||
|
@ -1120,8 +1114,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
|
|||
connectorFactory = backupConnectorFactory;
|
||||
}
|
||||
else {
|
||||
if (ClientSessionFactoryImpl.isDebug) {
|
||||
ActiveMQClientLogger.LOGGER.debug("Backup is not active.");
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Backup is not active.");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1166,7 +1160,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
|
|||
theConn.bufferReceived(connectionID, buffer);
|
||||
}
|
||||
else {
|
||||
ActiveMQClientLogger.LOGGER.debug("TheConn == null on ClientSessionFactoryImpl::DelegatingBufferHandler, ignoring packet");
|
||||
logger.debug("TheConn == null on ClientSessionFactoryImpl::DelegatingBufferHandler, ignoring packet");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1279,8 +1273,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
|
|||
Connection transportConnection = createTransportConnection();
|
||||
|
||||
if (transportConnection == null) {
|
||||
if (ClientSessionFactoryImpl.isTrace) {
|
||||
ActiveMQClientLogger.LOGGER.trace("Neither backup or live were active, will just give up now");
|
||||
if (ClientSessionFactoryImpl.logger.isTraceEnabled()) {
|
||||
logger.trace("Neither backup or live were active, will just give up now");
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
@ -1291,8 +1285,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
|
|||
|
||||
schedulePing();
|
||||
|
||||
if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
|
||||
ActiveMQClientLogger.LOGGER.trace("returning " + newConnection);
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("returning " + newConnection);
|
||||
}
|
||||
|
||||
return newConnection;
|
||||
|
@ -1320,8 +1314,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
|
|||
@Override
|
||||
public void nodeDisconnected(RemotingConnection conn, String nodeID, String scaleDownTargetNodeID) {
|
||||
|
||||
if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
|
||||
ActiveMQClientLogger.LOGGER.trace("Disconnect being called on client:" +
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Disconnect being called on client:" +
|
||||
" server locator = " +
|
||||
serverLocator +
|
||||
" notifying node " +
|
||||
|
|
|
@ -51,9 +51,12 @@ import org.apache.activemq.artemis.utils.ConfirmationWindowWarning;
|
|||
import org.apache.activemq.artemis.utils.TokenBucketLimiterImpl;
|
||||
import org.apache.activemq.artemis.utils.UUIDGenerator;
|
||||
import org.apache.activemq.artemis.utils.XidCodecSupport;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
public final class ClientSessionImpl implements ClientSessionInternal, FailureListener {
|
||||
|
||||
private static final Logger logger = Logger.getLogger(ClientSessionImpl.class);
|
||||
|
||||
private final Map<String, String> metadata = new HashMap<>();
|
||||
|
||||
private final ClientSessionFactoryInternal sessionFactory;
|
||||
|
@ -486,8 +489,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
|
|||
public void commit() throws ActiveMQException {
|
||||
checkClosed();
|
||||
|
||||
if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
|
||||
ActiveMQClientLogger.LOGGER.trace("Sending commit");
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Sending commit");
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -547,8 +550,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
|
|||
|
||||
public void rollback(final boolean isLastMessageAsDelivered, final boolean waitConsumers) throws ActiveMQException
|
||||
{
|
||||
if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
|
||||
ActiveMQClientLogger.LOGGER.trace("calling rollback(isLastMessageAsDelivered=" + isLastMessageAsDelivered + ")");
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("calling rollback(isLastMessageAsDelivered=" + isLastMessageAsDelivered + ")");
|
||||
}
|
||||
checkClosed();
|
||||
|
||||
|
@ -745,8 +748,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
|
|||
}
|
||||
|
||||
checkClosed();
|
||||
if (ActiveMQClientLogger.LOGGER.isDebugEnabled()) {
|
||||
ActiveMQClientLogger.LOGGER.debug("client ack messageID = " + message.getMessageID());
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("client ack messageID = " + message.getMessageID());
|
||||
}
|
||||
|
||||
startCall();
|
||||
|
@ -870,12 +873,12 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
|
|||
@Override
|
||||
public void close() throws ActiveMQException {
|
||||
if (closed) {
|
||||
ActiveMQClientLogger.LOGGER.debug("Session was already closed, giving up now, this=" + this);
|
||||
logger.debug("Session was already closed, giving up now, this=" + this);
|
||||
return;
|
||||
}
|
||||
|
||||
if (ActiveMQClientLogger.LOGGER.isDebugEnabled()) {
|
||||
ActiveMQClientLogger.LOGGER.debug("Calling close on session " + this);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Calling close on session " + this);
|
||||
}
|
||||
|
||||
try {
|
||||
|
@ -891,7 +894,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
|
|||
// Session close should always return without exception
|
||||
|
||||
// Note - we only log at trace
|
||||
ActiveMQClientLogger.LOGGER.trace("Failed to close session", e);
|
||||
logger.trace("Failed to close session", e);
|
||||
}
|
||||
|
||||
doCleanup(false);
|
||||
|
@ -1128,8 +1131,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
|
|||
|
||||
@Override
|
||||
public void commit(final Xid xid, final boolean onePhase) throws XAException {
|
||||
if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
|
||||
ActiveMQClientLogger.LOGGER.trace("call commit(xid=" + convert(xid));
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("call commit(xid=" + convert(xid));
|
||||
}
|
||||
checkXA();
|
||||
|
||||
|
@ -1178,8 +1181,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
|
|||
|
||||
@Override
|
||||
public void end(final Xid xid, final int flags) throws XAException {
|
||||
if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
|
||||
ActiveMQClientLogger.LOGGER.trace("Calling end:: " + convert(xid) + ", flags=" + convertTXFlag(flags));
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Calling end:: " + convert(xid) + ", flags=" + convertTXFlag(flags));
|
||||
}
|
||||
|
||||
checkXA();
|
||||
|
@ -1190,7 +1193,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
|
|||
rollback(false, false);
|
||||
}
|
||||
catch (Throwable ignored) {
|
||||
ActiveMQClientLogger.LOGGER.debug("Error on rollback during end call!", ignored);
|
||||
logger.debug("Error on rollback during end call!", ignored);
|
||||
}
|
||||
throw new XAException(XAException.XAER_RMFAIL);
|
||||
}
|
||||
|
@ -1315,8 +1318,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
|
|||
@Override
|
||||
public int prepare(final Xid xid) throws XAException {
|
||||
checkXA();
|
||||
if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
|
||||
ActiveMQClientLogger.LOGGER.trace("Calling prepare:: " + convert(xid));
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Calling prepare:: " + convert(xid));
|
||||
}
|
||||
|
||||
if (rollbackOnly) {
|
||||
|
@ -1402,8 +1405,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
|
|||
public void rollback(final Xid xid) throws XAException {
|
||||
checkXA();
|
||||
|
||||
if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
|
||||
ActiveMQClientLogger.LOGGER.trace("Calling rollback:: " + convert(xid));
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Calling rollback:: " + convert(xid));
|
||||
}
|
||||
|
||||
try {
|
||||
|
@ -1455,8 +1458,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
|
|||
|
||||
@Override
|
||||
public void start(final Xid xid, final int flags) throws XAException {
|
||||
if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
|
||||
ActiveMQClientLogger.LOGGER.trace("Calling start:: " + convert(xid) + " clientXID=" + xid + " flags = " + convertTXFlag(flags));
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Calling start:: " + convert(xid) + " clientXID=" + xid + " flags = " + convertTXFlag(flags));
|
||||
}
|
||||
|
||||
checkXA();
|
||||
|
@ -1633,8 +1636,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
|
|||
}
|
||||
|
||||
private void doCleanup(boolean failingOver) {
|
||||
if (ActiveMQClientLogger.LOGGER.isDebugEnabled()) {
|
||||
ActiveMQClientLogger.LOGGER.debug("calling cleanup on " + this);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("calling cleanup on " + this);
|
||||
}
|
||||
|
||||
synchronized (this) {
|
||||
|
|
|
@ -68,6 +68,7 @@ import org.apache.activemq.artemis.utils.ActiveMQThreadPoolExecutor;
|
|||
import org.apache.activemq.artemis.utils.ClassloadingUtil;
|
||||
import org.apache.activemq.artemis.utils.UUIDGenerator;
|
||||
import org.apache.activemq.artemis.utils.uri.FluentPropertyBeanIntrospectorWithIgnores;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
/**
|
||||
* This is the implementation of {@link org.apache.activemq.artemis.api.core.client.ServerLocator} and all
|
||||
|
@ -75,6 +76,8 @@ import org.apache.activemq.artemis.utils.uri.FluentPropertyBeanIntrospectorWithI
|
|||
*/
|
||||
public final class ServerLocatorImpl implements ServerLocatorInternal, DiscoveryListener {
|
||||
|
||||
private static final Logger logger = Logger.getLogger(ServerLocatorImpl.class);
|
||||
|
||||
private enum STATE {
|
||||
INITIALIZED, CLOSED, CLOSING
|
||||
}
|
||||
|
@ -536,8 +539,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
|
|||
synchronized (this) {
|
||||
// if the topologyArray is null, we will use the initialConnectors
|
||||
if (usedTopology != null) {
|
||||
if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
|
||||
ActiveMQClientLogger.LOGGER.trace("Selecting connector from toplogy.");
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Selecting connector from toplogy.");
|
||||
}
|
||||
int pos = loadBalancingPolicy.select(usedTopology.length);
|
||||
Pair<TransportConfiguration, TransportConfiguration> pair = usedTopology[pos];
|
||||
|
@ -546,8 +549,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
|
|||
}
|
||||
else {
|
||||
// Get from initialconnectors
|
||||
if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
|
||||
ActiveMQClientLogger.LOGGER.trace("Selecting connector from initial connectors.");
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Selecting connector from initial connectors.");
|
||||
}
|
||||
|
||||
int pos = loadBalancingPolicy.select(initialConnectors.length);
|
||||
|
@ -651,8 +654,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
|
|||
public ClientSessionFactory createSessionFactory(String nodeID) throws Exception {
|
||||
TopologyMember topologyMember = topology.getMember(nodeID);
|
||||
|
||||
if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
|
||||
ActiveMQClientLogger.LOGGER.trace("Creating connection factory towards " + nodeID + " = " + topologyMember + ", topology=" + topology.describe());
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Creating connection factory towards " + nodeID + " = " + topologyMember + ", topology=" + topology.describe());
|
||||
}
|
||||
|
||||
if (topologyMember == null) {
|
||||
|
@ -1323,8 +1326,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
|
|||
private void doClose(final boolean sendClose) {
|
||||
synchronized (stateGuard) {
|
||||
if (state == STATE.CLOSED) {
|
||||
if (ActiveMQClientLogger.LOGGER.isDebugEnabled()) {
|
||||
ActiveMQClientLogger.LOGGER.debug(this + " is already closed when calling closed");
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug(this + " is already closed when calling closed");
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
@ -1428,8 +1431,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
|
|||
return;
|
||||
}
|
||||
|
||||
if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
|
||||
ActiveMQClientLogger.LOGGER.trace("nodeDown " + this + " nodeID=" + nodeID + " as being down", new Exception("trace"));
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("nodeDown " + this + " nodeID=" + nodeID + " as being down", new Exception("trace"));
|
||||
}
|
||||
|
||||
topology.removeMember(eventTime, nodeID);
|
||||
|
@ -1462,8 +1465,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
|
|||
final String scaleDownGroupName,
|
||||
final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
|
||||
final boolean last) {
|
||||
if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
|
||||
ActiveMQClientLogger.LOGGER.trace("NodeUp " + this + "::nodeID=" + nodeID + ", connectorPair=" + connectorPair, new Exception("trace"));
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("NodeUp " + this + "::nodeID=" + nodeID + ", connectorPair=" + connectorPair, new Exception("trace"));
|
||||
}
|
||||
|
||||
TopologyMemberImpl member = new TopologyMemberImpl(nodeID, backupGroupName, scaleDownGroupName, connectorPair.getA(), connectorPair.getB());
|
||||
|
@ -1654,8 +1657,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
|
|||
while (csf == null && !isClosed()) {
|
||||
retryNumber++;
|
||||
for (Connector conn : connectors) {
|
||||
if (ActiveMQClientLogger.LOGGER.isDebugEnabled()) {
|
||||
ActiveMQClientLogger.LOGGER.debug(this + "::Submitting connect towards " + conn);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug(this + "::Submitting connect towards " + conn);
|
||||
}
|
||||
|
||||
csf = conn.tryConnect();
|
||||
|
@ -1690,8 +1693,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
|
|||
}
|
||||
});
|
||||
|
||||
if (ActiveMQClientLogger.LOGGER.isDebugEnabled()) {
|
||||
ActiveMQClientLogger.LOGGER.debug("Returning " + csf +
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Returning " + csf +
|
||||
" after " +
|
||||
retryNumber +
|
||||
" retries on StaticConnector " +
|
||||
|
@ -1714,7 +1717,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
|
|||
catch (RejectedExecutionException e) {
|
||||
if (isClosed() || skipWarnings)
|
||||
return null;
|
||||
ActiveMQClientLogger.LOGGER.debug("Rejected execution", e);
|
||||
logger.debug("Rejected execution", e);
|
||||
throw e;
|
||||
}
|
||||
catch (Exception e) {
|
||||
|
@ -1787,8 +1790,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
|
|||
}
|
||||
|
||||
public ClientSessionFactory tryConnect() throws ActiveMQException {
|
||||
if (ActiveMQClientLogger.LOGGER.isDebugEnabled()) {
|
||||
ActiveMQClientLogger.LOGGER.debug(this + "::Trying to connect to " + factory);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug(this + "::Trying to connect to " + factory);
|
||||
}
|
||||
try {
|
||||
ClientSessionFactoryInternal factoryToUse = factory;
|
||||
|
@ -1805,7 +1808,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
|
|||
return factoryToUse;
|
||||
}
|
||||
catch (ActiveMQException e) {
|
||||
ActiveMQClientLogger.LOGGER.debug(this + "::Exception on establish connector initial connection", e);
|
||||
logger.debug(this + "::Exception on establish connector initial connection", e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,8 +30,10 @@ import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
|||
import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener;
|
||||
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.Connector;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
public final class Topology {
|
||||
private static final Logger logger = Logger.getLogger(Topology.class);
|
||||
|
||||
private final Set<ClusterTopologyListener> topologyListeners;
|
||||
|
||||
|
@ -76,8 +78,8 @@ public final class Topology {
|
|||
}
|
||||
this.executor = executor;
|
||||
this.owner = owner;
|
||||
if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
|
||||
ActiveMQClientLogger.LOGGER.trace("Topology@" + Integer.toHexString(System.identityHashCode(this)) + " CREATE", new Exception("trace"));
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Topology@" + Integer.toHexString(System.identityHashCode(this)) + " CREATE", new Exception("trace"));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -89,8 +91,8 @@ public final class Topology {
|
|||
}
|
||||
|
||||
public void addClusterTopologyListener(final ClusterTopologyListener listener) {
|
||||
if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
|
||||
ActiveMQClientLogger.LOGGER.trace(this + "::Adding topology listener " + listener, new Exception("Trace"));
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(this + "::Adding topology listener " + listener, new Exception("Trace"));
|
||||
}
|
||||
synchronized (topologyListeners) {
|
||||
topologyListeners.add(listener);
|
||||
|
@ -99,8 +101,8 @@ public final class Topology {
|
|||
}
|
||||
|
||||
public void removeClusterTopologyListener(final ClusterTopologyListener listener) {
|
||||
if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
|
||||
ActiveMQClientLogger.LOGGER.trace(this + "::Removing topology listener " + listener, new Exception("Trace"));
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(this + "::Removing topology listener " + listener, new Exception("Trace"));
|
||||
}
|
||||
synchronized (topologyListeners) {
|
||||
topologyListeners.remove(listener);
|
||||
|
@ -112,8 +114,8 @@ public final class Topology {
|
|||
*/
|
||||
public void updateAsLive(final String nodeId, final TopologyMemberImpl memberInput) {
|
||||
synchronized (this) {
|
||||
if (ActiveMQClientLogger.LOGGER.isDebugEnabled()) {
|
||||
ActiveMQClientLogger.LOGGER.debug(this + "::node " + nodeId + "=" + memberInput);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug(this + "::node " + nodeId + "=" + memberInput);
|
||||
}
|
||||
memberInput.setUniqueEventID(System.currentTimeMillis());
|
||||
topology.remove(nodeId);
|
||||
|
@ -142,15 +144,15 @@ public final class Topology {
|
|||
*/
|
||||
public TopologyMemberImpl updateBackup(final TopologyMemberImpl memberInput) {
|
||||
final String nodeId = memberInput.getNodeId();
|
||||
if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
|
||||
ActiveMQClientLogger.LOGGER.trace(this + "::updateBackup::" + nodeId + ", memberInput=" + memberInput);
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(this + "::updateBackup::" + nodeId + ", memberInput=" + memberInput);
|
||||
}
|
||||
|
||||
synchronized (this) {
|
||||
TopologyMemberImpl currentMember = getMember(nodeId);
|
||||
if (currentMember == null) {
|
||||
if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
|
||||
ActiveMQClientLogger.LOGGER.trace("There's no live to be updated on backup update, node=" + nodeId + " memberInput=" + memberInput, new Exception("trace"));
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("There's no live to be updated on backup update, node=" + nodeId + " memberInput=" + memberInput, new Exception("trace"));
|
||||
}
|
||||
|
||||
currentMember = memberInput;
|
||||
|
@ -178,7 +180,7 @@ public final class Topology {
|
|||
|
||||
Long deleteTme = getMapDelete().get(nodeId);
|
||||
if (deleteTme != null && uniqueEventID != 0 && uniqueEventID < deleteTme) {
|
||||
ActiveMQClientLogger.LOGGER.debug("Update uniqueEvent=" + uniqueEventID +
|
||||
logger.debug("Update uniqueEvent=" + uniqueEventID +
|
||||
", nodeId=" +
|
||||
nodeId +
|
||||
", memberInput=" +
|
||||
|
@ -191,8 +193,8 @@ public final class Topology {
|
|||
TopologyMemberImpl currentMember = topology.get(nodeId);
|
||||
|
||||
if (currentMember == null) {
|
||||
if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
|
||||
ActiveMQClientLogger.LOGGER.trace(this + "::NewMemberAdd nodeId=" + nodeId + " member = " + memberInput, new Exception("trace"));
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(this + "::NewMemberAdd nodeId=" + nodeId + " member = " + memberInput, new Exception("trace"));
|
||||
}
|
||||
memberInput.setUniqueEventID(uniqueEventID);
|
||||
topology.put(nodeId, memberInput);
|
||||
|
@ -210,8 +212,8 @@ public final class Topology {
|
|||
newMember.setBackup(currentMember.getBackup());
|
||||
}
|
||||
|
||||
if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
|
||||
ActiveMQClientLogger.LOGGER.trace(this + "::updated currentMember=nodeID=" + nodeId + ", currentMember=" +
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(this + "::updated currentMember=nodeID=" + nodeId + ", currentMember=" +
|
||||
currentMember + ", memberInput=" + memberInput + "newMember=" +
|
||||
newMember, new Exception("trace"));
|
||||
}
|
||||
|
@ -241,8 +243,8 @@ public final class Topology {
|
|||
private void sendMemberUp(final String nodeId, final TopologyMemberImpl memberToSend) {
|
||||
final ArrayList<ClusterTopologyListener> copy = copyListeners();
|
||||
|
||||
if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
|
||||
ActiveMQClientLogger.LOGGER.trace(this + "::prepare to send " + nodeId + " to " + copy.size() + " elements");
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(this + "::prepare to send " + nodeId + " to " + copy.size() + " elements");
|
||||
}
|
||||
|
||||
if (copy.size() > 0) {
|
||||
|
@ -250,8 +252,8 @@ public final class Topology {
|
|||
@Override
|
||||
public void run() {
|
||||
for (ClusterTopologyListener listener : copy) {
|
||||
if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
|
||||
ActiveMQClientLogger.LOGGER.trace(Topology.this + " informing " +
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(Topology.this + " informing " +
|
||||
listener +
|
||||
" about node up = " +
|
||||
nodeId +
|
||||
|
@ -289,7 +291,7 @@ public final class Topology {
|
|||
member = topology.get(nodeId);
|
||||
if (member != null) {
|
||||
if (member.getUniqueEventID() > uniqueEventID) {
|
||||
ActiveMQClientLogger.LOGGER.debug("The removeMember was issued before the node " + nodeId + " was started, ignoring call");
|
||||
logger.debug("The removeMember was issued before the node " + nodeId + " was started, ignoring call");
|
||||
member = null;
|
||||
}
|
||||
else {
|
||||
|
@ -299,8 +301,8 @@ public final class Topology {
|
|||
}
|
||||
}
|
||||
|
||||
if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
|
||||
ActiveMQClientLogger.LOGGER.trace("removeMember " + this +
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("removeMember " + this +
|
||||
" removing nodeID=" +
|
||||
nodeId +
|
||||
", result=" +
|
||||
|
@ -316,8 +318,8 @@ public final class Topology {
|
|||
@Override
|
||||
public void run() {
|
||||
for (ClusterTopologyListener listener : copy) {
|
||||
if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
|
||||
ActiveMQClientLogger.LOGGER.trace(this + " informing " + listener + " about node down = " + nodeId);
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(this + " informing " + listener + " about node down = " + nodeId);
|
||||
}
|
||||
try {
|
||||
listener.nodeDown(uniqueEventID, nodeId);
|
||||
|
@ -333,8 +335,8 @@ public final class Topology {
|
|||
}
|
||||
|
||||
public synchronized void sendTopology(final ClusterTopologyListener listener) {
|
||||
if (ActiveMQClientLogger.LOGGER.isDebugEnabled()) {
|
||||
ActiveMQClientLogger.LOGGER.debug(this + " is sending topology to " + listener);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug(this + " is sending topology to " + listener);
|
||||
}
|
||||
|
||||
executor.execute(new Runnable() {
|
||||
|
@ -349,8 +351,8 @@ public final class Topology {
|
|||
}
|
||||
|
||||
for (Map.Entry<String, TopologyMemberImpl> entry : copy.entrySet()) {
|
||||
if (ActiveMQClientLogger.LOGGER.isDebugEnabled()) {
|
||||
ActiveMQClientLogger.LOGGER.debug(Topology.this + " sending " +
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug(Topology.this + " sending " +
|
||||
entry.getKey() +
|
||||
" / " +
|
||||
entry.getValue().getConnector() +
|
||||
|
|
|
@ -36,6 +36,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQComponent;
|
|||
import org.apache.activemq.artemis.core.server.management.Notification;
|
||||
import org.apache.activemq.artemis.core.server.management.NotificationService;
|
||||
import org.apache.activemq.artemis.utils.TypedProperties;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
/**
|
||||
* This class is used to search for members on the cluster through the opaque interface {@link BroadcastEndpoint}.
|
||||
|
@ -47,7 +48,7 @@ import org.apache.activemq.artemis.utils.TypedProperties;
|
|||
*/
|
||||
public final class DiscoveryGroup implements ActiveMQComponent {
|
||||
|
||||
private static final boolean isTrace = ActiveMQClientLogger.LOGGER.isTraceEnabled();
|
||||
private static final Logger logger = Logger.getLogger(DiscoveryGroup.class);
|
||||
|
||||
private final List<DiscoveryListener> listeners = new ArrayList<>();
|
||||
|
||||
|
@ -317,10 +318,10 @@ public final class DiscoveryGroup implements ActiveMQComponent {
|
|||
//only call the listeners if we have changed
|
||||
//also make sure that we aren't stopping to avoid deadlock
|
||||
if (changed && started) {
|
||||
if (isTrace) {
|
||||
ActiveMQClientLogger.LOGGER.trace("Connectors changed on Discovery:");
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Connectors changed on Discovery:");
|
||||
for (DiscoveryEntry connector : connectors.values()) {
|
||||
ActiveMQClientLogger.LOGGER.trace(connector);
|
||||
logger.trace(connector);
|
||||
}
|
||||
}
|
||||
callListeners();
|
||||
|
@ -376,8 +377,8 @@ public final class DiscoveryGroup implements ActiveMQComponent {
|
|||
Map.Entry<String, DiscoveryEntry> entry = iter.next();
|
||||
|
||||
if (entry.getValue().getLastUpdate() + timeout <= now) {
|
||||
if (isTrace) {
|
||||
ActiveMQClientLogger.LOGGER.trace("Timed out node on discovery:" + entry.getValue());
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Timed out node on discovery:" + entry.getValue());
|
||||
}
|
||||
iter.remove();
|
||||
|
||||
|
|
|
@ -32,7 +32,6 @@ import org.apache.activemq.artemis.api.core.SimpleString;
|
|||
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
|
||||
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
|
||||
import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
|
||||
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
|
||||
import org.apache.activemq.artemis.core.protocol.ClientPacketDecoder;
|
||||
|
@ -59,6 +58,7 @@ import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
|||
import org.apache.activemq.artemis.spi.core.remoting.TopologyResponseHandler;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
|
||||
import org.apache.activemq.artemis.utils.VersionLoader;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
/**
|
||||
* This class will return specific packets for different types of actions happening on a messaging protocol.
|
||||
|
@ -74,6 +74,8 @@ import org.apache.activemq.artemis.utils.VersionLoader;
|
|||
|
||||
public class ActiveMQClientProtocolManager implements ClientProtocolManager {
|
||||
|
||||
private static final Logger logger = Logger.getLogger(ActiveMQClientProtocolManager.class);
|
||||
|
||||
private static final String handshake = "ARTEMIS";
|
||||
|
||||
private final int versionID = VersionLoader.getVersion().getIncrementingVersion();
|
||||
|
@ -504,8 +506,8 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager {
|
|||
}
|
||||
|
||||
if (topMessage.isExit()) {
|
||||
if (ActiveMQClientLogger.LOGGER.isDebugEnabled()) {
|
||||
ActiveMQClientLogger.LOGGER.debug("Notifying " + topMessage.getNodeID() + " going down");
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Notifying " + topMessage.getNodeID() + " going down");
|
||||
}
|
||||
|
||||
if (topologyResponseHandler != null) {
|
||||
|
|
|
@ -45,6 +45,13 @@ public class ActiveMQConsumerContext extends ConsumerContext {
|
|||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "ActiveMQConsumerContext{" +
|
||||
"id=" + id +
|
||||
'}';
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return (int) (id ^ (id >>> 32));
|
||||
|
|
|
@ -101,6 +101,7 @@ import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
|
|||
import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
|
||||
import org.apache.activemq.artemis.utils.TokenBucketLimiterImpl;
|
||||
import org.apache.activemq.artemis.utils.VersionLoader;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.DISCONNECT_CONSUMER;
|
||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.EXCEPTION;
|
||||
|
@ -110,6 +111,8 @@ import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SES
|
|||
|
||||
public class ActiveMQSessionContext extends SessionContext {
|
||||
|
||||
private static final Logger logger = Logger.getLogger(ActiveMQSessionContext.class);
|
||||
|
||||
private final Channel sessionChannel;
|
||||
private final int serverVersion;
|
||||
private int confirmationWindow;
|
||||
|
@ -340,8 +343,8 @@ public class ActiveMQSessionContext extends SessionContext {
|
|||
throw new XAException(response.getResponseCode());
|
||||
}
|
||||
|
||||
if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
|
||||
ActiveMQClientLogger.LOGGER.trace("finished commit on " + ClientSessionImpl.convert(xid) + " with response = " + response);
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("finished commit on " + ClientSessionImpl.convert(xid) + " with response = " + response);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -39,8 +39,10 @@ import org.apache.activemq.artemis.core.protocol.core.Packet;
|
|||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.PacketsConfirmedMessage;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
public final class ChannelImpl implements Channel {
|
||||
private static final Logger logger = Logger.getLogger(ChannelImpl.class);
|
||||
|
||||
public enum CHANNEL_ID {
|
||||
/**
|
||||
|
@ -79,8 +81,6 @@ public final class ChannelImpl implements Channel {
|
|||
}
|
||||
}
|
||||
|
||||
private static final boolean isTrace = ActiveMQClientLogger.LOGGER.isTraceEnabled();
|
||||
|
||||
private volatile long id;
|
||||
|
||||
/** This is used in */
|
||||
|
@ -242,8 +242,8 @@ public final class ChannelImpl implements Channel {
|
|||
synchronized (sendLock) {
|
||||
packet.setChannelID(id);
|
||||
|
||||
if (isTrace) {
|
||||
ActiveMQClientLogger.LOGGER.trace("Sending packet nonblocking " + packet + " on channeID=" + id);
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Sending packet nonblocking " + packet + " on channeID=" + id);
|
||||
}
|
||||
|
||||
ActiveMQBuffer buffer = packet.encode(connection);
|
||||
|
@ -258,7 +258,7 @@ public final class ChannelImpl implements Channel {
|
|||
}
|
||||
else {
|
||||
if (!failoverCondition.await(connection.getBlockingCallFailoverTimeout(), TimeUnit.MILLISECONDS)) {
|
||||
ActiveMQClientLogger.LOGGER.debug("timed-out waiting for fail-over condition on non-blocking send");
|
||||
logger.debug("timed-out waiting for fail-over condition on non-blocking send");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -280,8 +280,8 @@ public final class ChannelImpl implements Channel {
|
|||
lock.unlock();
|
||||
}
|
||||
|
||||
if (isTrace) {
|
||||
ActiveMQClientLogger.LOGGER.trace("Writing buffer for channelID=" + id);
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Writing buffer for channelID=" + id);
|
||||
}
|
||||
|
||||
checkReconnectID(reconnectID);
|
||||
|
@ -346,7 +346,7 @@ public final class ChannelImpl implements Channel {
|
|||
}
|
||||
else {
|
||||
if (!failoverCondition.await(connection.getBlockingCallFailoverTimeout(), TimeUnit.MILLISECONDS)) {
|
||||
ActiveMQClientLogger.LOGGER.debug("timed-out waiting for fail-over condition on blocking send");
|
||||
logger.debug("timed-out waiting for fail-over condition on blocking send");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -363,6 +363,9 @@ public final class ChannelImpl implements Channel {
|
|||
|
||||
checkReconnectID(reconnectID);
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Sending blocking " + packet);
|
||||
}
|
||||
connection.getTransportConnection().write(buffer, false, false);
|
||||
|
||||
long toWait = connection.getBlockingCallTimeout();
|
||||
|
@ -427,12 +430,12 @@ public final class ChannelImpl implements Channel {
|
|||
try {
|
||||
boolean callNext = interceptor.intercept(packet, connection);
|
||||
|
||||
if (ActiveMQClientLogger.LOGGER.isDebugEnabled()) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
// use a StringBuilder for speed since this may be executed a lot
|
||||
StringBuilder msg = new StringBuilder();
|
||||
msg.append("Invocation of interceptor ").append(interceptor.getClass().getName()).append(" on ").
|
||||
append(packet).append(" returned ").append(callNext);
|
||||
ActiveMQClientLogger.LOGGER.debug(msg.toString());
|
||||
logger.debug(msg.toString());
|
||||
}
|
||||
|
||||
if (!callNext) {
|
||||
|
@ -505,8 +508,8 @@ public final class ChannelImpl implements Channel {
|
|||
@Override
|
||||
public void replayCommands(final int otherLastConfirmedCommandID) {
|
||||
if (resendCache != null) {
|
||||
if (isTrace) {
|
||||
ActiveMQClientLogger.LOGGER.trace("Replaying commands on channelID=" + id);
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Replaying commands on channelID=" + id);
|
||||
}
|
||||
clearUpTo(otherLastConfirmedCommandID);
|
||||
|
||||
|
@ -553,8 +556,8 @@ public final class ChannelImpl implements Channel {
|
|||
|
||||
confirmed.setChannelID(id);
|
||||
|
||||
if (isTrace) {
|
||||
ActiveMQClientLogger.LOGGER.trace("ChannelImpl::flushConfirmation flushing confirmation " + confirmed);
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("ChannelImpl::flushConfirmation flushing confirmation " + confirmed);
|
||||
}
|
||||
|
||||
doWrite(confirmed);
|
||||
|
@ -566,8 +569,8 @@ public final class ChannelImpl implements Channel {
|
|||
if (resendCache != null && packet.isRequiresConfirmations()) {
|
||||
lastConfirmedCommandID.incrementAndGet();
|
||||
|
||||
if (isTrace) {
|
||||
ActiveMQClientLogger.LOGGER.trace("ChannelImpl::confirming packet " + packet + " last commandID=" + lastConfirmedCommandID);
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("ChannelImpl::confirming packet " + packet + " last commandID=" + lastConfirmedCommandID);
|
||||
}
|
||||
|
||||
receivedBytes += packet.getPacketSize();
|
||||
|
@ -639,16 +642,16 @@ public final class ChannelImpl implements Channel {
|
|||
private void addResendPacket(Packet packet) {
|
||||
resendCache.add(packet);
|
||||
|
||||
if (isTrace) {
|
||||
ActiveMQClientLogger.LOGGER.trace("ChannelImpl::addResendPacket adding packet " + packet + " stored commandID=" + firstStoredCommandID + " possible commandIDr=" + (firstStoredCommandID + resendCache.size()));
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("ChannelImpl::addResendPacket adding packet " + packet + " stored commandID=" + firstStoredCommandID + " possible commandIDr=" + (firstStoredCommandID + resendCache.size()));
|
||||
}
|
||||
}
|
||||
|
||||
private void clearUpTo(final int lastReceivedCommandID) {
|
||||
final int numberToClear = 1 + lastReceivedCommandID - firstStoredCommandID;
|
||||
|
||||
if (isTrace) {
|
||||
ActiveMQClientLogger.LOGGER.trace("ChannelImpl::clearUpTo lastReceived commandID=" + lastReceivedCommandID +
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("ChannelImpl::clearUpTo lastReceived commandID=" + lastReceivedCommandID +
|
||||
" first commandID=" + firstStoredCommandID +
|
||||
" number to clear " + numberToClear);
|
||||
}
|
||||
|
@ -662,8 +665,8 @@ public final class ChannelImpl implements Channel {
|
|||
return;
|
||||
}
|
||||
|
||||
if (isTrace) {
|
||||
ActiveMQClientLogger.LOGGER.trace("ChannelImpl::clearUpTo confirming " + packet + " towards " + commandConfirmationHandler);
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("ChannelImpl::clearUpTo confirming " + packet + " towards " + commandConfirmationHandler);
|
||||
}
|
||||
if (commandConfirmationHandler != null) {
|
||||
commandConfirmationHandler.commandConfirmed(packet);
|
||||
|
|
|
@ -38,18 +38,11 @@ import org.apache.activemq.artemis.core.security.ActiveMQPrincipal;
|
|||
import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
||||
import org.apache.activemq.artemis.utils.SimpleIDGenerator;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
public class RemotingConnectionImpl extends AbstractRemotingConnection implements CoreRemotingConnection {
|
||||
// Constants
|
||||
// ------------------------------------------------------------------------------------
|
||||
private static final Logger logger = Logger.getLogger(RemotingConnectionImpl.class);
|
||||
|
||||
private static final boolean isTrace = ActiveMQClientLogger.LOGGER.isTraceEnabled();
|
||||
|
||||
// Static
|
||||
// ---------------------------------------------------------------------------------------
|
||||
|
||||
// Attributes
|
||||
// -----------------------------------------------------------------------------------
|
||||
private final PacketDecoder packetDecoder;
|
||||
|
||||
private final Map<Long, Channel> channels = new ConcurrentHashMap<>();
|
||||
|
@ -342,8 +335,8 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
|
|||
try {
|
||||
final Packet packet = packetDecoder.decode(buffer);
|
||||
|
||||
if (isTrace) {
|
||||
ActiveMQClientLogger.LOGGER.trace("handling packet " + packet);
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("handling packet " + packet);
|
||||
}
|
||||
|
||||
dataReceived = true;
|
||||
|
|
|
@ -101,10 +101,12 @@ import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager;
|
|||
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
||||
import org.apache.activemq.artemis.utils.ConfigurationHelper;
|
||||
import org.apache.activemq.artemis.utils.FutureLatch;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
import static org.apache.activemq.artemis.utils.Base64.encodeBytes;
|
||||
|
||||
public class NettyConnector extends AbstractConnector {
|
||||
private static final Logger logger = Logger.getLogger(NettyConnector.class);
|
||||
|
||||
// Constants -----------------------------------------------------
|
||||
public static final String JAVAX_KEYSTORE_PATH_PROP_NAME = "javax.net.ssl.keyStore";
|
||||
|
@ -528,7 +530,7 @@ public class NettyConnector extends AbstractConnector {
|
|||
batchFlusherFuture = scheduledThreadPool.scheduleWithFixedDelay(flusher, batchDelay, batchDelay, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
ActiveMQClientLogger.LOGGER.debug("Started Netty Connector version " + TransportConstants.NETTY_VERSION);
|
||||
logger.debug("Started Netty Connector version " + TransportConstants.NETTY_VERSION);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -589,7 +591,7 @@ public class NettyConnector extends AbstractConnector {
|
|||
}
|
||||
}
|
||||
|
||||
ActiveMQClientLogger.LOGGER.debug("Remote destination: " + remoteDestination);
|
||||
logger.debug("Remote destination: " + remoteDestination);
|
||||
|
||||
ChannelFuture future;
|
||||
//port 0 does not work so only use local address if set
|
||||
|
@ -659,7 +661,7 @@ public class NettyConnector extends AbstractConnector {
|
|||
request.headers().set(SEC_ACTIVEMQ_REMOTING_KEY, key);
|
||||
ch.attr(REMOTING_KEY).set(key);
|
||||
|
||||
ActiveMQClientLogger.LOGGER.debugf("Sending HTTP request %s", request);
|
||||
logger.debugf("Sending HTTP request %s", request);
|
||||
|
||||
// Send the HTTP request.
|
||||
ch.writeAndFlush(request);
|
||||
|
@ -985,7 +987,7 @@ public class NettyConnector extends AbstractConnector {
|
|||
InetAddress inetAddr2 = InetAddress.getByName(this.host);
|
||||
String ip1 = inetAddr1.getHostAddress();
|
||||
String ip2 = inetAddr2.getHostAddress();
|
||||
ActiveMQClientLogger.LOGGER.debug(this + " host 1: " + host + " ip address: " + ip1 + " host 2: " + this.host + " ip address: " + ip2);
|
||||
logger.debug(this + " host 1: " + host + " ip address: " + ip1 + " host 2: " + this.host + " ip address: " + ip2);
|
||||
|
||||
result = ip1.equals(ip2);
|
||||
}
|
||||
|
|
|
@ -30,9 +30,12 @@ import org.apache.activemq.artemis.core.remoting.CloseListener;
|
|||
import org.apache.activemq.artemis.core.remoting.FailureListener;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
public abstract class AbstractRemotingConnection implements RemotingConnection {
|
||||
|
||||
private static final Logger logger = Logger.getLogger(AbstractRemotingConnection.class);
|
||||
|
||||
protected final List<FailureListener> failureListeners = new CopyOnWriteArrayList<>();
|
||||
protected final List<CloseListener> closeListeners = new CopyOnWriteArrayList<>();
|
||||
protected final Connection transportConnection;
|
||||
|
@ -65,7 +68,7 @@ public abstract class AbstractRemotingConnection implements RemotingConnection {
|
|||
}
|
||||
catch (ActiveMQInterruptedException interrupted) {
|
||||
// this is an expected behaviour.. no warn or error here
|
||||
ActiveMQClientLogger.LOGGER.debug("thread interrupted", interrupted);
|
||||
logger.debug("thread interrupted", interrupted);
|
||||
}
|
||||
catch (final Throwable t) {
|
||||
// Failure of one listener to execute shouldn't prevent others
|
||||
|
|
|
@ -23,12 +23,15 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
|
|||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
|
||||
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
/**
|
||||
* A factory for producing executors that run all tasks in order, which delegate to a single common executor instance.
|
||||
*/
|
||||
public final class OrderedExecutorFactory implements ExecutorFactory {
|
||||
|
||||
private static final Logger logger = Logger.getLogger(OrderedExecutorFactory.class);
|
||||
|
||||
private final Executor parent;
|
||||
|
||||
/**
|
||||
|
@ -101,7 +104,7 @@ public final class OrderedExecutorFactory implements ExecutorFactory {
|
|||
}
|
||||
catch (ActiveMQInterruptedException e) {
|
||||
// This could happen during shutdowns. Nothing to be concerned about here
|
||||
ActiveMQClientLogger.LOGGER.debug("Interrupted Thread", e);
|
||||
logger.debug("Interrupted Thread", e);
|
||||
}
|
||||
catch (Throwable t) {
|
||||
ActiveMQClientLogger.LOGGER.caughtunexpectedThrowable(t);
|
||||
|
|
|
@ -28,11 +28,11 @@ import java.util.Set;
|
|||
import java.util.TreeSet;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
public class SoftValueHashMap<K, V extends SoftValueHashMap.ValueCache> implements Map<K, V> {
|
||||
|
||||
private final boolean isTrace = ActiveMQClientLogger.LOGGER.isTraceEnabled();
|
||||
private static final Logger logger = Logger.getLogger(SoftValueHashMap.class);
|
||||
|
||||
// The soft references that are already good.
|
||||
// too bad there's no way to override the queue method on ReferenceQueue, so I wouldn't need this
|
||||
|
@ -170,8 +170,8 @@ public class SoftValueHashMap<K, V extends SoftValueHashMap.ValueCache> implemen
|
|||
if (ref.used > 0) {
|
||||
Object removed = mapDelegate.remove(ref.key);
|
||||
|
||||
if (isTrace) {
|
||||
ActiveMQClientLogger.LOGGER.trace("Removing " + removed + " with id = " + ref.key + " from SoftValueHashMap");
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Removing " + removed + " with id = " + ref.key + " from SoftValueHashMap");
|
||||
}
|
||||
|
||||
if (mapDelegate.size() <= maxElements) {
|
||||
|
@ -316,6 +316,7 @@ public class SoftValueHashMap<K, V extends SoftValueHashMap.ValueCache> implemen
|
|||
private void processQueue() {
|
||||
AggregatedSoftReference ref = null;
|
||||
while ((ref = (AggregatedSoftReference) this.refQueue.poll()) != null) {
|
||||
logger.tracef("Removing reference through processQueue:: %s", ref.get());
|
||||
mapDelegate.remove(ref.key);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -34,6 +34,7 @@ import java.util.List;
|
|||
|
||||
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
|
||||
import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
|
||||
import org.jboss.logging.Logger;
|
||||
import org.w3c.dom.Document;
|
||||
import org.w3c.dom.Element;
|
||||
import org.w3c.dom.NamedNodeMap;
|
||||
|
@ -44,6 +45,8 @@ import org.xml.sax.SAXException;
|
|||
|
||||
public final class XMLUtil {
|
||||
|
||||
private static final Logger logger = Logger.getLogger(XMLUtil.class);
|
||||
|
||||
private XMLUtil() {
|
||||
// Utility class
|
||||
}
|
||||
|
@ -288,7 +291,7 @@ public final class XMLUtil {
|
|||
val = parts[1].trim();
|
||||
}
|
||||
String sysProp = System.getProperty(prop, val);
|
||||
ActiveMQClientLogger.LOGGER.debug("replacing " + subString + " with " + sysProp);
|
||||
logger.debug("replacing " + subString + " with " + sysProp);
|
||||
xml = xml.replace(subString, sysProp);
|
||||
|
||||
}
|
||||
|
|
|
@ -27,12 +27,14 @@ import javax.jms.Topic;
|
|||
import javax.jms.TopicSubscriber;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
||||
import org.apache.activemq.artemis.api.core.client.MessageHandler;
|
||||
import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants;
|
||||
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
|
||||
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
|
||||
|
||||
/**
|
||||
* ActiveMQ Artemis implementation of a JMS MessageConsumer.
|
||||
|
@ -216,6 +218,7 @@ public final class ActiveMQMessageConsumer implements QueueReceiver, TopicSubscr
|
|||
jmsMsg.doBeforeReceive();
|
||||
}
|
||||
catch (IndexOutOfBoundsException ioob) {
|
||||
((ClientSessionInternal)session.getCoreSession()).markRollbackOnly();
|
||||
// In case this exception happen you will need to know where it happened.
|
||||
// it has been a bug here in the past, and this was used to debug it.
|
||||
// nothing better than keep it for future investigations in case it happened again
|
||||
|
@ -238,6 +241,11 @@ public final class ActiveMQMessageConsumer implements QueueReceiver, TopicSubscr
|
|||
return jmsMsg;
|
||||
}
|
||||
catch (ActiveMQException e) {
|
||||
((ClientSessionInternal)session.getCoreSession()).markRollbackOnly();
|
||||
throw JMSExceptionHelper.convertFromActiveMQException(e);
|
||||
}
|
||||
catch (ActiveMQInterruptedException e) {
|
||||
((ClientSessionInternal)session.getCoreSession()).markRollbackOnly();
|
||||
throw JMSExceptionHelper.convertFromActiveMQException(e);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,9 +22,19 @@ import javax.jms.JMSException;
|
|||
import javax.jms.JMSSecurityException;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
|
||||
|
||||
public final class JMSExceptionHelper {
|
||||
|
||||
public static JMSException convertFromActiveMQException(final ActiveMQInterruptedException me) {
|
||||
JMSException je = new javax.jms.IllegalStateException(me.getMessage());
|
||||
|
||||
je.setStackTrace(me.getStackTrace());
|
||||
|
||||
je.initCause(me);
|
||||
return je;
|
||||
}
|
||||
|
||||
public static JMSException convertFromActiveMQException(final ActiveMQException me) {
|
||||
JMSException je;
|
||||
switch (me.getType()) {
|
||||
|
|
|
@ -24,6 +24,7 @@ import org.apache.activemq.artemis.api.core.client.ClientConsumer;
|
|||
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
||||
import org.apache.activemq.artemis.api.core.client.MessageHandler;
|
||||
import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants;
|
||||
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
|
||||
|
||||
public class JMSMessageListenerWrapper implements MessageHandler {
|
||||
|
||||
|
@ -83,6 +84,7 @@ public class JMSMessageListenerWrapper implements MessageHandler {
|
|||
message.acknowledge();
|
||||
}
|
||||
catch (ActiveMQException e) {
|
||||
((ClientSessionInternal)session.getCoreSession()).markRollbackOnly();
|
||||
ActiveMQJMSClientLogger.LOGGER.errorProcessingMessage(e);
|
||||
}
|
||||
}
|
||||
|
@ -122,6 +124,7 @@ public class JMSMessageListenerWrapper implements MessageHandler {
|
|||
}
|
||||
}
|
||||
catch (ActiveMQException e) {
|
||||
((ClientSessionInternal)session.getCoreSession()).markRollbackOnly();
|
||||
ActiveMQJMSClientLogger.LOGGER.errorProcessingMessage(e);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,9 +17,12 @@
|
|||
package org.apache.activemq.artemis.core.paging;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
|
||||
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
||||
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
|
||||
|
@ -30,6 +33,8 @@ public interface PagingStoreFactory {
|
|||
|
||||
PagingStore newStore(SimpleString address, AddressSettings addressSettings);
|
||||
|
||||
PageCursorProvider newCursorProvider(PagingStore store, StorageManager storageManager, AddressSettings addressSettings, Executor executor);
|
||||
|
||||
void stop() throws InterruptedException;
|
||||
|
||||
void setPagingManager(PagingManager manager);
|
||||
|
|
|
@ -0,0 +1,43 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.core.paging.cursor;
|
||||
|
||||
/** This is an internal exception.
|
||||
* In certain cases AfterCommit could try to decrease the reference counting on large messages.
|
||||
* But if the whole page is cleaned an exception could happen, which is ok on that path, and we need to identify it. */
|
||||
public class NonExistentPage extends RuntimeException {
|
||||
|
||||
public NonExistentPage() {
|
||||
}
|
||||
|
||||
public NonExistentPage(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
public NonExistentPage(String message, Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
|
||||
public NonExistentPage(Throwable cause) {
|
||||
super(cause);
|
||||
}
|
||||
|
||||
public NonExistentPage(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
|
||||
super(message, cause, enableSuppression, writableStackTrace);
|
||||
}
|
||||
}
|
|
@ -24,6 +24,9 @@ import org.apache.activemq.artemis.core.paging.PagedMessage;
|
|||
*/
|
||||
public interface PageCursorProvider {
|
||||
|
||||
/** Used on tests, to simulate a scenario where the VM cleared space */
|
||||
void clearCache();
|
||||
|
||||
PageCache getPageCache(long pageNr);
|
||||
|
||||
PagedReference newReference(final PagePosition pos, final PagedMessage msg, PageSubscription sub);
|
||||
|
|
|
@ -93,7 +93,7 @@ public interface PageSubscription {
|
|||
*/
|
||||
void reloadACK(PagePosition position);
|
||||
|
||||
void reloadPageCompletion(PagePosition position);
|
||||
void reloadPageCompletion(PagePosition position) throws Exception;
|
||||
|
||||
void reloadPageInfo(long pageNr);
|
||||
|
||||
|
|
|
@ -23,13 +23,16 @@ import org.apache.activemq.artemis.core.paging.PagedMessage;
|
|||
import org.apache.activemq.artemis.core.paging.cursor.LivePageCache;
|
||||
import org.apache.activemq.artemis.core.paging.impl.Page;
|
||||
import org.apache.activemq.artemis.core.server.LargeServerMessage;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
/**
|
||||
* This is the same as PageCache, however this is for the page that's being currently written.
|
||||
*/
|
||||
public class LivePageCacheImpl implements LivePageCache {
|
||||
|
||||
private final List<PagedMessage> messages = new LinkedList<>();
|
||||
private static final Logger logger = Logger.getLogger(LivePageCacheImpl.class);
|
||||
|
||||
private final List<PagedMessage> messages = new LinkedList<PagedMessage>();
|
||||
|
||||
private final Page page;
|
||||
|
||||
|
@ -82,6 +85,7 @@ public class LivePageCacheImpl implements LivePageCache {
|
|||
|
||||
@Override
|
||||
public synchronized void close() {
|
||||
logger.tracef("Closing %s", this);
|
||||
this.isLive = false;
|
||||
}
|
||||
|
||||
|
|
|
@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||
import org.apache.activemq.artemis.core.filter.Filter;
|
||||
import org.apache.activemq.artemis.core.paging.PagedMessage;
|
||||
import org.apache.activemq.artemis.core.paging.PagingStore;
|
||||
import org.apache.activemq.artemis.core.paging.cursor.NonExistentPage;
|
||||
import org.apache.activemq.artemis.core.paging.cursor.PageCache;
|
||||
import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
|
||||
import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
|
||||
|
@ -58,20 +59,20 @@ public class PageCursorProviderImpl implements PageCursorProvider {
|
|||
/**
|
||||
* As an optimization, avoid subsequent schedules as they are unnecessary
|
||||
*/
|
||||
private final AtomicInteger scheduledCleanup = new AtomicInteger(0);
|
||||
protected final AtomicInteger scheduledCleanup = new AtomicInteger(0);
|
||||
|
||||
private volatile boolean cleanupEnabled = true;
|
||||
protected volatile boolean cleanupEnabled = true;
|
||||
|
||||
private final PagingStore pagingStore;
|
||||
protected final PagingStore pagingStore;
|
||||
|
||||
private final StorageManager storageManager;
|
||||
protected final StorageManager storageManager;
|
||||
|
||||
// This is the same executor used at the PageStoreImpl. One Executor per pageStore
|
||||
private final Executor executor;
|
||||
|
||||
private final SoftValueHashMap<Long, PageCache> softCache;
|
||||
|
||||
private final ConcurrentMap<Long, PageSubscription> activeCursors = new ConcurrentHashMap<>();
|
||||
private final ConcurrentMap<Long, PageSubscription> activeCursors = new ConcurrentHashMap<Long, PageSubscription>();
|
||||
|
||||
// Static --------------------------------------------------------
|
||||
|
||||
|
@ -115,7 +116,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
|
|||
|
||||
if (cache == null || pos.getMessageNr() >= cache.getNumberOfMessages()) {
|
||||
// sanity check, this should never happen unless there's a bug
|
||||
throw new IllegalStateException("Invalid messageNumber passed = " + pos + " on " + cache);
|
||||
throw new NonExistentPage("Invalid messageNumber passed = " + pos + " on " + cache);
|
||||
}
|
||||
|
||||
return cache.getMessage(pos.getMessageNr());
|
||||
|
@ -146,9 +147,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
|
|||
cache = createPageCache(pageId);
|
||||
// anyone reading from this cache will have to wait reading to finish first
|
||||
// we also want only one thread reading this cache
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("adding " + pageId + " into cursor = " + this.pagingStore.getAddress());
|
||||
}
|
||||
logger.tracef("adding pageCache pageNr=%d into cursor = %s", pageId, this.pagingStore.getAddress());
|
||||
readPage((int) pageId, cache);
|
||||
softCache.put(pageId, cache);
|
||||
}
|
||||
|
@ -186,6 +185,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
|
|||
|
||||
@Override
|
||||
public void addPageCache(PageCache cache) {
|
||||
logger.tracef("Add page cache %s", cache);
|
||||
synchronized (softCache) {
|
||||
softCache.put(cache.getPageId(), cache);
|
||||
}
|
||||
|
@ -203,6 +203,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clearCache() {
|
||||
synchronized (softCache) {
|
||||
softCache.clear();
|
||||
|
@ -273,6 +274,9 @@ public class PageCursorProviderImpl implements PageCursorProvider {
|
|||
@Override
|
||||
public void scheduleCleanup() {
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("scheduling cleanup", new Exception("trace"));
|
||||
}
|
||||
if (!cleanupEnabled || scheduledCleanup.intValue() > 2) {
|
||||
// Scheduled cleanup was already scheduled before.. never mind!
|
||||
// or we have cleanup disabled
|
||||
|
@ -286,8 +290,10 @@ public class PageCursorProviderImpl implements PageCursorProvider {
|
|||
public void run() {
|
||||
storageManager.setContext(storageManager.newSingleThreadContext());
|
||||
try {
|
||||
if (cleanupEnabled) {
|
||||
cleanup();
|
||||
}
|
||||
}
|
||||
finally {
|
||||
storageManager.clearContext();
|
||||
scheduledCleanup.decrementAndGet();
|
||||
|
@ -336,7 +342,10 @@ public class PageCursorProviderImpl implements PageCursorProvider {
|
|||
|
||||
@Override
|
||||
public void cleanup() {
|
||||
ArrayList<Page> depagedPages = new ArrayList<>();
|
||||
|
||||
logger.tracef("performing page cleanup %s", this);
|
||||
|
||||
ArrayList<Page> depagedPages = new ArrayList<Page>();
|
||||
|
||||
while (true) {
|
||||
if (pagingStore.lock(100)) {
|
||||
|
@ -346,6 +355,8 @@ public class PageCursorProviderImpl implements PageCursorProvider {
|
|||
return;
|
||||
}
|
||||
|
||||
logger.tracef("%s locked", this);
|
||||
|
||||
synchronized (this) {
|
||||
try {
|
||||
if (!pagingStore.isStarted()) {
|
||||
|
@ -356,14 +367,12 @@ public class PageCursorProviderImpl implements PageCursorProvider {
|
|||
return;
|
||||
}
|
||||
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Asserting cleanup for address " + this.pagingStore.getAddress());
|
||||
}
|
||||
|
||||
ArrayList<PageSubscription> cursorList = cloneSubscriptions();
|
||||
|
||||
long minPage = checkMinPage(cursorList);
|
||||
|
||||
logger.debugf("Asserting cleanup for address %s, firstPage=%d", pagingStore.getAddress(), minPage);
|
||||
|
||||
// if the current page is being written...
|
||||
// on that case we need to move to verify it in a different way
|
||||
if (minPage == pagingStore.getCurrentWritingPage() && pagingStore.getCurrentPage().getNumberOfMessages() > 0) {
|
||||
|
@ -376,18 +385,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
|
|||
// All the pages on the cursor are complete.. so we will cleanup everything and store a bookmark
|
||||
if (complete) {
|
||||
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Address " + pagingStore.getAddress() +
|
||||
" is leaving page mode as all messages are consumed and acknowledged from the page store");
|
||||
}
|
||||
|
||||
pagingStore.forceAnotherPage();
|
||||
|
||||
Page currentPage = pagingStore.getCurrentPage();
|
||||
|
||||
storeBookmark(cursorList, currentPage);
|
||||
|
||||
pagingStore.stopPaging();
|
||||
cleanupComplete(cursorList);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -423,7 +421,30 @@ public class PageCursorProviderImpl implements PageCursorProvider {
|
|||
pagingStore.unlock();
|
||||
}
|
||||
}
|
||||
finishCleanup(depagedPages);
|
||||
|
||||
|
||||
}
|
||||
|
||||
// Protected as a way to inject testing
|
||||
protected void cleanupComplete(ArrayList<PageSubscription> cursorList) throws Exception {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Address " + pagingStore.getAddress() +
|
||||
" is leaving page mode as all messages are consumed and acknowledged from the page store");
|
||||
}
|
||||
|
||||
pagingStore.forceAnotherPage();
|
||||
|
||||
Page currentPage = pagingStore.getCurrentPage();
|
||||
|
||||
storeBookmark(cursorList, currentPage);
|
||||
|
||||
pagingStore.stopPaging();
|
||||
}
|
||||
|
||||
// Protected as a way to inject testing
|
||||
protected void finishCleanup(ArrayList<Page> depagedPages) {
|
||||
logger.tracef("this(%s) finishing cleanup on %s", this, depagedPages);
|
||||
try {
|
||||
for (Page depagedPage : depagedPages) {
|
||||
PageCache cache;
|
||||
|
@ -433,7 +454,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
|
|||
}
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Removing page " + depagedPage.getPageId() + " from page-cache");
|
||||
logger.trace("Removing pageNr=" + depagedPage.getPageId() + " from page-cache");
|
||||
}
|
||||
|
||||
if (cache == null) {
|
||||
|
@ -479,12 +500,15 @@ public class PageCursorProviderImpl implements PageCursorProvider {
|
|||
}
|
||||
|
||||
private boolean checkPageCompletion(ArrayList<PageSubscription> cursorList, long minPage) {
|
||||
|
||||
logger.tracef("checkPageCompletion(%d)", minPage);
|
||||
|
||||
boolean complete = true;
|
||||
|
||||
for (PageSubscription cursor : cursorList) {
|
||||
if (!cursor.isComplete(minPage)) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Cursor " + cursor + " was considered incomplete at page " + minPage);
|
||||
logger.debug("Cursor " + cursor + " was considered incomplete at pageNr=" + minPage);
|
||||
}
|
||||
|
||||
complete = false;
|
||||
|
@ -492,7 +516,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
|
|||
}
|
||||
else {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Cursor " + cursor + "was considered **complete** at page " + minPage);
|
||||
logger.debug("Cursor " + cursor + " was considered **complete** at pageNr=" + minPage);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -545,6 +569,13 @@ public class PageCursorProviderImpl implements PageCursorProvider {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "PageCursorProviderImpl{" +
|
||||
"pagingStore=" + pagingStore +
|
||||
'}';
|
||||
}
|
||||
|
||||
// Package protected ---------------------------------------------
|
||||
|
||||
// Protected -----------------------------------------------------
|
||||
|
|
|
@ -191,7 +191,11 @@ final class PageSubscriptionImpl implements PageSubscription {
|
|||
* cursor/subscription.
|
||||
*/
|
||||
@Override
|
||||
public void reloadPageCompletion(PagePosition position) {
|
||||
public void reloadPageCompletion(PagePosition position) throws Exception {
|
||||
// if the current page is complete, we must move it out of the way
|
||||
if (pageStore.getCurrentPage().getPageId() == position.getPageNr()) {
|
||||
pageStore.forceAnotherPage();
|
||||
}
|
||||
PageCursorInfo info = new PageCursorInfo(position.getPageNr(), position.getMessageNr(), null);
|
||||
info.setCompleteInfo(position);
|
||||
synchronized (consumedPages) {
|
||||
|
@ -202,6 +206,9 @@ final class PageSubscriptionImpl implements PageSubscription {
|
|||
@Override
|
||||
public void scheduleCleanupCheck() {
|
||||
if (autoCleanup) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Scheduling cleanup", new Exception("trace"));
|
||||
}
|
||||
if (scheduledCleanupCount.get() > 2) {
|
||||
return;
|
||||
}
|
||||
|
@ -212,8 +219,10 @@ final class PageSubscriptionImpl implements PageSubscription {
|
|||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
if (autoCleanup) {
|
||||
cleanupEntries(false);
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
ActiveMQServerLogger.LOGGER.problemCleaningCursorPages(e);
|
||||
}
|
||||
|
@ -242,6 +251,9 @@ final class PageSubscriptionImpl implements PageSubscription {
|
|||
if (completeDelete) {
|
||||
counter.delete();
|
||||
}
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("cleanupEntries", new Exception("trace"));
|
||||
}
|
||||
Transaction tx = new TransactionImpl(store);
|
||||
|
||||
boolean persist = false;
|
||||
|
@ -564,17 +576,23 @@ final class PageSubscriptionImpl implements PageSubscription {
|
|||
|
||||
@Override
|
||||
public boolean isComplete(long page) {
|
||||
logger.tracef("%s isComplete %d", this, page);
|
||||
synchronized (consumedPages) {
|
||||
if (empty && consumedPages.isEmpty()) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.tracef("isComplete(%d)::Subscription %s has empty=%s, consumedPages.isEmpty=%s", (Object)page, this, consumedPages.isEmpty());
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
PageCursorInfo info = consumedPages.get(page);
|
||||
|
||||
if (info == null && empty) {
|
||||
logger.tracef("isComplete(%d)::::Couldn't find info and it is empty", page);
|
||||
return true;
|
||||
}
|
||||
else {
|
||||
logger.tracef("isComplete(%d)::calling is %s", (Object)page, this, consumedPages.isEmpty());
|
||||
return info != null && info.isDone();
|
||||
}
|
||||
}
|
||||
|
@ -731,18 +749,18 @@ final class PageSubscriptionImpl implements PageSubscription {
|
|||
|
||||
@Override
|
||||
public void reloadPageInfo(long pageNr) {
|
||||
getPageInfo(pageNr, true);
|
||||
getPageInfo(pageNr);
|
||||
}
|
||||
|
||||
private PageCursorInfo getPageInfo(final PagePosition pos) {
|
||||
return getPageInfo(pos.getPageNr(), true);
|
||||
return getPageInfo(pos.getPageNr());
|
||||
}
|
||||
|
||||
private PageCursorInfo getPageInfo(final long pageNr, boolean create) {
|
||||
private PageCursorInfo getPageInfo(final long pageNr) {
|
||||
synchronized (consumedPages) {
|
||||
PageCursorInfo pageInfo = consumedPages.get(pageNr);
|
||||
|
||||
if (create && pageInfo == null) {
|
||||
if (pageInfo == null) {
|
||||
PageCache cache = cursorProvider.getPageCache(pageNr);
|
||||
if (cache == null) {
|
||||
return null;
|
||||
|
@ -814,7 +832,11 @@ final class PageSubscriptionImpl implements PageSubscription {
|
|||
tx.setContainsPersistent();
|
||||
}
|
||||
|
||||
getPageInfo(position).remove(position);
|
||||
PageCursorInfo info = getPageInfo(position);
|
||||
|
||||
logger.tracef("InstallTXCallback looking up pagePosition %s, result=%s", position, info);
|
||||
|
||||
info.remove(position);
|
||||
|
||||
PageCursorTX cursorTX = (PageCursorTX) tx.getProperty(TransactionPropertyIndexes.PAGE_CURSOR_POSITIONS);
|
||||
|
||||
|
@ -897,16 +919,17 @@ final class PageSubscriptionImpl implements PageSubscription {
|
|||
@Override
|
||||
public String toString() {
|
||||
try {
|
||||
return "PageCursorInfo::PageID=" + pageId +
|
||||
return "PageCursorInfo::pageNr=" + pageId +
|
||||
" numberOfMessage = " +
|
||||
numberOfMessages +
|
||||
", confirmed = " +
|
||||
confirmed +
|
||||
", isDone=" +
|
||||
this.isDone();
|
||||
this.isDone() +
|
||||
" wasLive = " + wasLive;
|
||||
}
|
||||
catch (Exception e) {
|
||||
return "PageCursorInfo::PageID=" + pageId +
|
||||
return "PageCursorInfo::pageNr=" + pageId +
|
||||
" numberOfMessage = " +
|
||||
numberOfMessages +
|
||||
", confirmed = " +
|
||||
|
@ -917,6 +940,7 @@ final class PageSubscriptionImpl implements PageSubscription {
|
|||
}
|
||||
|
||||
public PageCursorInfo(final long pageId, final int numberOfMessages, final PageCache cache) {
|
||||
logger.tracef("Created PageCursorInfo for pageNr=%d, numberOfMessages=%d, cache=%s", pageId, numberOfMessages, cache);
|
||||
this.pageId = pageId;
|
||||
this.numberOfMessages = numberOfMessages;
|
||||
if (cache != null) {
|
||||
|
@ -932,6 +956,7 @@ final class PageSubscriptionImpl implements PageSubscription {
|
|||
* @param completePage
|
||||
*/
|
||||
public void setCompleteInfo(final PagePosition completePage) {
|
||||
logger.tracef("Setting up complete page %s on cursor %s on subscription %s", completePage, this, PageSubscriptionImpl.this);
|
||||
this.completePage = completePage;
|
||||
}
|
||||
|
||||
|
@ -940,6 +965,10 @@ final class PageSubscriptionImpl implements PageSubscription {
|
|||
}
|
||||
|
||||
public boolean isDone() {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(PageSubscriptionImpl.this + "::PageCursorInfo(" + pageId + ")::isDone checking with completePage!=null->" + (completePage != null) + " getNumberOfMessages=" + getNumberOfMessages() + ", confirmed=" + confirmed.get() + " and pendingTX=" + pendingTX.get());
|
||||
|
||||
}
|
||||
return completePage != null || (getNumberOfMessages() == confirmed.get() && pendingTX.get() == 0);
|
||||
}
|
||||
|
||||
|
@ -983,7 +1012,7 @@ final class PageSubscriptionImpl implements PageSubscription {
|
|||
" confirmed = " +
|
||||
(confirmed.get() + 1) +
|
||||
" pendingTX = " + pendingTX +
|
||||
", page = " +
|
||||
", pageNr = " +
|
||||
pageId + " posACK = " + posACK);
|
||||
}
|
||||
catch (Throwable ignored) {
|
||||
|
@ -1189,7 +1218,7 @@ final class PageSubscriptionImpl implements PageSubscription {
|
|||
ignored = true;
|
||||
}
|
||||
|
||||
PageCursorInfo info = getPageInfo(message.getPosition().getPageNr(), false);
|
||||
PageCursorInfo info = getPageInfo(message.getPosition().getPageNr());
|
||||
|
||||
if (info != null && (info.isRemoved(message.getPosition()) || info.getCompleteInfo() != null)) {
|
||||
continue;
|
||||
|
|
|
@ -251,7 +251,7 @@ public final class Page implements Comparable<Page> {
|
|||
}
|
||||
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Deleting pageId=" + pageId + " on store " + storeName);
|
||||
logger.debug("Deleting pageNr=" + pageId + " on store " + storeName);
|
||||
}
|
||||
|
||||
if (messages != null) {
|
||||
|
@ -294,7 +294,7 @@ public final class Page implements Comparable<Page> {
|
|||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "Page::pageID=" + this.pageId + ", file=" + this.file;
|
||||
return "Page::pageNr=" + this.pageId + ", file=" + this.file;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -34,12 +34,15 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
|
|||
import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
|
||||
import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
|
||||
import org.apache.activemq.artemis.utils.DataConstants;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
public final class PageTransactionInfoImpl implements PageTransactionInfo {
|
||||
// Constants -----------------------------------------------------
|
||||
|
||||
// Attributes ----------------------------------------------------
|
||||
|
||||
private static final Logger logger = Logger.getLogger(PageTransactionInfoImpl.class);
|
||||
|
||||
private long transactionID;
|
||||
|
||||
private volatile long recordID = -1;
|
||||
|
@ -239,19 +242,36 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo {
|
|||
public synchronized boolean deliverAfterCommit(PageIterator iterator,
|
||||
PageSubscription cursor,
|
||||
PagePosition cursorPos) {
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("deliver after commit on " + cursor + ", position=" + cursorPos);
|
||||
}
|
||||
|
||||
if (committed && useRedelivery) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("commit & useRedelivery on " + cursor + ", position=" + cursorPos);
|
||||
}
|
||||
cursor.addPendingDelivery(cursorPos);
|
||||
cursor.redeliver(iterator, cursorPos);
|
||||
return true;
|
||||
}
|
||||
else if (committed) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("committed on " + cursor + ", position=" + cursorPos + ", ignoring position");
|
||||
}
|
||||
return false;
|
||||
}
|
||||
else if (rolledback) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("rolled back, position ignored on " + cursor + ", position=" + cursorPos);
|
||||
}
|
||||
cursor.positionIgnored(cursorPos);
|
||||
return true;
|
||||
}
|
||||
else {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("deliverAftercommit/else, marking useRedelivery on " + cursor + ", position " + cursorPos);
|
||||
}
|
||||
useRedelivery = true;
|
||||
if (lateDeliveries == null) {
|
||||
lateDeliveries = new LinkedList<>();
|
||||
|
|
|
@ -26,6 +26,7 @@ import java.io.OutputStreamWriter;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
|
@ -35,6 +36,8 @@ import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
|
|||
import org.apache.activemq.artemis.core.paging.PagingManager;
|
||||
import org.apache.activemq.artemis.core.paging.PagingStore;
|
||||
import org.apache.activemq.artemis.core.paging.PagingStoreFactory;
|
||||
import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
|
||||
import org.apache.activemq.artemis.core.paging.cursor.impl.PageCursorProviderImpl;
|
||||
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
|
||||
|
@ -92,6 +95,10 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory {
|
|||
}
|
||||
|
||||
@Override
|
||||
public PageCursorProvider newCursorProvider(PagingStore store, StorageManager storageManager, AddressSettings addressSettings, Executor executor) {
|
||||
return new PageCursorProviderImpl(store, storageManager, executor, addressSettings.getPageCacheMaxSize());
|
||||
}
|
||||
|
||||
public synchronized PagingStore newStore(final SimpleString address, final AddressSettings settings) {
|
||||
|
||||
return new PagingStoreImpl(address, scheduledExecutor, syncTimeout, pagingManager, storageManager, null, this, address, settings, executorFactory.getExecutor(), syncNonTransactional);
|
||||
|
|
|
@ -47,7 +47,6 @@ import org.apache.activemq.artemis.core.paging.PagingStoreFactory;
|
|||
import org.apache.activemq.artemis.core.paging.cursor.LivePageCache;
|
||||
import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
|
||||
import org.apache.activemq.artemis.core.paging.cursor.impl.LivePageCacheImpl;
|
||||
import org.apache.activemq.artemis.core.paging.cursor.impl.PageCursorProviderImpl;
|
||||
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
||||
import org.apache.activemq.artemis.core.replication.ReplicationManager;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
|
||||
|
@ -69,7 +68,7 @@ import org.jboss.logging.Logger;
|
|||
*/
|
||||
public class PagingStoreImpl implements PagingStore {
|
||||
|
||||
private static final Logger logger = Logger.getLogger(Page.class);
|
||||
private static final Logger logger = Logger.getLogger(PagingStoreImpl.class);
|
||||
|
||||
private final SimpleString address;
|
||||
|
||||
|
@ -173,7 +172,7 @@ public class PagingStoreImpl implements PagingStore {
|
|||
this.syncTimer = null;
|
||||
}
|
||||
|
||||
this.cursorProvider = new PageCursorProviderImpl(this, this.storageManager, executor, addressSettings.getPageCacheMaxSize());
|
||||
this.cursorProvider = storeFactory.newCursorProvider(this, this.storageManager, addressSettings, executor);
|
||||
|
||||
}
|
||||
|
||||
|
@ -831,7 +830,7 @@ public class PagingStoreImpl implements PagingStore {
|
|||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Paging message " + pagedMessage + " on pageStore " + this.getStoreName() +
|
||||
" pageId=" + currentPage.getPageId());
|
||||
" pageNr=" + currentPage.getPageId());
|
||||
}
|
||||
|
||||
return true;
|
||||
|
@ -1021,6 +1020,10 @@ public class PagingStoreImpl implements PagingStore {
|
|||
|
||||
int tmpCurrentPageId = currentPageId + 1;
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("new pageNr=" + tmpCurrentPageId, new Exception("trace"));
|
||||
}
|
||||
|
||||
if (currentPage != null) {
|
||||
currentPage.close(true);
|
||||
}
|
||||
|
|
|
@ -543,13 +543,15 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
|
|||
}
|
||||
bindingsJournal = new ReplicatedJournal(((byte) 0), originalBindingsJournal, replicator);
|
||||
messageJournal = new ReplicatedJournal((byte) 1, originalMessageJournal, replicator);
|
||||
|
||||
// We need to send the list while locking otherwise part of the body might get sent too soon
|
||||
// it will send a list of IDs that we are allocating
|
||||
replicator.sendLargeMessageIdListMessage(pendingLargeMessages);
|
||||
}
|
||||
finally {
|
||||
storageManagerLock.writeLock().unlock();
|
||||
}
|
||||
|
||||
// it will send a list of IDs that we are allocating
|
||||
replicator.sendLargeMessageIdListMessage(pendingLargeMessages);
|
||||
sendJournalFile(messageFiles, JournalContent.MESSAGES);
|
||||
sendJournalFile(bindingsFiles, JournalContent.BINDINGS);
|
||||
sendLargeMessageFiles(pendingLargeMessages);
|
||||
|
|
|
@ -340,11 +340,22 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L
|
|||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "LargeServerMessage[messageID=" + messageID + ",priority=" + this.getPriority() +
|
||||
",expiration=[" + (this.getExpiration() != 0 ? new java.util.Date(this.getExpiration()) : "null") + "]" +
|
||||
return "LargeServerMessage[messageID=" + messageID + ",durable=" + isDurable() + ",userID=" + getUserID() + ",priority=" + this.getPriority() +
|
||||
", timestamp=" + toDate(getTimestamp()) + ",expiration=" + toDate(getExpiration()) +
|
||||
", durable=" + durable + ", address=" + getAddress() + ",properties=" + properties.toString() + "]@" + System.identityHashCode(this);
|
||||
}
|
||||
|
||||
private static String toDate(long timestamp) {
|
||||
if (timestamp == 0) {
|
||||
return "0";
|
||||
}
|
||||
else {
|
||||
return new java.util.Date(timestamp).toString();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
// Package protected ---------------------------------------------
|
||||
|
||||
// Protected -----------------------------------------------------
|
||||
|
|
|
@ -222,7 +222,7 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache {
|
|||
}
|
||||
else {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCache Adding duplicateID TX operation for " + describeID(duplID, recordID));
|
||||
logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCache Adding duplicateID TX operation for " + describeID(duplID, recordID) + ", tx=" + tx);
|
||||
}
|
||||
// For a tx, it's important that the entry is not added to the cache until commit
|
||||
// since if the client fails then resends them tx we don't want it to get rejected
|
||||
|
|
|
@ -176,6 +176,8 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
|
|||
return;
|
||||
}
|
||||
|
||||
logger.tracef("Starting remoting service %s", this);
|
||||
|
||||
paused = false;
|
||||
|
||||
// The remoting service maintains it's own thread pool for handling remoting traffic
|
||||
|
|
|
@ -1671,9 +1671,13 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
this.queueFactory = factory;
|
||||
}
|
||||
|
||||
private PagingManager createPagingManager() {
|
||||
protected PagingManager createPagingManager() {
|
||||
|
||||
return new PagingManagerImpl(new PagingStoreFactoryNIO(storageManager, configuration.getPagingLocation(), configuration.getJournalBufferTimeout_NIO(), scheduledPool, executorFactory, configuration.isJournalSyncNonTransactional(), shutdownOnCriticalIO), addressSettingsRepository);
|
||||
return new PagingManagerImpl(getPagingStoreFactory(), addressSettingsRepository);
|
||||
}
|
||||
|
||||
protected PagingStoreFactoryNIO getPagingStoreFactory() {
|
||||
return new PagingStoreFactoryNIO(storageManager, configuration.getPagingLocation(), configuration.getJournalBufferTimeout_NIO(), scheduledPool, executorFactory, configuration.isJournalSyncNonTransactional(), shutdownOnCriticalIO);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -22,6 +22,7 @@ import java.util.LinkedList;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.activemq.artemis.core.paging.cursor.NonExistentPage;
|
||||
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||
import org.apache.activemq.artemis.core.server.MessageReference;
|
||||
|
@ -165,6 +166,10 @@ public class RefsOperation extends TransactionOperationAbstract {
|
|||
try {
|
||||
refmsg.getMessage().decrementRefCount();
|
||||
}
|
||||
catch (NonExistentPage e) {
|
||||
// This could happen on after commit, since the page could be deleted on file earlier by another thread
|
||||
logger.debug(e);
|
||||
}
|
||||
catch (Exception e) {
|
||||
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
|
||||
}
|
||||
|
|
|
@ -854,7 +854,15 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
|
|||
|
||||
boolean startedTransaction = false;
|
||||
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("individualACK messageID=" + messageID);
|
||||
}
|
||||
|
||||
if (tx == null) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("individualACK starting new TX");
|
||||
}
|
||||
startedTransaction = true;
|
||||
tx = new TransactionImpl(storageManager);
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ import javax.transaction.xa.Xid;
|
|||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.lang.management.ManagementFactory;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
|
@ -29,6 +30,7 @@ import java.util.LinkedList;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
@ -58,7 +60,11 @@ import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
|
|||
import org.apache.activemq.artemis.core.paging.PagingManager;
|
||||
import org.apache.activemq.artemis.core.paging.PagingStore;
|
||||
import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
|
||||
import org.apache.activemq.artemis.core.paging.cursor.impl.PageCursorProviderImpl;
|
||||
import org.apache.activemq.artemis.core.paging.cursor.impl.PagePositionImpl;
|
||||
import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryNIO;
|
||||
import org.apache.activemq.artemis.core.persistence.OperationContext;
|
||||
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
||||
import org.apache.activemq.artemis.core.persistence.impl.journal.AckDescribe;
|
||||
import org.apache.activemq.artemis.core.persistence.impl.journal.DescribeJournal;
|
||||
import org.apache.activemq.artemis.core.persistence.impl.journal.DescribeJournal.ReferenceDescribe;
|
||||
|
@ -70,14 +76,18 @@ import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
|
|||
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
|
||||
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManagerImpl;
|
||||
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.jboss.logging.Logger;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class PagingTest extends ActiveMQTestBase {
|
||||
|
||||
private static final Logger logger = Logger.getLogger(PagingTest.class);
|
||||
|
||||
private ServerLocator locator;
|
||||
private ActiveMQServer server;
|
||||
private ClientSessionFactory sf;
|
||||
|
@ -2914,6 +2924,250 @@ public class PagingTest extends ActiveMQTestBase {
|
|||
session.close();
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testRollbackOnSendThenSendMore() throws Exception {
|
||||
clearDataRecreateServerDirs();
|
||||
|
||||
Configuration config = createDefaultInVMConfig();
|
||||
|
||||
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, new HashMap<String, AddressSettings>());
|
||||
|
||||
server.start();
|
||||
|
||||
locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
|
||||
|
||||
sf = createSessionFactory(locator);
|
||||
ClientSession session = sf.createSession(null, null, false, false, true, false, 0);
|
||||
|
||||
session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
|
||||
|
||||
Queue queue = server.locateQueue(ADDRESS);
|
||||
|
||||
queue.getPageSubscription().getPagingStore().startPaging();
|
||||
|
||||
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
|
||||
|
||||
ClientMessage message;
|
||||
|
||||
for (int i = 0; i < 20; i++) {
|
||||
message = session.createMessage(true);
|
||||
|
||||
ActiveMQBuffer bodyLocal = message.getBodyBuffer();
|
||||
|
||||
bodyLocal.writeBytes(new byte[100 * 4]);
|
||||
|
||||
message.putIntProperty(new SimpleString("id"), i);
|
||||
|
||||
producer.send(message);
|
||||
session.commit();
|
||||
queue.getPageSubscription().getPagingStore().forceAnotherPage();
|
||||
|
||||
}
|
||||
|
||||
for (int i = 20; i < 24; i++) {
|
||||
message = session.createMessage(true);
|
||||
|
||||
ActiveMQBuffer bodyLocal = message.getBodyBuffer();
|
||||
|
||||
bodyLocal.writeBytes(new byte[100 * 4]);
|
||||
|
||||
message.putIntProperty(new SimpleString("id"), i);
|
||||
|
||||
producer.send(message);
|
||||
}
|
||||
|
||||
session.rollback();
|
||||
|
||||
ClientSession consumerSession = sf.createSession(false, false);
|
||||
|
||||
|
||||
queue.getPageSubscription().getPagingStore().disableCleanup();
|
||||
|
||||
queue.getPageSubscription().getPagingStore().getCursorProvider().cleanup();
|
||||
|
||||
consumerSession.start();
|
||||
ClientConsumer consumer = consumerSession.createConsumer(ADDRESS, SimpleString.toSimpleString("id > 0"));
|
||||
for (int i = 0; i < 19; i++) {
|
||||
ClientMessage messageRec = consumer.receive(5000);
|
||||
System.err.println("msg::" + messageRec);
|
||||
Assert.assertNotNull(messageRec);
|
||||
messageRec.acknowledge();
|
||||
consumerSession.commit();
|
||||
|
||||
// The only reason I'm calling cleanup directly is that it would be easy to debug in case of bugs
|
||||
// if you see an issue with cleanup here, enjoy debugging this method
|
||||
queue.getPageSubscription().getPagingStore().getCursorProvider().cleanup();
|
||||
}
|
||||
queue.getPageSubscription().getPagingStore().enableCleanup();
|
||||
|
||||
consumerSession.close();
|
||||
|
||||
|
||||
session.close();
|
||||
sf.close();
|
||||
|
||||
|
||||
server.stop();
|
||||
}
|
||||
|
||||
// The pages are complete, and this is simulating a scenario where the server crashed before deleting the pages.
|
||||
@Test
|
||||
public void testRestartWithComplete() throws Exception {
|
||||
clearDataRecreateServerDirs();
|
||||
|
||||
Configuration config = createDefaultInVMConfig();
|
||||
|
||||
final AtomicBoolean mainCleanup = new AtomicBoolean(true);
|
||||
|
||||
class InterruptedCursorProvider extends PageCursorProviderImpl {
|
||||
|
||||
public InterruptedCursorProvider(PagingStore pagingStore,
|
||||
StorageManager storageManager,
|
||||
Executor executor,
|
||||
int maxCacheSize) {
|
||||
super(pagingStore, storageManager, executor, maxCacheSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cleanup() {
|
||||
if (mainCleanup.get()) {
|
||||
super.cleanup();
|
||||
}
|
||||
else {
|
||||
try {
|
||||
pagingStore.unlock();
|
||||
}
|
||||
catch (Throwable ignored) {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
server = new ActiveMQServerImpl(config, ManagementFactory.getPlatformMBeanServer(), new ActiveMQSecurityManagerImpl()) {
|
||||
@Override
|
||||
protected PagingStoreFactoryNIO getPagingStoreFactory() {
|
||||
return new PagingStoreFactoryNIO(this.getStorageManager(), this.getConfiguration().getPagingLocation(), this.getConfiguration().getJournalBufferTimeout_NIO(), this.getScheduledPool(), this.getExecutorFactory(), this.getConfiguration().isJournalSyncNonTransactional(), null) {
|
||||
@Override
|
||||
public PageCursorProvider newCursorProvider(PagingStore store, StorageManager storageManager, AddressSettings addressSettings, Executor executor) {
|
||||
return new InterruptedCursorProvider(store, storageManager, executor, addressSettings.getPageCacheMaxSize());
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
addServer(server);
|
||||
|
||||
AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes( PagingTest.PAGE_SIZE).setMaxSizeBytes(PagingTest.PAGE_MAX).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
|
||||
|
||||
server.getAddressSettingsRepository().addMatch("#", defaultSetting);
|
||||
|
||||
server.start();
|
||||
|
||||
locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
|
||||
|
||||
sf = createSessionFactory(locator);
|
||||
ClientSession session = sf.createSession(true, true, 0);
|
||||
session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
|
||||
|
||||
Queue queue = server.locateQueue(ADDRESS);
|
||||
|
||||
queue.getPageSubscription().getPagingStore().startPaging();
|
||||
|
||||
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
|
||||
|
||||
ClientMessage message;
|
||||
|
||||
for (int i = 0; i < 20; i++) {
|
||||
message = session.createMessage(true);
|
||||
|
||||
ActiveMQBuffer bodyLocal = message.getBodyBuffer();
|
||||
|
||||
bodyLocal.writeBytes(new byte[100 * 4]);
|
||||
|
||||
message.putIntProperty(new SimpleString("idi"), i);
|
||||
|
||||
producer.send(message);
|
||||
session.commit();
|
||||
if (i < 19) {
|
||||
queue.getPageSubscription().getPagingStore().forceAnotherPage();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Assert.assertEquals(20, queue.getPageSubscription().getPagingStore().getCurrentWritingPage());
|
||||
|
||||
// This will force a scenario where the pages are cleaned up. When restarting we need to check if the current page is complete
|
||||
// if it is complete we must move to another page avoiding races on cleanup
|
||||
// which could happen during a crash / restart
|
||||
long tx = server.getStorageManager().generateID();
|
||||
for (int i = 1; i <= 20; i++) {
|
||||
server.getStorageManager().storePageCompleteTransactional(tx, queue.getID(), new PagePositionImpl(i, 1));
|
||||
}
|
||||
|
||||
server.getStorageManager().commit(tx);
|
||||
|
||||
session.close();
|
||||
sf.close();
|
||||
|
||||
server.stop();
|
||||
mainCleanup.set(false);
|
||||
|
||||
logger.trace("Server restart");
|
||||
|
||||
server.start();
|
||||
|
||||
queue = server.locateQueue(ADDRESS);
|
||||
|
||||
locator = createInVMNonHALocator();
|
||||
sf = createSessionFactory(locator);
|
||||
session = sf.createSession(null, null, false, false, true, false, 0);
|
||||
producer = session.createProducer(PagingTest.ADDRESS);
|
||||
|
||||
for (int i = 0; i < 10; i++) {
|
||||
message = session.createMessage(true);
|
||||
|
||||
ActiveMQBuffer bodyLocal = message.getBodyBuffer();
|
||||
|
||||
bodyLocal.writeBytes(new byte[100 * 4]);
|
||||
|
||||
message.putIntProperty(new SimpleString("newid"), i);
|
||||
|
||||
producer.send(message);
|
||||
session.commit();
|
||||
|
||||
if (i == 5) {
|
||||
queue.getPageSubscription().getPagingStore().forceAnotherPage();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
mainCleanup.set(true);
|
||||
|
||||
queue = server.locateQueue(ADDRESS);
|
||||
queue.getPageSubscription().cleanupEntries(false);
|
||||
queue.getPageSubscription().getPagingStore().getCursorProvider().cleanup();
|
||||
|
||||
|
||||
ClientConsumer consumer = session.createConsumer(ADDRESS);
|
||||
session.start();
|
||||
|
||||
for (int i = 0; i < 10; i++) {
|
||||
message = consumer.receive(5000);
|
||||
Assert.assertNotNull(message);
|
||||
Assert.assertEquals(i, message.getIntProperty("newid").intValue());
|
||||
message.acknowledge();
|
||||
}
|
||||
|
||||
server.stop();
|
||||
|
||||
// Thread.sleep(5000);
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCommitOnSend() throws Exception {
|
||||
clearDataRecreateServerDirs();
|
||||
|
|
|
@ -42,6 +42,8 @@ import org.apache.activemq.artemis.core.paging.PagedMessage;
|
|||
import org.apache.activemq.artemis.core.paging.PagingManager;
|
||||
import org.apache.activemq.artemis.core.paging.PagingStore;
|
||||
import org.apache.activemq.artemis.core.paging.PagingStoreFactory;
|
||||
import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
|
||||
import org.apache.activemq.artemis.core.paging.cursor.impl.PageCursorProviderImpl;
|
||||
import org.apache.activemq.artemis.core.paging.impl.Page;
|
||||
import org.apache.activemq.artemis.core.paging.impl.PageTransactionInfoImpl;
|
||||
import org.apache.activemq.artemis.core.paging.impl.PagingStoreImpl;
|
||||
|
@ -105,7 +107,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
|
|||
public void testDoubleStart() throws Exception {
|
||||
SequentialFileFactory factory = new FakeSequentialFileFactory();
|
||||
|
||||
PagingStore storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName, null, 100, createMockManager(), createStorageManagerMock(), factory, null, PagingStoreImplTest.destinationTestName, new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE), getExecutorFactory().getExecutor(), true);
|
||||
PagingStore storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName, null, 100, createMockManager(), createStorageManagerMock(), factory, new FakeStoreFactory(factory), PagingStoreImplTest.destinationTestName, new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE), getExecutorFactory().getExecutor(), true);
|
||||
|
||||
storeImpl.start();
|
||||
|
||||
|
@ -160,7 +162,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
|
|||
|
||||
storeImpl.sync();
|
||||
|
||||
storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName, null, 100, createMockManager(), createStorageManagerMock(), factory, null, PagingStoreImplTest.destinationTestName, addressSettings, getExecutorFactory().getExecutor(), true);
|
||||
storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName, null, 100, createMockManager(), createStorageManagerMock(), factory, storeFactory, PagingStoreImplTest.destinationTestName, addressSettings, getExecutorFactory().getExecutor(), true);
|
||||
|
||||
storeImpl.start();
|
||||
|
||||
|
@ -808,6 +810,14 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
|
|||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public PageCursorProvider newCursorProvider(PagingStore store,
|
||||
StorageManager storageManager,
|
||||
AddressSettings addressSettings,
|
||||
Executor executor) {
|
||||
return new PageCursorProviderImpl(store, storageManager, executor, addressSettings.getPageCacheMaxSize());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setPagingManager(final PagingManager manager) {
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue