This commit is contained in:
Martyn Taylor 2018-11-05 09:43:51 +00:00
commit 43ad18058d
18 changed files with 203 additions and 41 deletions

View File

@ -241,11 +241,11 @@ public class DBOption extends OptionalLocking {
storageManager, 1000L,
scheduledExecutorService, executorFactory,
false, null);
pagingmanager = new PagingManagerImpl(pageStoreFactory, addressSettingsRepository);
pagingmanager = new PagingManagerImpl(pageStoreFactory, addressSettingsRepository, configuration.getManagementAddress());
} else {
storageManager = new JournalStorageManager(config, EmptyCriticalAnalyzer.getInstance(), executorFactory, executorFactory);
PagingStoreFactory pageStoreFactory = new PagingStoreFactoryNIO(storageManager, config.getPagingLocation(), 1000L, scheduledExecutorService, executorFactory, true, null);
pagingmanager = new PagingManagerImpl(pageStoreFactory, addressSettingsRepository);
pagingmanager = new PagingManagerImpl(pageStoreFactory, addressSettingsRepository, configuration.getManagementAddress());
}

View File

@ -486,7 +486,7 @@ public class AMQPSessionCallback implements SessionCallback {
try {
PagingStore store = manager.getServer().getPagingManager().getPageStore(message.getAddressSimpleString());
if (store.isRejectingMessages()) {
if (store != null && store.isRejectingMessages()) {
// We drop pre-settled messages (and abort any associated Tx)
if (delivery.remotelySettled()) {
if (transaction != null) {
@ -585,7 +585,11 @@ public class AMQPSessionCallback implements SessionCallback {
pagingManager.checkMemory(runnable);
} else {
final PagingStore store = manager.getServer().getPagingManager().getPageStore(address);
if (store != null) {
store.checkMemory(runnable);
} else {
runnable.run();
}
}
} catch (Exception e) {
throw new RuntimeException(e);

View File

@ -418,10 +418,14 @@ public class AMQSession implements SessionCallback {
//non-persistent messages goes here, by default we stop reading from
//transport
connection.getTransportConnection().setAutoRead(false);
if (store != null) {
if (!store.checkMemory(enableAutoReadAndTtl)) {
enableAutoReadAndTtl();
throw new ResourceAllocationException("Queue is full " + address);
}
} else {
enableAutoReadAndTtl.run();
}
getCoreSession().send(coreMsg, false, dest.isTemporary());
@ -443,7 +447,7 @@ public class AMQSession implements SessionCallback {
final AtomicInteger count,
final org.apache.activemq.artemis.api.core.Message coreMsg,
final SimpleString address) throws ResourceAllocationException {
if (!store.checkMemory(false, () -> {
final Runnable task = () -> {
Exception exceptionToSend = null;
try {
@ -496,11 +500,16 @@ public class AMQSession implements SessionCallback {
});
}
}
})) {
};
if (store != null) {
if (!store.checkMemory(false, task)) {
this.connection.getContext().setDontSendReponse(false);
connection.enableTtl();
throw new ResourceAllocationException("Queue is full " + address);
}
} else {
task.run();
}
}
private void enableAutoReadAndTtl() {

View File

@ -201,17 +201,29 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
public long getNumberOfBytesPerPage() throws Exception {
clearIO();
try {
return pagingManager.getPageStore(addressInfo.getName()).getPageSizeBytes();
final PagingStore pagingStore = getPagingStore();
if (pagingStore == null) {
return 0;
}
return pagingStore.getPageSizeBytes();
} finally {
blockOnIO();
}
}
private PagingStore getPagingStore() throws Exception {
return pagingManager.getPageStore(addressInfo.getName());
}
@Override
public long getAddressSize() throws Exception {
clearIO();
try {
return pagingManager.getPageStore(addressInfo.getName()).getAddressSize();
final PagingStore pagingStore = getPagingStore();
if (pagingStore == null) {
return 0;
}
return pagingStore.getAddressSize();
} finally {
blockOnIO();
}
@ -240,7 +252,11 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
public boolean isPaging() throws Exception {
clearIO();
try {
return pagingManager.getPageStore(addressInfo.getName()).isPaging();
final PagingStore pagingStore = getPagingStore();
if (pagingStore == null) {
return false;
}
return pagingStore.isPaging();
} finally {
blockOnIO();
}
@ -250,12 +266,12 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
public int getNumberOfPages() throws Exception {
clearIO();
try {
PagingStore pageStore = pagingManager.getPageStore(addressInfo.getName());
final PagingStore pageStore = getPagingStore();
if (!pageStore.isPaging()) {
if (pageStore == null || !pageStore.isPaging()) {
return 0;
} else {
return pagingManager.getPageStore(addressInfo.getName()).getNumberOfPages();
return pageStore.getNumberOfPages();
}
} finally {
blockOnIO();

View File

@ -86,6 +86,8 @@ public final class PagingManagerImpl implements PagingManager {
private ActiveMQScheduledComponent scheduledComponent = null;
private final SimpleString managementAddress;
// Static
// --------------------------------------------------------------------------------------------------------------------------
@ -105,17 +107,25 @@ public final class PagingManagerImpl implements PagingManager {
public PagingManagerImpl(final PagingStoreFactory pagingSPI,
final HierarchicalRepository<AddressSettings> addressSettingsRepository,
final long maxSize) {
final long maxSize,
final SimpleString managementAddress) {
pagingStoreFactory = pagingSPI;
this.addressSettingsRepository = addressSettingsRepository;
addressSettingsRepository.registerListener(this);
this.maxSize = maxSize;
this.memoryExecutor = pagingSPI.newExecutor();
this.managementAddress = managementAddress;
}
public PagingManagerImpl(final PagingStoreFactory pagingSPI,
final HierarchicalRepository<AddressSettings> addressSettingsRepository) {
this(pagingSPI, addressSettingsRepository, -1);
this(pagingSPI, addressSettingsRepository, -1, null);
}
public PagingManagerImpl(final PagingStoreFactory pagingSPI,
final HierarchicalRepository<AddressSettings> addressSettingsRepository,
final SimpleString managementAddress) {
this(pagingSPI, addressSettingsRepository, -1, managementAddress);
}
@Override
@ -329,6 +339,9 @@ public final class PagingManagerImpl implements PagingManager {
*/
@Override
public PagingStore getPageStore(final SimpleString storeName) throws Exception {
if (managementAddress != null && storeName.startsWith(managementAddress)) {
return null;
}
PagingStore store = stores.get(storeName);
if (store != null) {
@ -438,6 +451,7 @@ public final class PagingManagerImpl implements PagingManager {
}
private PagingStore newStore(final SimpleString address) throws Exception {
assert managementAddress == null || (managementAddress != null && !address.startsWith(managementAddress));
syncLock.readLock().lock();
try {
PagingStore store = stores.get(address);

View File

@ -1244,6 +1244,9 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
if (queueInfo != null) {
SimpleString address = queueInfo.getAddress();
PagingStore store = pagingManager.getPageStore(address);
if (store == null) {
return null;
}
subs = store.getCursorProvider().getSubscription(queueID);
pageSubscriptions.put(queueID, subs);
}

View File

@ -30,7 +30,6 @@ import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
@ -62,13 +61,10 @@ public final class BindingsImpl implements Bindings {
private final GroupingHandler groupingHandler;
private final PagingStore pageStore;
private final SimpleString name;
public BindingsImpl(final SimpleString name, final GroupingHandler groupingHandler, final PagingStore pageStore) {
public BindingsImpl(final SimpleString name, final GroupingHandler groupingHandler) {
this.groupingHandler = groupingHandler;
this.pageStore = pageStore;
this.name = name;
}

View File

@ -1252,7 +1252,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
for (Map.Entry<SimpleString, RouteContextList> entry : context.getContexListing().entrySet()) {
PagingStore store = pagingManager.getPageStore(entry.getKey());
if (storageManager.addToPage(store, message, context.getTransaction(), entry.getValue())) {
if (store != null && storageManager.addToPage(store, message, context.getTransaction(), entry.getValue())) {
if (message.isLargeMessage()) {
confirmLargeMessageSend(tx, message);
}
@ -1696,9 +1696,9 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
}
@Override
public Bindings createBindings(final SimpleString address) throws Exception {
public Bindings createBindings(final SimpleString address) {
GroupingHandler groupingHandler = server.getGroupingHandler();
BindingsImpl bindings = new BindingsImpl(address, groupingHandler, pagingManager.getPageStore(address));
BindingsImpl bindings = new BindingsImpl(address, groupingHandler);
if (groupingHandler != null) {
groupingHandler.addListener(bindings);
}

View File

@ -22,6 +22,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.filter.FilterUtils;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
public final class QueueConfig {
@ -202,7 +203,12 @@ public final class QueueConfig {
final PageSubscription pageSubscription;
if (pagingManager != null && !FilterUtils.isTopicIdentification(filter)) {
try {
pageSubscription = this.pagingManager.getPageStore(address).getCursorProvider().createSubscription(id, filter, durable);
final PagingStore pageStore = this.pagingManager.getPageStore(address);
if (pageStore != null) {
pageSubscription = pageStore.getCursorProvider().createSubscription(id, filter, durable);
} else {
pageSubscription = null;
}
} catch (Exception e) {
throw new IllegalStateException(e);
}

View File

@ -2384,7 +2384,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
@Override
public PagingManager createPagingManager() throws Exception {
return new PagingManagerImpl(getPagingStoreFactory(), addressSettingsRepository, configuration.getGlobalMaxSize());
return new PagingManagerImpl(getPagingStoreFactory(), addressSettingsRepository, configuration.getGlobalMaxSize(), configuration.getManagementAddress());
}
protected PagingStoreFactory getPagingStoreFactory() throws Exception {

View File

@ -363,7 +363,7 @@ public class PostOfficeJournalLoader implements JournalLoader {
// This can't be true!
assert (perQueue != null);
if (store.checkPageFileExists(pageId.intValue())) {
if (store != null && store.checkPageFileExists(pageId.intValue())) {
// on this case we need to recalculate the records
Page pg = store.createPage(pageId.intValue());
pg.open();

View File

@ -154,7 +154,9 @@ public class ScaleDownHandler {
Transaction tx = new TransactionImpl(storageManager);
if (pageStore != null) {
pageStore.disableCleanup();
}
try {
@ -240,10 +242,12 @@ public class ScaleDownHandler {
return messageCount;
} finally {
if (pageStore != null) {
pageStore.enableCleanup();
pageStore.getCursorProvider().scheduleCleanup();
}
}
}
private long scaleDownSNF(final SimpleString address,
final Set<Queue> queues,
@ -556,6 +560,9 @@ public class ScaleDownHandler {
public boolean lookup(MessageReference reference) throws Exception {
if (reference.isPaged()) {
if (store == null) {
return false;
}
PageSubscription subscription = store.getCursorProvider().getSubscription(queue.getID());
if (subscription.contains((PagedReference) reference)) {
return true;

View File

@ -1536,7 +1536,9 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
final SimpleString addr = removePrefix(address);
PagingStore store = server.getPagingManager().getPageStore(addr);
if (!store.checkMemory(new Runnable() {
if (store == null) {
callback.sendProducerCreditsMessage(credits, address);
} else if (!store.checkMemory(new Runnable() {
@Override
public void run() {
callback.sendProducerCreditsMessage(credits, address);

View File

@ -28,6 +28,7 @@ import java.io.PrintWriter;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl;
@ -304,7 +305,7 @@ public class FileMoveManagerTest {
PagingStoreFactoryNIO storeFactory = new PagingStoreFactoryNIO(storageManager, dataLocation, 100, null, new OrderedExecutorFactory(threadPool), true, null);
PagingManagerImpl managerImpl = new PagingManagerImpl(storeFactory, addressSettings, -1);
PagingManagerImpl managerImpl = new PagingManagerImpl(storeFactory, addressSettings, -1, ActiveMQDefaultConfiguration.getDefaultManagementAddress());
managerImpl.start();

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.broker.region.policy;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.broker.ConnectionContext;
@ -153,7 +154,11 @@ public class DestinationProxy implements Destination {
@Override
public long getUsage() {
try {
return server.getPagingManager().getPageStore(view.getAddress()).getAddressSize();
final PagingStore pageStore = server.getPagingManager().getPageStore(view.getAddress());
if (pageStore == null) {
return 0;
}
return pageStore.getAddressSize();
} catch (Exception e) {
throw new RuntimeException(e);
}
@ -221,9 +226,13 @@ public class DestinationProxy implements Destination {
@Override
public int getPercentUsage() {
long total = 0;
final long total;
try {
total = server.getPagingManager().getPageStore(view.getAddress()).getMaxSize();
final PagingStore pageStore = server.getPagingManager().getPageStore(view.getAddress());
if (pageStore == null) {
return 0;
}
total = pageStore.getMaxSize();
} catch (Exception e) {
throw new RuntimeException(e);
}

View File

@ -19,15 +19,23 @@ package org.apache.activemq.artemis.tests.integration.paging;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientRequestor;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.Queue;
@ -192,4 +200,91 @@ public class GlobalPagingTest extends PagingTest {
session.commit();
}
@Test
public void testManagementAddressCannotPageOrChangeGlobalSize() throws Exception {
clearDataRecreateServerDirs();
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
final ActiveMQServer server = createServer(true, config, PagingTest.PAGE_SIZE, -1);
try {
final SimpleString managementAddress = server.getConfiguration().getManagementAddress();
server.getConfiguration().setGlobalMaxSize(1);
server.start();
final ServerLocator locator = createInVMNonHALocator()
.setBlockOnNonDurableSend(true)
.setBlockOnDurableSend(true)
.setBlockOnAcknowledge(true);
try (ClientSessionFactory sf = createSessionFactory(locator);
ClientSession session = sf.createSession(false, true, true)) {
session.start();
if (server.locateQueue(managementAddress) == null) {
session.createQueue(managementAddress, managementAddress, null, true);
}
final Queue managementQueue = server.locateQueue(managementAddress);
Assert.assertNull(managementQueue.getPageSubscription());
Assert.assertNull(server.getPagingManager().getPageStore(managementAddress));
final SimpleString address = SimpleString.toSimpleString("queue");
if (server.locateQueue(address) == null) {
session.createQueue(address, address, null, true);
}
final CountDownLatch startSendMessages = new CountDownLatch(1);
final PagingManager pagingManager = server.getPagingManager();
final long globalSize = pagingManager.getGlobalSize();
final Thread globalSizeChecker = new Thread(() -> {
startSendMessages.countDown();
while (!Thread.currentThread().isInterrupted()) {
Assert.assertEquals(globalSize, pagingManager.getGlobalSize());
}
});
globalSizeChecker.start();
try (ClientRequestor requestor = new ClientRequestor(session, managementAddress)) {
ClientMessage message = session.createMessage(false);
ManagementHelper.putAttribute(message, "queue." + address.toString(), "messageCount");
Assert.assertTrue("bodySize = " + message.getBodySize() + " must be > of globalMaxSize = " + server.getConfiguration().getGlobalMaxSize(), message.getBodySize() > server.getConfiguration().getGlobalMaxSize());
startSendMessages.await();
for (int i = 0; i < 100; i++) {
try {
ClientMessage reply = requestor.request(message);
Assert.assertEquals(0L, ManagementHelper.getResult(reply));
} catch (ActiveMQAddressFullException e) {
Assert.fail(e.getMessage());
return;
}
}
} finally {
globalSizeChecker.interrupt();
}
}
} finally {
server.stop(true);
}
}
}

View File

@ -598,7 +598,7 @@ public final class ReplicationTest extends ActiveMQTestBase {
final ExecutorFactory executorFactory,
final HierarchicalRepository<AddressSettings> addressSettingsRepository) throws Exception {
PagingManager paging = new PagingManagerImpl(new PagingStoreFactoryNIO(storageManager, configuration.getPagingLocation(), 1000, null, executorFactory, false, null), addressSettingsRepository);
PagingManager paging = new PagingManagerImpl(new PagingStoreFactoryNIO(storageManager, configuration.getPagingLocation(), 1000, null, executorFactory, false, null), addressSettingsRepository, configuration.getManagementAddress());
paging.start();
return paging;

View File

@ -70,7 +70,7 @@ public class BindingsImplTest extends ActiveMQTestBase {
private void internalTest(final boolean route) throws Exception {
final FakeBinding fake = new FakeBinding(new SimpleString("a"));
final Bindings bind = new BindingsImpl(null, null, null);
final Bindings bind = new BindingsImpl(null, null);
bind.addBinding(fake);
bind.addBinding(new FakeBinding(new SimpleString("a")));
bind.addBinding(new FakeBinding(new SimpleString("a")));