This closes #856
This commit is contained in:
commit
8d4f507cb8
|
@ -410,10 +410,16 @@ public interface QueueControl {
|
|||
@Operation(desc = "Pauses the Queue", impact = MBeanOperationInfo.ACTION)
|
||||
void pause() throws Exception;
|
||||
|
||||
/**
|
||||
* Pauses the queue. Messages are no longer delivered to its consumers.
|
||||
*/
|
||||
@Operation(desc = "Pauses the Queue", impact = MBeanOperationInfo.ACTION)
|
||||
void pause(@Parameter(name = "persist", desc = "if true, the pause state will be persisted.") boolean persist) throws Exception;
|
||||
|
||||
/**
|
||||
* Resumes the queue. Messages are again delivered to its consumers.
|
||||
*/
|
||||
@Operation(desc = "Resumes delivery of queued messages and gets the queue out of paused state.", impact = MBeanOperationInfo.ACTION)
|
||||
@Operation(desc = "Resumes delivery of queued messages and gets the queue out of paused state. It will also affected the state of a persisted pause.", impact = MBeanOperationInfo.ACTION)
|
||||
void resume() throws Exception;
|
||||
|
||||
@Operation(desc = "List all the existent consumers on the Queue")
|
||||
|
|
|
@ -369,6 +369,12 @@ public interface JMSQueueControl extends DestinationControl {
|
|||
@Operation(desc = "Pause the queue.", impact = MBeanOperationInfo.ACTION)
|
||||
void pause() throws Exception;
|
||||
|
||||
/**
|
||||
* Pauses the queue. Messages are no longer delivered to its consumers.
|
||||
*/
|
||||
@Operation(desc = "Pauses the Queue", impact = MBeanOperationInfo.ACTION)
|
||||
void pause(@Parameter(name = "persist", desc = "if true, the pause state will be persisted.") boolean persist) throws Exception;
|
||||
|
||||
/**
|
||||
* Returns whether the queue is paused.
|
||||
*/
|
||||
|
|
|
@ -473,6 +473,11 @@ public class JMSQueueControlImpl extends StandardMBean implements JMSQueueContro
|
|||
coreQueueControl.pause();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void pause(boolean persist) throws Exception {
|
||||
coreQueueControl.pause(persist);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resume() throws Exception {
|
||||
coreQueueControl.resume();
|
||||
|
|
|
@ -848,6 +848,18 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void pause(boolean persist) {
|
||||
checkStarted();
|
||||
|
||||
clearIO();
|
||||
try {
|
||||
queue.pause(persist);
|
||||
} finally {
|
||||
blockOnIO();
|
||||
}
|
||||
}
|
||||
@Override
|
||||
public void resume() {
|
||||
checkStarted();
|
||||
|
|
|
@ -16,7 +16,10 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.core.persistence;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.QueueStatusEncoding;
|
||||
|
||||
public interface QueueBindingInfo {
|
||||
|
||||
|
@ -39,4 +42,8 @@ public interface QueueBindingInfo {
|
|||
|
||||
SimpleString getUser();
|
||||
|
||||
void addQueueStatusEncoding(QueueStatusEncoding status);
|
||||
|
||||
List<QueueStatusEncoding> getQueueStatusEncodings();
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,46 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.core.persistence;
|
||||
|
||||
public enum QueueStatus {
|
||||
PAUSED((short) 0), RUNNING((short) 1);
|
||||
|
||||
public final short id;
|
||||
|
||||
QueueStatus(short id) {
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
public static QueueStatus[] values;
|
||||
|
||||
static {
|
||||
QueueStatus[] allValues = QueueStatus.values();
|
||||
values = new QueueStatus[allValues.length];
|
||||
for (QueueStatus v : allValues) {
|
||||
values[v.id] = v;
|
||||
}
|
||||
}
|
||||
|
||||
public static QueueStatus fromID(short id) {
|
||||
if (id < 0 || id > values.length) {
|
||||
return null;
|
||||
} else {
|
||||
return values[id];
|
||||
}
|
||||
}
|
||||
}
|
|
@ -287,6 +287,17 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
|
|||
|
||||
void deleteQueueBinding(long tx, long queueBindingID) throws Exception;
|
||||
|
||||
/**
|
||||
*
|
||||
* @param queueID The id of the queue
|
||||
* @param status The current status of the queue. (Reserved for future use, ATM we only use this record for PAUSED)
|
||||
* @return the id of the journal
|
||||
* @throws Exception
|
||||
*/
|
||||
long storeQueueStatus(long queueID, QueueStatus status) throws Exception;
|
||||
|
||||
void deleteQueueStatus(long recordID) throws Exception;
|
||||
|
||||
JournalLoadInformation loadBindingJournal(List<QueueBindingInfo> queueBindingInfos,
|
||||
List<GroupingInfo> groupingInfos) throws Exception;
|
||||
|
||||
|
|
|
@ -63,6 +63,7 @@ import org.apache.activemq.artemis.core.paging.impl.PageTransactionInfoImpl;
|
|||
import org.apache.activemq.artemis.core.persistence.GroupingInfo;
|
||||
import org.apache.activemq.artemis.core.persistence.OperationContext;
|
||||
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
|
||||
import org.apache.activemq.artemis.core.persistence.QueueStatus;
|
||||
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
||||
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
|
||||
import org.apache.activemq.artemis.core.persistence.config.PersistedRoles;
|
||||
|
@ -81,6 +82,7 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageCount
|
|||
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageUpdateTXEncoding;
|
||||
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PendingLargeMessageEncoding;
|
||||
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PersistentQueueBindingEncoding;
|
||||
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.QueueStatusEncoding;
|
||||
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.RefEncoding;
|
||||
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.ScheduledDeliveryEncoding;
|
||||
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.XidEncoding;
|
||||
|
@ -105,6 +107,7 @@ import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
|||
import org.apache.activemq.artemis.utils.Base64;
|
||||
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
||||
import org.apache.activemq.artemis.utils.IDGenerator;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ACKNOWLEDGE_CURSOR;
|
||||
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE_PENDING;
|
||||
|
@ -122,6 +125,8 @@ import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalR
|
|||
*/
|
||||
public abstract class AbstractJournalStorageManager implements StorageManager {
|
||||
|
||||
private static final Logger logger = Logger.getLogger(AbstractJournalStorageManager.class);
|
||||
|
||||
public enum JournalContent {
|
||||
BINDINGS((byte) 0), MESSAGES((byte) 1);
|
||||
|
||||
|
@ -1236,6 +1241,32 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public long storeQueueStatus(long queueID, QueueStatus status) throws Exception {
|
||||
long recordID = idGenerator.generateID();
|
||||
|
||||
readLock();
|
||||
try {
|
||||
bindingsJournal.appendAddRecord(recordID, JournalRecordIds.QUEUE_STATUS_RECORD, new QueueStatusEncoding(queueID, status), true);
|
||||
} finally {
|
||||
readUnLock();
|
||||
}
|
||||
|
||||
|
||||
return recordID;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteQueueStatus(long recordID) throws Exception {
|
||||
readLock();
|
||||
try {
|
||||
bindingsJournal.appendDeleteRecord(recordID, true);
|
||||
} finally {
|
||||
readUnLock();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public long storePageCounterInc(long txID, long queueID, int value) throws Exception {
|
||||
readLock();
|
||||
|
@ -1326,6 +1357,8 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
|
|||
|
||||
JournalLoadInformation bindingsInfo = bindingsJournal.load(records, preparedTransactions, null);
|
||||
|
||||
HashMap<Long, PersistentQueueBindingEncoding> mapBindings = new HashMap<>();
|
||||
|
||||
for (RecordInfo record : records) {
|
||||
long id = record.id;
|
||||
|
||||
|
@ -1337,6 +1370,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
|
|||
PersistentQueueBindingEncoding bindingEncoding = newBindingEncoding(id, buffer);
|
||||
|
||||
queueBindingInfos.add(bindingEncoding);
|
||||
mapBindings.put(bindingEncoding.getId(), bindingEncoding);
|
||||
} else if (rec == JournalRecordIds.ID_COUNTER_RECORD) {
|
||||
idGenerator.loadState(record.id, buffer);
|
||||
} else if (rec == JournalRecordIds.GROUP_RECORD) {
|
||||
|
@ -1348,11 +1382,24 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
|
|||
} else if (rec == JournalRecordIds.SECURITY_RECORD) {
|
||||
PersistedRoles roles = newSecurityRecord(id, buffer);
|
||||
mapPersistedRoles.put(roles.getAddressMatch(), roles);
|
||||
} else if (rec == JournalRecordIds.QUEUE_STATUS_RECORD) {
|
||||
QueueStatusEncoding statusEncoding = newQueueStatusEncoding(id, buffer);
|
||||
PersistentQueueBindingEncoding queueBindingEncoding = mapBindings.get(statusEncoding.queueID);
|
||||
if (queueBindingEncoding != null) {
|
||||
queueBindingEncoding.addQueueStatusEncoding(statusEncoding);
|
||||
} else {
|
||||
// unlikely to happen, so I didn't bother about the Logger method
|
||||
logger.info("There is no queue with ID " + statusEncoding.queueID + ", deleting record " + statusEncoding.getId());
|
||||
this.deleteQueueStatus(statusEncoding.getId());
|
||||
}
|
||||
} else {
|
||||
throw new IllegalStateException("Invalid record type " + rec);
|
||||
// unlikely to happen
|
||||
logger.warn("Invalid record type " + rec, new Exception("invalid record type " + rec));
|
||||
}
|
||||
}
|
||||
|
||||
mapBindings.clear(); // just to give a hand to GC
|
||||
|
||||
// This will instruct the IDGenerator to beforeStop old records
|
||||
idGenerator.cleanup();
|
||||
|
||||
|
@ -1821,6 +1868,23 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
|
|||
return bindingEncoding;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param id
|
||||
* @param buffer
|
||||
* @return
|
||||
*/
|
||||
protected static QueueStatusEncoding newQueueStatusEncoding(long id, ActiveMQBuffer buffer) {
|
||||
QueueStatusEncoding statusEncoding = new QueueStatusEncoding();
|
||||
|
||||
statusEncoding.decode(buffer);
|
||||
statusEncoding.setId(id);
|
||||
|
||||
return statusEncoding;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public boolean addToPage(PagingStore store,
|
||||
ServerMessage msg,
|
||||
|
|
|
@ -74,6 +74,7 @@ import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalR
|
|||
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_PENDING_COUNTER;
|
||||
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_TRANSACTION;
|
||||
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.QUEUE_BINDING_RECORD;
|
||||
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.QUEUE_STATUS_RECORD;
|
||||
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.SECURITY_RECORD;
|
||||
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME;
|
||||
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.UPDATE_DELIVERY_COUNT;
|
||||
|
@ -550,6 +551,9 @@ public final class DescribeJournal {
|
|||
return encoding;
|
||||
}
|
||||
|
||||
case QUEUE_STATUS_RECORD:
|
||||
return AbstractJournalStorageManager.newQueueStatusEncoding(id, buffer);
|
||||
|
||||
case QUEUE_BINDING_RECORD:
|
||||
return AbstractJournalStorageManager.newBindingEncoding(id, buffer);
|
||||
|
||||
|
|
|
@ -33,6 +33,8 @@ public final class JournalRecordIds {
|
|||
|
||||
public static final byte QUEUE_BINDING_RECORD = 21;
|
||||
|
||||
public static final byte QUEUE_STATUS_RECORD = 22;
|
||||
|
||||
/**
|
||||
* Records storing the current recordID number.
|
||||
*
|
||||
|
|
|
@ -16,6 +16,9 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
|
||||
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.journal.EncodingSupport;
|
||||
|
@ -36,6 +39,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
|
|||
|
||||
public SimpleString user;
|
||||
|
||||
public List<QueueStatusEncoding> queueStatusEncodings;
|
||||
|
||||
public PersistentQueueBindingEncoding() {
|
||||
}
|
||||
|
||||
|
@ -106,6 +111,19 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
|
|||
return autoCreated;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addQueueStatusEncoding(QueueStatusEncoding status) {
|
||||
if (queueStatusEncodings == null) {
|
||||
queueStatusEncodings = new LinkedList<>();
|
||||
}
|
||||
queueStatusEncodings.add(status);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<QueueStatusEncoding> getQueueStatusEncodings() {
|
||||
return queueStatusEncodings;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void decode(final ActiveMQBuffer buffer) {
|
||||
name = buffer.readSimpleString();
|
||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
|
|||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.core.journal.EncodingSupport;
|
||||
import org.apache.activemq.artemis.utils.DataConstants;
|
||||
|
||||
public class QueueEncoding implements EncodingSupport {
|
||||
|
||||
|
@ -44,7 +45,7 @@ public class QueueEncoding implements EncodingSupport {
|
|||
|
||||
@Override
|
||||
public int getEncodeSize() {
|
||||
return 8;
|
||||
return DataConstants.SIZE_LONG;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -0,0 +1,75 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.core.persistence.QueueStatus;
|
||||
import org.apache.activemq.artemis.utils.DataConstants;
|
||||
|
||||
public class QueueStatusEncoding extends QueueEncoding {
|
||||
|
||||
private QueueStatus status;
|
||||
|
||||
private long id;
|
||||
|
||||
public QueueStatusEncoding(long queueID, QueueStatus status) {
|
||||
super(queueID);
|
||||
this.status = status;
|
||||
}
|
||||
|
||||
public QueueStatusEncoding() {
|
||||
super();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void decode(final ActiveMQBuffer buffer) {
|
||||
super.decode(buffer);
|
||||
short shortStatus = buffer.readShort();
|
||||
this.status = QueueStatus.fromID(shortStatus);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void encode(final ActiveMQBuffer buffer) {
|
||||
super.encode(buffer);
|
||||
buffer.writeShort(status.id);
|
||||
}
|
||||
|
||||
public QueueStatus getStatus() {
|
||||
return status;
|
||||
}
|
||||
|
||||
public long getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
public QueueStatusEncoding setId(long id) {
|
||||
this.id = id;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getEncodeSize() {
|
||||
return super.getEncodeSize() + DataConstants.SIZE_SHORT;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "QueueStatusEncoding [id=" + id + ", queueID=" + queueID + ", status=" + status + "]";
|
||||
}
|
||||
|
||||
}
|
|
@ -41,6 +41,7 @@ import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
|
|||
import org.apache.activemq.artemis.core.persistence.GroupingInfo;
|
||||
import org.apache.activemq.artemis.core.persistence.OperationContext;
|
||||
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
|
||||
import org.apache.activemq.artemis.core.persistence.QueueStatus;
|
||||
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
||||
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
|
||||
import org.apache.activemq.artemis.core.persistence.config.PersistedRoles;
|
||||
|
@ -84,6 +85,16 @@ public class NullStorageManager implements StorageManager {
|
|||
|
||||
}
|
||||
|
||||
@Override
|
||||
public long storeQueueStatus(long queueID, QueueStatus status) throws Exception {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteQueueStatus(long recordID) throws Exception {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void injectMonitor(FileStoreMonitor monitor) throws Exception {
|
||||
|
||||
|
|
|
@ -206,6 +206,15 @@ public interface Queue extends Bindable {
|
|||
*/
|
||||
void pause();
|
||||
|
||||
/**
|
||||
* Pauses the queue. It will receive messages but won't give them to the consumers until resumed.
|
||||
* If a queue is paused, pausing it again will only throw a warning.
|
||||
* To check if a queue is paused, invoke <i>isPaused()</i>
|
||||
*/
|
||||
void pause(boolean persist);
|
||||
|
||||
void reloadPause(long recordID);
|
||||
|
||||
/**
|
||||
* Resumes the delivery of message for the queue.
|
||||
* If a queue is resumed, resuming it again will only throw a warning.
|
||||
|
@ -218,6 +227,12 @@ public interface Queue extends Bindable {
|
|||
*/
|
||||
boolean isPaused();
|
||||
|
||||
/**
|
||||
* if the pause was persisted
|
||||
* @return
|
||||
*/
|
||||
boolean isPersistedPause();
|
||||
|
||||
Executor getExecutor();
|
||||
|
||||
void resetAllIterators();
|
||||
|
|
|
@ -39,9 +39,11 @@ import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
|
|||
import org.apache.activemq.artemis.core.paging.impl.Page;
|
||||
import org.apache.activemq.artemis.core.persistence.GroupingInfo;
|
||||
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
|
||||
import org.apache.activemq.artemis.core.persistence.QueueStatus;
|
||||
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
||||
import org.apache.activemq.artemis.core.persistence.impl.PageCountPending;
|
||||
import org.apache.activemq.artemis.core.persistence.impl.journal.AddMessageRecord;
|
||||
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.QueueStatusEncoding;
|
||||
import org.apache.activemq.artemis.core.postoffice.Binding;
|
||||
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
|
||||
import org.apache.activemq.artemis.core.postoffice.PostOffice;
|
||||
|
@ -146,6 +148,13 @@ public class PostOfficeJournalLoader implements JournalLoader {
|
|||
queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(((PostOfficeImpl) postOffice).getServer().getJMSQueueDeleter(), queueBindingInfo.getQueueName()));
|
||||
}
|
||||
|
||||
if (queueBindingInfo.getQueueStatusEncodings() != null) {
|
||||
for (QueueStatusEncoding encoding : queueBindingInfo.getQueueStatusEncodings()) {
|
||||
if (encoding.getStatus() == QueueStatus.PAUSED)
|
||||
queue.reloadPause(encoding.getId());
|
||||
}
|
||||
}
|
||||
|
||||
final Binding binding = new LocalQueueBinding(queue.getAddress(), queue, nodeManager.getNodeId());
|
||||
queues.put(queue.getID(), queue);
|
||||
postOffice.addBinding(binding);
|
||||
|
@ -245,7 +254,9 @@ public class PostOfficeJournalLoader implements JournalLoader {
|
|||
ResourceManager resourceManager,
|
||||
Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception {
|
||||
for (Queue queue : queues.values()) {
|
||||
queue.resume();
|
||||
if (!queue.isPersistedPause()) {
|
||||
queue.resume();
|
||||
}
|
||||
}
|
||||
|
||||
if (System.getProperty("org.apache.activemq.opt.directblast") != null) {
|
||||
|
|
|
@ -51,6 +51,7 @@ import org.apache.activemq.artemis.core.io.IOCallback;
|
|||
import org.apache.activemq.artemis.core.message.impl.MessageImpl;
|
||||
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
|
||||
import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
|
||||
import org.apache.activemq.artemis.core.persistence.QueueStatus;
|
||||
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
||||
import org.apache.activemq.artemis.core.postoffice.Binding;
|
||||
import org.apache.activemq.artemis.core.postoffice.Bindings;
|
||||
|
@ -176,6 +177,8 @@ public class QueueImpl implements Queue {
|
|||
|
||||
private boolean paused;
|
||||
|
||||
private long pauseStatusRecord = -1;
|
||||
|
||||
private static final int MAX_SCHEDULED_RUNNERS = 2;
|
||||
|
||||
// We don't ever need more than two DeliverRunner on the executor's list
|
||||
|
@ -1718,8 +1721,32 @@ public class QueueImpl implements Queue {
|
|||
|
||||
@Override
|
||||
public synchronized void pause() {
|
||||
pause(false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void reloadPause(long recordID) {
|
||||
this.paused = true;
|
||||
if (pauseStatusRecord >= 0) {
|
||||
try {
|
||||
storageManager.deleteQueueStatus(pauseStatusRecord);
|
||||
} catch (Exception e) {
|
||||
logger.warn(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
this.pauseStatusRecord = recordID;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void pause(boolean persist) {
|
||||
try {
|
||||
this.flushDeliveriesInTransit();
|
||||
if (persist && isDurable()) {
|
||||
if (pauseStatusRecord >= 0) {
|
||||
storageManager.deleteQueueStatus(pauseStatusRecord);
|
||||
}
|
||||
pauseStatusRecord = storageManager.storeQueueStatus(this.id, QueueStatus.PAUSED);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
|
||||
}
|
||||
|
@ -1730,6 +1757,15 @@ public class QueueImpl implements Queue {
|
|||
public synchronized void resume() {
|
||||
paused = false;
|
||||
|
||||
if (pauseStatusRecord >= 0) {
|
||||
try {
|
||||
storageManager.deleteQueueStatus(pauseStatusRecord);
|
||||
} catch (Exception e) {
|
||||
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
|
||||
}
|
||||
pauseStatusRecord = -1;
|
||||
}
|
||||
|
||||
deliverAsync();
|
||||
}
|
||||
|
||||
|
@ -1738,6 +1774,11 @@ public class QueueImpl implements Queue {
|
|||
return paused;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized boolean isPersistedPause() {
|
||||
return this.pauseStatusRecord >= 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isDirectDeliver() {
|
||||
return directDeliver;
|
||||
|
|
|
@ -845,6 +845,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
|
|||
this.expectedElements = new CountDownLatch(expectedElements);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isPersistedPause() {
|
||||
return false;
|
||||
}
|
||||
|
||||
public boolean waitCompletion(long timeout, TimeUnit timeUnit) throws Exception {
|
||||
return expectedElements.await(timeout, timeUnit);
|
||||
}
|
||||
|
@ -862,6 +867,14 @@ public class ScheduledDeliveryHandlerTest extends Assert {
|
|||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void pause(boolean persist) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reloadPause(long recordID) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Filter getFilter() {
|
||||
return null;
|
||||
|
|
|
@ -41,6 +41,7 @@ import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
|
|||
import org.apache.activemq.artemis.core.persistence.GroupingInfo;
|
||||
import org.apache.activemq.artemis.core.persistence.OperationContext;
|
||||
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
|
||||
import org.apache.activemq.artemis.core.persistence.QueueStatus;
|
||||
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
||||
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
|
||||
import org.apache.activemq.artemis.core.persistence.config.PersistedRoles;
|
||||
|
@ -242,6 +243,16 @@ public class TransactionImplTest extends ActiveMQTestBase {
|
|||
|
||||
}
|
||||
|
||||
@Override
|
||||
public long storeQueueStatus(long queueID, QueueStatus status) throws Exception {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteQueueStatus(long recordID) throws Exception {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void pageWrite(PagedMessage message, int pageNumber) {
|
||||
|
||||
|
|
|
@ -350,6 +350,11 @@ public class JMSQueueControlUsingJMSTest extends JMSQueueControlTest {
|
|||
proxy.invokeOperation("pause");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void pause(boolean persist) throws Exception {
|
||||
proxy.invokeOperation("pause", persist);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resume() throws Exception {
|
||||
proxy.invokeOperation("resume");
|
||||
|
|
|
@ -336,6 +336,11 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
|
|||
proxy.invokeOperation("pause");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void pause(boolean persist) throws Exception {
|
||||
proxy.invokeOperation("pause", persist);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resume() throws Exception {
|
||||
proxy.invokeOperation("resume");
|
||||
|
|
|
@ -0,0 +1,60 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.tests.integration.server;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
public class QueuePeristPauseTest extends ActiveMQTestBase {
|
||||
|
||||
@Test
|
||||
public void testPauseQueue() throws Exception {
|
||||
ActiveMQServer server = createServer(true, false);
|
||||
server.start();
|
||||
|
||||
Queue queue = server.createQueue(SimpleString.toSimpleString("q1"),
|
||||
SimpleString.toSimpleString("q1"),
|
||||
null, true, false);
|
||||
|
||||
queue.pause(true);
|
||||
|
||||
server.stop();
|
||||
server.start();
|
||||
|
||||
for (int i = 0; i < 4; i++) {
|
||||
server.stop();
|
||||
server.start();
|
||||
queue = server.locateQueue(SimpleString.toSimpleString("q1"));
|
||||
Assert.assertTrue(queue.isPaused());
|
||||
}
|
||||
|
||||
queue.resume();
|
||||
|
||||
for (int i = 0; i < 4; i++) {
|
||||
server.stop();
|
||||
server.start();
|
||||
queue = server.locateQueue(SimpleString.toSimpleString("q1"));
|
||||
Assert.assertFalse(queue.isPaused());
|
||||
}
|
||||
|
||||
}
|
||||
}
|
|
@ -56,6 +56,16 @@ public class FakeQueue implements Queue {
|
|||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reloadPause(long recordID) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isPersistedPause() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int retryMessages(Filter filter) throws Exception {
|
||||
return 0;
|
||||
|
@ -102,6 +112,11 @@ public class FakeQueue implements Queue {
|
|||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void pause(boolean persist) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean flushExecutor() {
|
||||
return true;
|
||||
|
|
Loading…
Reference in New Issue