From 7db0fe6a3859cadb4658318e7db262a34fe36b17 Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Fri, 20 Dec 2013 17:29:31 -0500 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-4819 --- .../apache/activemq/usage/SystemUsage.java | 9 +++ .../scheduler/JobSchedulerStoreImpl.java | 18 +---- .../activemq/usage/StoreUsageLimitsTest.java | 75 +++++++++++++++++++ 3 files changed, 87 insertions(+), 15 deletions(-) create mode 100644 activemq-unit-tests/src/test/java/org/apache/activemq/usage/StoreUsageLimitsTest.java diff --git a/activemq-broker/src/main/java/org/apache/activemq/usage/SystemUsage.java b/activemq-broker/src/main/java/org/apache/activemq/usage/SystemUsage.java index b9c4c34620..ee4a0965fd 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/usage/SystemUsage.java +++ b/activemq-broker/src/main/java/org/apache/activemq/usage/SystemUsage.java @@ -41,6 +41,7 @@ public class SystemUsage implements Service { private TempUsage tempUsage; private ThreadPoolExecutor executor; private JobSchedulerUsage jobSchedulerUsage; + private String checkLimitsLogLevel = "warn"; /** * True if someone called setSendFailIfNoSpace() on this particular usage @@ -283,4 +284,12 @@ public class SystemUsage implements Service { this.jobSchedulerUsage.setExecutor(this.executor); } } + + public String getCheckLimitsLogLevel() { + return checkLimitsLogLevel; + } + + public void setCheckLimitsLogLevel(String checkLimitsLogLevel) { + this.checkLimitsLogLevel = checkLimitsLogLevel; + } } diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java index f467303751..40a027db20 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java @@ -30,11 +30,8 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; -import org.apache.activemq.broker.LockableServiceSupport; -import org.apache.activemq.broker.Locker; import org.apache.activemq.broker.scheduler.JobScheduler; import org.apache.activemq.broker.scheduler.JobSchedulerStore; -import org.apache.activemq.store.SharedFileLocker; import org.apache.activemq.store.kahadb.disk.index.BTreeIndex; import org.apache.activemq.store.kahadb.disk.journal.Journal; import org.apache.activemq.store.kahadb.disk.journal.Location; @@ -48,10 +45,11 @@ import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.IOHelper; import org.apache.activemq.util.LockFile; import org.apache.activemq.util.ServiceStopper; +import org.apache.activemq.util.ServiceSupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class JobSchedulerStoreImpl extends LockableServiceSupport implements JobSchedulerStore { +public class JobSchedulerStoreImpl extends ServiceSupport implements JobSchedulerStore { static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStoreImpl.class); private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000; @@ -61,6 +59,7 @@ public class JobSchedulerStoreImpl extends LockableServiceSupport implements Job private File directory; PageFile pageFile; private Journal journal; + LockFile lockFile; protected AtomicLong journalSize = new AtomicLong(0); private boolean failIfDatabaseIsLocked; private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH; @@ -406,15 +405,4 @@ public class JobSchedulerStoreImpl extends LockableServiceSupport implements Job public String toString() { return "JobSchedulerStore:" + this.directory; } - - @Override - public Locker createDefaultLocker() throws IOException { - SharedFileLocker locker = new SharedFileLocker(); - locker.configure(this); - return locker; - } - - @Override - public void init() throws Exception { - } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usage/StoreUsageLimitsTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usage/StoreUsageLimitsTest.java new file mode 100644 index 0000000000..b184981dbd --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usage/StoreUsageLimitsTest.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.usage; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.InputStreamReader; +import java.nio.charset.Charset; + +import org.apache.activemq.EmbeddedBrokerTestSupport; +import org.apache.activemq.broker.BrokerService; + +public class StoreUsageLimitsTest extends EmbeddedBrokerTestSupport { + + final int WAIT_TIME_MILLS = 20 * 1000; + private static final String limitsLogLevel = "error"; + + @Override + protected BrokerService createBroker() throws Exception { + BrokerService broker = super.createBroker(); + broker.getSystemUsage().getMemoryUsage().setLimit(Long.MAX_VALUE); + broker.getSystemUsage().setCheckLimitsLogLevel(limitsLogLevel); + broker.deleteAllMessages(); + return broker; + } + + @Override + protected boolean isPersistent() { + return true; + } + + public void testCheckLimitsLogLevel() throws Exception { + + File file = new File("target/activemq-test.log"); + if (!file.exists()) { + fail("target/activemq-test.log was not created."); + } + + BufferedReader br = null; + boolean foundUsage = false; + + try { + br = new BufferedReader(new InputStreamReader(new FileInputStream(file), Charset.forName("UTF-8"))); + String line = null; + while ((line = br.readLine()) != null) { + if (line.contains(new String(Long.toString(Long.MAX_VALUE / (1024 * 1024)))) && line.contains(limitsLogLevel.toUpperCase())) { + foundUsage = true; + } + } + } catch (Exception e) { + fail(e.getMessage()); + } finally { + br.close(); + } + + if (!foundUsage) + fail("checkLimitsLogLevel message did not write to log target/activemq-test.log"); + } +}