This commit is contained in:
Justin Bertram 2019-04-01 09:47:42 -05:00
commit 8ca4b66e1d
3 changed files with 12 additions and 14 deletions

View File

@ -17,7 +17,6 @@
package org.apache.activemq.artemis.core.paging.impl; package org.apache.activemq.artemis.core.paging.impl;
import java.nio.file.FileStore; import java.nio.file.FileStore;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Queue; import java.util.Queue;
@ -180,13 +179,7 @@ public final class PagingManagerImpl implements PagingManager {
memoryReleased(); memoryReleased();
} }
} }
Iterator<PagingStore> storeIterator = blockedStored.iterator(); blockedStored.removeIf(PagingStore::checkReleasedMemory);
while (storeIterator.hasNext()) {
PagingStore store = storeIterator.next();
if (store.checkReleasedMemory()) {
storeIterator.remove();
}
}
} }
} }
@ -213,9 +206,12 @@ public final class PagingManagerImpl implements PagingManager {
@Override @Override
public void under(FileStore store, double usage) { public void under(FileStore store, double usage) {
final boolean diskFull = PagingManagerImpl.this.diskFull;
if (diskFull || !blockedStored.isEmpty() || !memoryCallback.isEmpty()) { if (diskFull || !blockedStored.isEmpty() || !memoryCallback.isEmpty()) {
if (diskFull) {
ActiveMQServerLogger.LOGGER.diskCapacityRestored(); ActiveMQServerLogger.LOGGER.diskCapacityRestored();
diskFull = false; PagingManagerImpl.this.diskFull = false;
}
checkMemoryRelease(); checkMemoryRelease();
} }
} }

View File

@ -1337,7 +1337,7 @@ public interface ActiveMQServerLogger extends BasicLogger {
format = Message.Format.MESSAGE_FORMAT) format = Message.Format.MESSAGE_FORMAT)
void diskBeyondCapacity(); void diskBeyondCapacity();
@LogMessage(level = Logger.Level.WARN) @LogMessage(level = Logger.Level.INFO)
@Message(id = 222211, value = "Storage is back to stable now, under max-disk-usage.", @Message(id = 222211, value = "Storage is back to stable now, under max-disk-usage.",
format = Message.Format.MESSAGE_FORMAT) format = Message.Format.MESSAGE_FORMAT)
void diskCapacityRestored(); void diskCapacityRestored();

View File

@ -58,6 +58,7 @@ public class AddressFullLoggingTest extends ActiveMQTestBase {
AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(10 * 1024).setMaxSizeBytes(20 * 1024).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK); AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(10 * 1024).setMaxSizeBytes(20 * 1024).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
server.getAddressSettingsRepository().addMatch("#", defaultSetting); server.getAddressSettingsRepository().addMatch("#", defaultSetting);
server.getConfiguration().setDiskScanPeriod(100);
server.start(); server.start();
internalTest(MAX_MESSAGES, MY_ADDRESS, MY_QUEUE, server); internalTest(MAX_MESSAGES, MY_ADDRESS, MY_QUEUE, server);
@ -73,7 +74,7 @@ public class AddressFullLoggingTest extends ActiveMQTestBase {
AddressSettings defaultSetting = new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK); AddressSettings defaultSetting = new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
server.getAddressSettingsRepository().addMatch("#", defaultSetting); server.getAddressSettingsRepository().addMatch("#", defaultSetting);
server.getConfiguration().setGlobalMaxSize(20 * 1024); server.getConfiguration().setGlobalMaxSize(20 * 1024).setDiskScanPeriod(100);
server.start(); server.start();
@ -122,7 +123,6 @@ public class AddressFullLoggingTest extends ActiveMQTestBase {
executor.shutdown(); executor.shutdown();
session.close(); session.close();
session = factory.createSession(false, true, true); session = factory.createSession(false, true, true);
session.start(); session.start();
ClientConsumer consumer = session.createConsumer(MY_QUEUE); ClientConsumer consumer = session.createConsumer(MY_QUEUE);
@ -132,7 +132,8 @@ public class AddressFullLoggingTest extends ActiveMQTestBase {
break; break;
msg.acknowledge(); msg.acknowledge();
} }
//this is needed to allow to kick-in at least once disk scan
TimeUnit.MILLISECONDS.sleep(server.getConfiguration().getDiskScanPeriod() * 2);
session.close(); session.close();
locator.close(); locator.close();
server.stop(); server.stop();
@ -140,6 +141,7 @@ public class AddressFullLoggingTest extends ActiveMQTestBase {
// Using the code only so the test doesn't fail just because someone edits the log text // Using the code only so the test doesn't fail just because someone edits the log text
Assert.assertTrue("Expected to find AMQ222183", AssertionLoggerHandler.findText("AMQ222183", "myAddress")); Assert.assertTrue("Expected to find AMQ222183", AssertionLoggerHandler.findText("AMQ222183", "myAddress"));
Assert.assertTrue("Expected to find AMQ221046", AssertionLoggerHandler.findText("AMQ221046", "myAddress")); Assert.assertTrue("Expected to find AMQ221046", AssertionLoggerHandler.findText("AMQ221046", "myAddress"));
Assert.assertFalse("Expected to not find AMQ222211", AssertionLoggerHandler.findText("AMQ222211"));
} }
@AfterClass @AfterClass