Updated patch for new layout and fixed size returned to only include the dynamic journal size and not the total disk usage since that accounts for the full log file size and not just the used portion. 

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1421557 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2012-12-13 21:43:50 +00:00
parent ebaacfd4f6
commit f21992ef41
11 changed files with 524 additions and 113 deletions

View File

@ -16,18 +16,70 @@
*/
package org.apache.activemq.broker;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnectionMetaData;
import org.apache.activemq.ConfigurationException;
import org.apache.activemq.Service;
import org.apache.activemq.advisory.AdvisoryBroker;
import org.apache.activemq.broker.cluster.ConnectionSplitBroker;
import org.apache.activemq.broker.jmx.*;
import org.apache.activemq.broker.region.*;
import org.apache.activemq.broker.jmx.AnnotatedMBean;
import org.apache.activemq.broker.jmx.BrokerView;
import org.apache.activemq.broker.jmx.ConnectorView;
import org.apache.activemq.broker.jmx.ConnectorViewMBean;
import org.apache.activemq.broker.jmx.JmsConnectorView;
import org.apache.activemq.broker.jmx.JobSchedulerView;
import org.apache.activemq.broker.jmx.JobSchedulerViewMBean;
import org.apache.activemq.broker.jmx.ManagedRegionBroker;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.broker.jmx.NetworkConnectorView;
import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean;
import org.apache.activemq.broker.jmx.ProxyConnectorView;
import org.apache.activemq.broker.jmx.StatusView;
import org.apache.activemq.broker.jmx.StatusViewMBean;
import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFactory;
import org.apache.activemq.broker.region.DestinationFactoryImpl;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.virtual.MirroredQueue;
import org.apache.activemq.broker.region.virtual.VirtualDestination;
import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
import org.apache.activemq.broker.region.virtual.VirtualTopic;
import org.apache.activemq.broker.scheduler.JobSchedulerStore;
import org.apache.activemq.broker.scheduler.SchedulerBroker;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
@ -51,23 +103,21 @@ import org.apache.activemq.transport.TransportFactorySupport;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.vm.VMTransportFactory;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.*;
import org.apache.activemq.util.BrokerSupport;
import org.apache.activemq.util.DefaultIOExceptionHandler;
import org.apache.activemq.util.IOExceptionHandler;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.InetAddressUtil;
import org.apache.activemq.util.JMXSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ThreadPoolUtils;
import org.apache.activemq.util.TimeUtils;
import org.apache.activemq.util.URISupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import java.io.*;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Manages the lifecycle of an ActiveMQ Broker. A BrokerService consists of a
* number of transport connectors, network connectors and a bunch of properties
@ -125,8 +175,7 @@ public class BrokerService implements Service {
private String[] transportConnectorURIs;
private String[] networkConnectorURIs;
private JmsConnector[] jmsBridgeConnectors; // these are Jms to Jms bridges
// to other jms messaging
// systems
// to other jms messaging systems
private boolean deleteAllMessagesOnStartup;
private boolean advisorySupport = true;
private URI vmConnectorURI;
@ -173,6 +222,7 @@ public class BrokerService implements Service {
private BrokerContext brokerContext;
private boolean networkConnectorStartAsync = false;
private boolean allowTempAutoCreationOnSend;
private JobSchedulerStore jobSchedulerStore;
private int offlineDurableSubscriberTimeout = -1;
private int offlineDurableSubscriberTaskSchedule = 300000;
@ -332,6 +382,7 @@ public class BrokerService implements Service {
// Set a connection filter so that the connector does not establish loop
// back connections.
connector.setConnectionFilter(new ConnectionFilter() {
@Override
public boolean connectTo(URI location) {
List<TransportConnector> transportConnectors = getTransportConnectors();
for (Iterator<TransportConnector> iter = transportConnectors.iterator(); iter.hasNext();) {
@ -463,6 +514,7 @@ public class BrokerService implements Service {
}
}
@Override
public void start() throws Exception {
if (stopped.get() || !started.compareAndSet(false, true)) {
// lets just ignore redundant start() calls
@ -617,6 +669,7 @@ public class BrokerService implements Service {
* @throws Exception
* @org.apache .xbean.DestroyMethod
*/
@Override
@PreDestroy
public void stop() throws Exception {
if (!stopping.compareAndSet(false, true)) {
@ -662,6 +715,10 @@ public class BrokerService implements Service {
broker = null;
}
if (jobSchedulerStore != null) {
jobSchedulerStore.stop();
jobSchedulerStore = null;
}
if (tempDataStore != null) {
tempDataStore.stop();
tempDataStore = null;
@ -961,11 +1018,13 @@ public class BrokerService implements Service {
public SystemUsage getSystemUsage() {
try {
if (systemUsage == null) {
systemUsage = new SystemUsage("Main", getPersistenceAdapter(), getTempDataStore());
systemUsage = new SystemUsage("Main", getPersistenceAdapter(), getTempDataStore(), getJobSchedulerStore());
systemUsage.setExecutor(getExecutor());
systemUsage.getMemoryUsage().setLimit(1024 * 1024 * 64); // 64 MB
systemUsage.getTempUsage().setLimit(1024L * 1024 * 1024 * 50); // 50 GB
systemUsage.getStoreUsage().setLimit(1024L * 1024 * 1024 * 100); // 100 GB
systemUsage.getJobSchedulerUsage().setLimit(1024L * 1024 * 1000 * 50); // 50 // Gb
addService(this.systemUsage);
}
return systemUsage;
@ -1714,6 +1773,36 @@ public class BrokerService implements Service {
this.useTempMirroredQueues = useTempMirroredQueues;
}
public synchronized JobSchedulerStore getJobSchedulerStore() {
if (jobSchedulerStore == null && isSchedulerSupport()) {
try {
String clazz = "org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl";
jobSchedulerStore = (JobSchedulerStore) getClass().getClassLoader().loadClass(clazz).newInstance();
jobSchedulerStore.setDirectory(getSchedulerDirectoryFile());
configureService(jobSchedulerStore);
jobSchedulerStore.start();
LOG.info("JobScheduler using directory: " + getSchedulerDirectoryFile());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
return jobSchedulerStore;
}
public void setJobSchedulerStore(JobSchedulerStore jobSchedulerStore) {
this.jobSchedulerStore = jobSchedulerStore;
configureService(jobSchedulerStore);
try {
jobSchedulerStore.start();
} catch (Exception e) {
RuntimeException exception = new RuntimeException(
"Failed to start provided job scheduler store: " + jobSchedulerStore, e);
LOG.error(exception.getLocalizedMessage(), e);
throw exception;
}
}
//
// Implementation methods
// -------------------------------------------------------------------------
@ -1829,6 +1918,29 @@ public class BrokerService implements Service {
}
}
}
if (getJobSchedulerStore() != null) {
JobSchedulerStore scheduler = getJobSchedulerStore();
File schedulerDir = scheduler.getDirectory();
if (schedulerDir != null) {
String schedulerDirPath = schedulerDir.getAbsolutePath();
if (!schedulerDir.isAbsolute()) {
schedulerDir = new File(schedulerDirPath);
}
while (schedulerDir != null && schedulerDir.isDirectory() == false) {
schedulerDir = schedulerDir.getParentFile();
}
long schedularLimit = usage.getJobSchedulerUsage().getLimit();
long dirFreeSpace = schedulerDir.getUsableSpace();
if (schedularLimit > dirFreeSpace) {
LOG.warn("Job Schedular Store limit is " + schedularLimit / (1024 * 1024) +
" mb, whilst the data directory: " + schedulerDir.getAbsolutePath() +
" only has " + dirFreeSpace / (1024 * 1024) + " mb of usable space");
}
}
}
}
public void stopAllConnectors(ServiceStopper stopper) {
@ -2056,7 +2168,7 @@ public class BrokerService implements Service {
*/
protected Broker addInterceptors(Broker broker) throws Exception {
if (isSchedulerSupport()) {
SchedulerBroker sb = new SchedulerBroker(broker, getSchedulerDirectoryFile());
SchedulerBroker sb = new SchedulerBroker(this, broker, getJobSchedulerStore());
if (isUseJmx()) {
JobSchedulerViewMBean view = new JobSchedulerView(sb.getJobScheduler());
try {
@ -2283,6 +2395,7 @@ public class BrokerService implements Service {
10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
new ThreadFactory() {
int count=0;
@Override
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, "NetworkConnector Start Thread-" +(count++));
thread.setDaemon(true);
@ -2301,6 +2414,7 @@ public class BrokerService implements Service {
}
if (networkConnectorStartExecutor != null) {
networkConnectorStartExecutor.execute(new Runnable() {
@Override
public void run() {
try {
LOG.info("Async start of " + connector);

View File

@ -158,6 +158,14 @@ public class BrokerView implements BrokerViewMBean {
return brokerService.getSystemUsage().getTempUsage().getPercentUsage();
}
public long getJobSchedulerStoreLimit() {
return brokerService.getSystemUsage().getJobSchedulerUsage().getLimit();
}
public int getJobSchedulerStorePercentUsage() {
return brokerService.getSystemUsage().getJobSchedulerUsage().getPercentUsage();
}
public void setStoreLimit(long limit) {
brokerService.getSystemUsage().getStoreUsage().setLimit(limit);
}
@ -166,6 +174,10 @@ public class BrokerView implements BrokerViewMBean {
brokerService.getSystemUsage().getTempUsage().setLimit(limit);
}
public void setJobSchedulerStoreLimit(long limit) {
brokerService.getSystemUsage().getJobSchedulerUsage().setLimit(limit);
}
public void resetStatistics() {
safeGetBroker().getDestinationStatistics().reset();
}

View File

@ -107,11 +107,19 @@ public interface BrokerViewMBean extends Service {
@MBeanInfo("Percent of temp limit used.")
int getTempPercentUsage();
@MBeanInfo("Disk limit, in bytes, used for non-persistent messages and temporary date before producers are blocked.")
@MBeanInfo("Disk limit, in bytes, used for non-persistent messages and temporary data before producers are blocked.")
long getTempLimit();
void setTempLimit(@MBeanInfo("bytes") long limit);
@MBeanInfo("Percent of job store limit used.")
int getJobSchedulerStorePercentUsage();
@MBeanInfo("Disk limit, in bytes, used for scheduled messages before producers are blocked.")
long getJobSchedulerStoreLimit();
void setJobSchedulerStoreLimit(@MBeanInfo("bytes") long limit);
@MBeanInfo("Messages are synchronized to disk.")
boolean isPersistent();

View File

@ -17,12 +17,14 @@
package org.apache.activemq.broker.scheduler;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.ScheduledMessage;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.command.ActiveMQDestination;
@ -33,13 +35,15 @@ import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.security.SecurityContext;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.usage.JobSchedulerUsage;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.activemq.util.TypeConversionSupport;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.activemq.util.ByteSequence;
public class SchedulerBroker extends BrokerFilter implements JobListener {
private static final Logger LOG = LoggerFactory.getLogger(SchedulerBroker.class);
@ -50,38 +54,25 @@ public class SchedulerBroker extends BrokerFilter implements JobListener {
private final ConnectionContext context = new ConnectionContext();
private final ProducerId producerId = new ProducerId();
private File directory;
private final SystemUsage systemUsage;
private JobSchedulerStore store;
private final JobSchedulerStore store;
private JobScheduler scheduler;
public SchedulerBroker(Broker next, File directory) throws Exception {
public SchedulerBroker(BrokerService brokerService, Broker next, JobSchedulerStore store) throws Exception {
super(next);
this.directory = directory;
this.store = store;
this.producerId.setConnectionId(ID_GENERATOR.generateId());
this.context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
context.setBroker(next);
LOG.info("Scheduler using directory: " + directory);
this.context.setBroker(next);
this.systemUsage = brokerService.getSystemUsage();
}
public synchronized JobScheduler getJobScheduler() throws Exception {
return new JobSchedulerFacade(this);
}
/**
* @return the directory
*/
public File getDirectory() {
return this.directory;
}
/**
* @param directory
* the directory to set
*/
public void setDirectory(File directory) {
this.directory = directory;
}
@Override
public void start() throws Exception {
this.started.set(true);
@ -116,8 +107,7 @@ public class SchedulerBroker extends BrokerFilter implements JobListener {
Object delayValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY);
String physicalName = messageSend.getDestination().getPhysicalName();
boolean schedularManage = physicalName.regionMatches(true, 0,
ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION, 0,
boolean schedularManage = physicalName.regionMatches(true, 0, ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION, 0,
ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION.length());
if (schedularManage == true) {
@ -182,17 +172,17 @@ public class SchedulerBroker extends BrokerFilter implements JobListener {
if (repeatValue != null) {
repeat = (Integer) TypeConversionSupport.convert(repeatValue, Integer.class);
}
getInternalScheduler().schedule(msg.getMessageId().toString(),
new ByteSequence(packet.data, packet.offset, packet.length),cronEntry, delay, period, repeat);
getInternalScheduler().schedule(msg.getMessageId().toString(), new ByteSequence(packet.data, packet.offset, packet.length), cronEntry, delay,
period, repeat);
} else {
super.send(producerExchange, messageSend);
}
}
@Override
public void scheduledJob(String id, ByteSequence job) {
org.apache.activemq.util.ByteSequence packet = new org.apache.activemq.util.ByteSequence(job.getData(), job
.getOffset(), job.getLength());
org.apache.activemq.util.ByteSequence packet = new org.apache.activemq.util.ByteSequence(job.getData(), job.getOffset(), job.getLength());
try {
Message messageSend = (Message) this.wireFormat.unmarshal(packet);
messageSend.setOriginalTransactionId(null);
@ -204,11 +194,36 @@ public class SchedulerBroker extends BrokerFilter implements JobListener {
repeat = (Integer) TypeConversionSupport.convert(repeatValue, Integer.class);
}
// Check for room in the job scheduler store
if (systemUsage.getJobSchedulerUsage() != null) {
JobSchedulerUsage usage = systemUsage.getJobSchedulerUsage();
if (usage.isFull()) {
final String logMessage = "Job Scheduler Store is Full (" +
usage.getPercentUsage() + "% of " + usage.getLimit() +
"). Stopping producer (" + messageSend.getProducerId() +
") to prevent flooding of the job scheduler store." +
" See http://activemq.apache.org/producer-flow-control.html for more info";
long start = System.currentTimeMillis();
long nextWarn = start;
while (!usage.waitForSpace(1000)) {
if (context.getStopping().get()) {
throw new IOException("Connection closed, send aborted.");
}
long now = System.currentTimeMillis();
if (now >= nextWarn) {
LOG.info("" + usage + ": " + logMessage + " (blocking for: " + (now - start) / 1000 + "s)");
nextWarn = now + 30000l;
}
}
}
}
if (repeat != 0 || cronStr != null && cronStr.length() > 0) {
// create a unique id - the original message could be sent
// lots of times
messageSend.setMessageId(
new MessageId(this.producerId, this.messageIdGenerator.getNextSequenceId()));
messageSend.setMessageId(new MessageId(this.producerId, this.messageIdGenerator.getNextSequenceId()));
}
// Add the jobId as a property
@ -257,7 +272,7 @@ public class SchedulerBroker extends BrokerFilter implements JobListener {
protected synchronized JobScheduler getInternalScheduler() throws Exception {
if (this.started.get()) {
if (this.scheduler == null) {
this.scheduler = getStore().getJobScheduler("JMS");
this.scheduler = store.getJobScheduler("JMS");
this.scheduler.addListener(this);
}
return this.scheduler;
@ -265,21 +280,7 @@ public class SchedulerBroker extends BrokerFilter implements JobListener {
return null;
}
private JobSchedulerStore getStore() throws Exception {
if (started.get()) {
if (this.store == null) {
String clazz = "org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl";
this.store = (JobSchedulerStore) getClass().getClassLoader().loadClass(clazz).newInstance();
this.store.setDirectory(directory);
this.store.start();
}
return this.store;
}
return null;
}
protected void sendScheduledJob(ConnectionContext context, Job job, ActiveMQDestination replyTo)
throws Exception {
protected void sendScheduledJob(ConnectionContext context, Job job, ActiveMQDestination replyTo) throws Exception {
org.apache.activemq.util.ByteSequence packet = new org.apache.activemq.util.ByteSequence(job.getPayload());
try {

View File

@ -0,0 +1,63 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usage;
import org.apache.activemq.broker.scheduler.JobSchedulerStore;
/**
* Used to keep track of how much of something is being used so that a
* productive working set usage can be controlled. Main use case is manage
* memory usage.
*
* @org.apache.xbean.XBean
*
*/
public class JobSchedulerUsage extends Usage<JobSchedulerUsage> {
private JobSchedulerStore store;
public JobSchedulerUsage() {
super(null, null, 1.0f);
}
public JobSchedulerUsage(String name, JobSchedulerStore store) {
super(null, name, 1.0f);
this.store = store;
}
public JobSchedulerUsage(JobSchedulerUsage parent, String name) {
super(parent, name, 1.0f);
this.store = parent.store;
}
@Override
protected long retrieveUsage() {
if (store == null) {
return 0;
}
return store.size();
}
public JobSchedulerStore getStore() {
return store;
}
public void setStore(JobSchedulerStore store) {
this.store = store;
onLimitChange();
}
}

View File

@ -19,7 +19,9 @@ package org.apache.activemq.usage;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.activemq.Service;
import org.apache.activemq.broker.scheduler.JobSchedulerStore;
import org.apache.activemq.store.PListStore;
import org.apache.activemq.store.PersistenceAdapter;
@ -38,6 +40,7 @@ public class SystemUsage implements Service {
private StoreUsage storeUsage;
private TempUsage tempUsage;
private ThreadPoolExecutor executor;
private JobSchedulerUsage jobSchedulerUsage;
/**
* True if someone called setSendFailIfNoSpace() on this particular usage
@ -51,15 +54,16 @@ public class SystemUsage implements Service {
private final List<SystemUsage> children = new CopyOnWriteArrayList<SystemUsage>();
public SystemUsage() {
this("default", null, null);
this("default", null, null, null);
}
public SystemUsage(String name, PersistenceAdapter adapter, PListStore tempStore) {
public SystemUsage(String name, PersistenceAdapter adapter, PListStore tempStore, JobSchedulerStore jobSchedulerStore) {
this.parent = null;
this.name = name;
this.memoryUsage = new MemoryUsage(name + ":memory");
this.storeUsage = new StoreUsage(name + ":store", adapter);
this.tempUsage = new TempUsage(name + ":temp", tempStore);
this.jobSchedulerUsage = new JobSchedulerUsage(name + ":jobScheduler", jobSchedulerStore);
this.memoryUsage.setExecutor(getExecutor());
this.storeUsage.setExecutor(getExecutor());
this.tempUsage.setExecutor(getExecutor());
@ -72,6 +76,7 @@ public class SystemUsage implements Service {
this.memoryUsage = new MemoryUsage(parent.memoryUsage, name + ":memory");
this.storeUsage = new StoreUsage(parent.storeUsage, name + ":store");
this.tempUsage = new TempUsage(parent.tempUsage, name + ":temp");
this.jobSchedulerUsage = new JobSchedulerUsage(parent.jobSchedulerUsage, name + ":jobScheduler");
this.memoryUsage.setExecutor(getExecutor());
this.storeUsage.setExecutor(getExecutor());
this.tempUsage.setExecutor(getExecutor());
@ -102,11 +107,19 @@ public class SystemUsage implements Service {
return this.tempUsage;
}
/**
* @return the schedulerUsage
*/
public JobSchedulerUsage getJobSchedulerUsage() {
return this.jobSchedulerUsage;
}
@Override
public String toString() {
return "UsageManager(" + getName() + ")";
}
@Override
public void start() {
if (parent != null) {
parent.addChild(this);
@ -114,8 +127,10 @@ public class SystemUsage implements Service {
this.memoryUsage.start();
this.storeUsage.start();
this.tempUsage.start();
this.jobSchedulerUsage.start();
}
@Override
public void stop() {
if (parent != null) {
parent.removeChild(this);
@ -123,6 +138,7 @@ public class SystemUsage implements Service {
this.memoryUsage.stop();
this.storeUsage.stop();
this.tempUsage.stop();
this.jobSchedulerUsage.stop();
}
/**
@ -185,6 +201,7 @@ public class SystemUsage implements Service {
this.memoryUsage.setName(name + ":memory");
this.storeUsage.setName(name + ":store");
this.tempUsage.setName(name + ":temp");
this.jobSchedulerUsage.setName(name + ":jobScheduler");
}
public void setMemoryUsage(MemoryUsage memoryUsage) {
@ -210,7 +227,6 @@ public class SystemUsage implements Service {
}
this.storeUsage = storeUsage;
this.storeUsage.setExecutor(executor);
}
public void setTempUsage(TempUsage tempDiskUsage) {
@ -227,6 +243,20 @@ public class SystemUsage implements Service {
this.tempUsage.setExecutor(getExecutor());
}
public void setJobSchedulerUsage(JobSchedulerUsage jobSchedulerUsage) {
if (jobSchedulerUsage.getStore() == null) {
jobSchedulerUsage.setStore(this.jobSchedulerUsage.getStore());
}
if (jobSchedulerUsage.getName() == null) {
jobSchedulerUsage.setName(this.jobSchedulerUsage.getName());
}
if (parent != null) {
jobSchedulerUsage.setParent(parent.jobSchedulerUsage);
}
this.jobSchedulerUsage = jobSchedulerUsage;
this.jobSchedulerUsage.setExecutor(getExecutor());
}
/**
* @return the executor
*/
@ -249,5 +279,8 @@ public class SystemUsage implements Service {
if (this.tempUsage != null) {
this.tempUsage.setExecutor(this.executor);
}
if(this.jobSchedulerUsage != null) {
this.jobSchedulerUsage.setExecutor(this.executor);
}
}
}

View File

@ -29,10 +29,14 @@ import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.ScheduledMessage;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.ProducerThread;
import org.apache.activemq.util.Wait;
public class JmsSchedulerTest extends EmbeddedBrokerTestSupport {
@ -213,6 +217,64 @@ public class JmsSchedulerTest extends EmbeddedBrokerTestSupport {
producer.close();
}
public void testJobSchedulerStoreUsage() throws Exception {
// Shrink the store limit down so we get the producer to block
broker.getSystemUsage().getJobSchedulerUsage().setLimit(10 * 1024);
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
Connection conn = factory.createConnection();
conn.start();
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
final long time = 5000;
final ProducerThread producer = new ProducerThread(sess, destination) {
@Override
protected Message createMessage(int i) throws Exception {
Message message = super.createMessage(i);
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
return message;
}
};
producer.setMessageCount(100);
producer.start();
MessageConsumer consumer = sess.createConsumer(destination);
final CountDownLatch latch = new CountDownLatch(100);
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
latch.countDown();
}
});
// wait for the producer to block, which should happen immediately, and also wait long
// enough for the delay to elapse. We should see no deliveries as the send should block
// on the first message.
Thread.sleep(10000l);
assertEquals(100, latch.getCount());
// Increase the store limit so the producer unblocks. Everything should enqueue at this point.
broker.getSystemUsage().getJobSchedulerUsage().setLimit(1024 * 1024 * 33);
// Wait long enough that the messages are enqueued and the delivery delay has elapsed.
Thread.sleep(10000l);
// Make sure we sent all the messages we expected to send
Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
return producer.getSentCount() == producer.getMessageCount();
}
}, 20000l);
assertEquals("Producer didn't send all messages", producer.getMessageCount(), producer.getSentCount());
// Make sure we got all the messages we expected to get
latch.await(20000l, TimeUnit.MILLISECONDS);
assertEquals("Consumer did not receive all messages.", 0, latch.getCount());
}
@Override
protected void setUp() throws Exception {
bindAddress = "vm://localhost";

View File

@ -0,0 +1,107 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usage;
import java.io.File;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.ScheduledMessage;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.ProducerThread;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class JobSchedulerStoreUsageTest extends EmbeddedBrokerTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStoreUsageTest.class);
final int WAIT_TIME_MILLS = 20*1000;
@Override
protected BrokerService createBroker() throws Exception {
File schedulerDirectory = new File("target/scheduler");
IOHelper.mkdirs(schedulerDirectory);
IOHelper.deleteChildren(schedulerDirectory);
BrokerService broker = super.createBroker();
broker.setSchedulerSupport(true);
broker.setSchedulerDirectoryFile(schedulerDirectory);
broker.getSystemUsage().getJobSchedulerUsage().setLimit(7 * 1024);
broker.deleteAllMessages();
return broker;
}
@Override
protected boolean isPersistent() {
return true;
}
public void testJmx() throws Exception {
LOG.info("Initial scheduler usage: {}", broker.getAdminView().getJobSchedulerStorePercentUsage());
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
Connection conn = factory.createConnection();
conn.start();
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination dest = sess.createQueue(this.getClass().getName());
final ProducerThread producer = new ProducerThread(sess, dest) {
@Override
protected Message createMessage(int i) throws Exception {
Message message = super.createMessage(i);
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, WAIT_TIME_MILLS / 2);
return message;
}
};
producer.setMessageCount(100);
producer.start();
assertEquals(7 * 1024, broker.getAdminView().getJobSchedulerStoreLimit());
// wait for the producer to block
Thread.sleep(WAIT_TIME_MILLS / 2);
assertTrue(broker.getAdminView().getJobSchedulerStorePercentUsage() > 100);
broker.getAdminView().setJobSchedulerStoreLimit(1024 * 1024 * 33);
Thread.sleep(WAIT_TIME_MILLS);
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return producer.getSentCount() == producer.getMessageCount();
}
}, WAIT_TIME_MILLS * 2);
assertEquals("Producer didn't send all messages", producer.getMessageCount(), producer.getSentCount());
LOG.info("Final scheduler usage: {}", broker.getAdminView().getJobSchedulerStorePercentUsage());
assertTrue(broker.getAdminView().getJobSchedulerStorePercentUsage() < 100);
}
}

View File

@ -89,7 +89,6 @@ import org.apache.activemq.util.Callback;
import org.apache.activemq.util.DataByteArrayInputStream;
import org.apache.activemq.util.DataByteArrayOutputStream;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.LockFile;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
import org.slf4j.Logger;
@ -218,7 +217,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
protected AtomicBoolean opened = new AtomicBoolean();
private LockFile lockFile;
private boolean ignoreMissingJournalfiles = false;
private int indexCacheSize = 10000;
private boolean checkForCorruptJournalFiles = false;

View File

@ -16,25 +16,6 @@
*/
package org.apache.activemq.store.kahadb.scheduler;
import org.apache.activemq.broker.scheduler.JobScheduler;
import org.apache.activemq.broker.scheduler.JobSchedulerStore;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
import org.apache.activemq.store.kahadb.disk.index.BTreeIndex;
import org.apache.activemq.store.kahadb.disk.journal.Journal;
import org.apache.activemq.store.kahadb.disk.journal.Location;
import org.apache.activemq.store.kahadb.disk.page.Page;
import org.apache.activemq.store.kahadb.disk.page.PageFile;
import org.apache.activemq.store.kahadb.disk.page.Transaction;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.store.kahadb.disk.util.IntegerMarshaller;
import org.apache.activemq.util.LockFile;
import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
@ -47,6 +28,26 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.broker.scheduler.JobScheduler;
import org.apache.activemq.broker.scheduler.JobSchedulerStore;
import org.apache.activemq.store.kahadb.disk.index.BTreeIndex;
import org.apache.activemq.store.kahadb.disk.journal.Journal;
import org.apache.activemq.store.kahadb.disk.journal.Location;
import org.apache.activemq.store.kahadb.disk.page.Page;
import org.apache.activemq.store.kahadb.disk.page.PageFile;
import org.apache.activemq.store.kahadb.disk.page.Transaction;
import org.apache.activemq.store.kahadb.disk.util.IntegerMarshaller;
import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.LockFile;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class JobSchedulerStoreImpl extends ServiceSupport implements JobSchedulerStore {
static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStoreImpl.class);
@ -58,6 +59,7 @@ public class JobSchedulerStoreImpl extends ServiceSupport implements JobSchedule
private File directory;
PageFile pageFile;
private Journal journal;
protected AtomicLong journalSize = new AtomicLong(0);
private LockFile lockFile;
private boolean failIfDatabaseIsLocked;
private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
@ -72,6 +74,7 @@ public class JobSchedulerStoreImpl extends ServiceSupport implements JobSchedule
protected MetaData(JobSchedulerStoreImpl store) {
this.store = store;
}
private final JobSchedulerStoreImpl store;
Page<MetaData> page;
BTreeIndex<Integer, Integer> journalRC;
@ -111,7 +114,6 @@ public class JobSchedulerStoreImpl extends ServiceSupport implements JobSchedule
public void write(DataOutput os) throws IOException {
os.writeLong(this.storedSchedulers.getPageId());
os.writeLong(this.journalRC.getPageId());
}
}
@ -121,18 +123,22 @@ public class JobSchedulerStoreImpl extends ServiceSupport implements JobSchedule
MetaDataMarshaller(JobSchedulerStoreImpl store) {
this.store = store;
}
@Override
public MetaData readPayload(DataInput dataIn) throws IOException {
MetaData rc = new MetaData(this.store);
rc.read(dataIn);
return rc;
}
@Override
public void writePayload(MetaData object, DataOutput dataOut) throws IOException {
object.write(dataOut);
}
}
class ValueMarshaller extends VariableMarshaller<List<JobLocation>> {
@Override
public List<JobLocation> readPayload(DataInput dataIn) throws IOException {
List<JobLocation> result = new ArrayList<JobLocation>();
int size = dataIn.readInt();
@ -144,6 +150,7 @@ public class JobSchedulerStoreImpl extends ServiceSupport implements JobSchedule
return result;
}
@Override
public void writePayload(List<JobLocation> value, DataOutput dataOut) throws IOException {
dataOut.writeInt(value.size());
for (JobLocation jobLocation : value) {
@ -154,15 +161,19 @@ public class JobSchedulerStoreImpl extends ServiceSupport implements JobSchedule
class JobSchedulerMarshaller extends VariableMarshaller<JobSchedulerImpl> {
private final JobSchedulerStoreImpl store;
JobSchedulerMarshaller(JobSchedulerStoreImpl store) {
this.store = store;
}
@Override
public JobSchedulerImpl readPayload(DataInput dataIn) throws IOException {
JobSchedulerImpl result = new JobSchedulerImpl(this.store);
result.read(dataIn);
return result;
}
@Override
public void writePayload(JobSchedulerImpl js, DataOutput dataOut) throws IOException {
js.write(dataOut);
}
@ -184,7 +195,7 @@ public class JobSchedulerStoreImpl extends ServiceSupport implements JobSchedule
return 0;
}
try {
return journal.getDiskSize() + pageFile.getDiskSize();
return journalSize.get() + pageFile.getDiskSize();
} catch (IOException e) {
throw new RuntimeException(e);
}
@ -197,6 +208,7 @@ public class JobSchedulerStoreImpl extends ServiceSupport implements JobSchedule
final JobSchedulerImpl js = new JobSchedulerImpl(this);
js.setName(name);
getPageFile().tx().execute(new Transaction.Closure<IOException>() {
@Override
public void execute(Transaction tx) throws IOException {
js.createIndexes(tx);
js.load(tx);
@ -221,6 +233,7 @@ public class JobSchedulerStoreImpl extends ServiceSupport implements JobSchedule
if (result) {
js.stop();
getPageFile().tx().execute(new Transaction.Closure<IOException>() {
@Override
public void execute(Transaction tx) throws IOException {
metaData.storedSchedulers.remove(tx, name);
js.destroy(tx);
@ -241,12 +254,14 @@ public class JobSchedulerStoreImpl extends ServiceSupport implements JobSchedule
this.journal.setDirectory(directory);
this.journal.setMaxFileLength(getJournalMaxFileLength());
this.journal.setWriteBatchSize(getJournalMaxWriteBatchSize());
this.journal.setSizeAccumulator(this.journalSize);
this.journal.start();
this.pageFile = new PageFile(directory, "scheduleDB");
this.pageFile.setWriteBatchSize(1);
this.pageFile.load();
this.pageFile.tx().execute(new Transaction.Closure<IOException>() {
@Override
public void execute(Transaction tx) throws IOException {
if (pageFile.getPageCount() == 0) {
Page<MetaData> page = tx.allocate();
@ -293,7 +308,6 @@ public class JobSchedulerStoreImpl extends ServiceSupport implements JobSchedule
}
this.lockFile = null;
LOG.info(this + " stopped");
}
synchronized void incrementJournalCount(Transaction tx, Location location) throws IOException {
@ -301,7 +315,6 @@ public class JobSchedulerStoreImpl extends ServiceSupport implements JobSchedule
Integer val = this.metaData.journalRC.get(tx, logId);
int refCount = val != null ? val.intValue() + 1 : 1;
this.metaData.journalRC.put(tx, logId, refCount);
}
synchronized void decrementJournalCount(Transaction tx, Location location) throws IOException {
@ -316,7 +329,6 @@ public class JobSchedulerStoreImpl extends ServiceSupport implements JobSchedule
} else {
this.metaData.journalRC.put(tx, logId, refCount);
}
}
synchronized ByteSequence getPayload(Location location) throws IllegalStateException, IOException {
@ -341,8 +353,7 @@ public class JobSchedulerStoreImpl extends ServiceSupport implements JobSchedule
lockFile.lock();
break;
} catch (IOException e) {
LOG.info("Database " + lockFileName + " is locked... waiting "
+ (DATABASE_LOCKED_WAIT_DELAY / 1000)
LOG.info("Database " + lockFileName + " is locked... waiting " + (DATABASE_LOCKED_WAIT_DELAY / 1000)
+ " seconds for the database to be unlocked. Reason: " + e);
try {
Thread.sleep(DATABASE_LOCKED_WAIT_DELAY);
@ -395,5 +406,4 @@ public class JobSchedulerStoreImpl extends ServiceSupport implements JobSchedule
public String toString() {
return "JobSchedulerStore:" + this.directory;
}
}

View File

@ -74,6 +74,9 @@ public class XBeanBrokerService extends BrokerService {
if (usage.getTempUsage().getStore() == null) {
usage.getTempUsage().setStore(getTempDataStore());
}
if (usage.getJobSchedulerUsage().getStore() == null) {
usage.getJobSchedulerUsage().setStore(getJobSchedulerStore());
}
}
/**