Timothy Bish 2013-12-20 17:29:31 -05:00
parent 44b25cb7a1
commit 7db0fe6a38
3 changed files with 87 additions and 15 deletions

View File

@ -41,6 +41,7 @@ public class SystemUsage implements Service {
private TempUsage tempUsage;
private ThreadPoolExecutor executor;
private JobSchedulerUsage jobSchedulerUsage;
private String checkLimitsLogLevel = "warn";
/**
* True if someone called setSendFailIfNoSpace() on this particular usage
@ -283,4 +284,12 @@ public class SystemUsage implements Service {
this.jobSchedulerUsage.setExecutor(this.executor);
}
}
public String getCheckLimitsLogLevel() {
return checkLimitsLogLevel;
}
public void setCheckLimitsLogLevel(String checkLimitsLogLevel) {
this.checkLimitsLogLevel = checkLimitsLogLevel;
}
}

View File

@ -30,11 +30,8 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.broker.LockableServiceSupport;
import org.apache.activemq.broker.Locker;
import org.apache.activemq.broker.scheduler.JobScheduler;
import org.apache.activemq.broker.scheduler.JobSchedulerStore;
import org.apache.activemq.store.SharedFileLocker;
import org.apache.activemq.store.kahadb.disk.index.BTreeIndex;
import org.apache.activemq.store.kahadb.disk.journal.Journal;
import org.apache.activemq.store.kahadb.disk.journal.Location;
@ -48,10 +45,11 @@ import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.LockFile;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class JobSchedulerStoreImpl extends LockableServiceSupport implements JobSchedulerStore {
public class JobSchedulerStoreImpl extends ServiceSupport implements JobSchedulerStore {
static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStoreImpl.class);
private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
@ -61,6 +59,7 @@ public class JobSchedulerStoreImpl extends LockableServiceSupport implements Job
private File directory;
PageFile pageFile;
private Journal journal;
LockFile lockFile;
protected AtomicLong journalSize = new AtomicLong(0);
private boolean failIfDatabaseIsLocked;
private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
@ -406,15 +405,4 @@ public class JobSchedulerStoreImpl extends LockableServiceSupport implements Job
public String toString() {
return "JobSchedulerStore:" + this.directory;
}
@Override
public Locker createDefaultLocker() throws IOException {
SharedFileLocker locker = new SharedFileLocker();
locker.configure(this);
return locker;
}
@Override
public void init() throws Exception {
}
}

View File

@ -0,0 +1,75 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usage;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.broker.BrokerService;
public class StoreUsageLimitsTest extends EmbeddedBrokerTestSupport {
final int WAIT_TIME_MILLS = 20 * 1000;
private static final String limitsLogLevel = "error";
@Override
protected BrokerService createBroker() throws Exception {
BrokerService broker = super.createBroker();
broker.getSystemUsage().getMemoryUsage().setLimit(Long.MAX_VALUE);
broker.getSystemUsage().setCheckLimitsLogLevel(limitsLogLevel);
broker.deleteAllMessages();
return broker;
}
@Override
protected boolean isPersistent() {
return true;
}
public void testCheckLimitsLogLevel() throws Exception {
File file = new File("target/activemq-test.log");
if (!file.exists()) {
fail("target/activemq-test.log was not created.");
}
BufferedReader br = null;
boolean foundUsage = false;
try {
br = new BufferedReader(new InputStreamReader(new FileInputStream(file), Charset.forName("UTF-8")));
String line = null;
while ((line = br.readLine()) != null) {
if (line.contains(new String(Long.toString(Long.MAX_VALUE / (1024 * 1024)))) && line.contains(limitsLogLevel.toUpperCase())) {
foundUsage = true;
}
}
} catch (Exception e) {
fail(e.getMessage());
} finally {
br.close();
}
if (!foundUsage)
fail("checkLimitsLogLevel message did not write to log target/activemq-test.log");
}
}