diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java index 8ad56a7912..4a27fd07ca 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -111,7 +111,9 @@ import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.transport.TransportFactorySupport; import org.apache.activemq.transport.TransportServer; import org.apache.activemq.transport.vm.VMTransportFactory; +import org.apache.activemq.usage.StoreUsage; import org.apache.activemq.usage.SystemUsage; +import org.apache.activemq.usage.Usage; import org.apache.activemq.util.BrokerSupport; import org.apache.activemq.util.DefaultIOExceptionHandler; import org.apache.activemq.util.IOExceptionHandler; @@ -119,6 +121,7 @@ import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.IOHelper; import org.apache.activemq.util.InetAddressUtil; import org.apache.activemq.util.ServiceStopper; +import org.apache.activemq.util.StoreUtil; import org.apache.activemq.util.ThreadPoolUtils; import org.apache.activemq.util.TimeUtils; import org.apache.activemq.util.URISupport; @@ -230,6 +233,7 @@ public class BrokerService implements Service { private int schedulePeriodForDestinationPurge= 0; private int maxPurgedDestinationsPerSweep = 0; private int schedulePeriodForDiskUsageCheck = 0; + private int diskUsageCheckRegrowThreshold = -1; private BrokerContext brokerContext; private boolean networkConnectorStartAsync = false; private boolean allowTempAutoCreationOnSend; @@ -1965,30 +1969,7 @@ public class BrokerService implements Service { if (getPersistenceAdapter() != null) { PersistenceAdapter adapter = getPersistenceAdapter(); - File dir = adapter.getDirectory(); - - if (dir != null) { - String dirPath = dir.getAbsolutePath(); - if (!dir.isAbsolute()) { - dir = new File(dirPath); - } - - while (dir != null && !dir.isDirectory()) { - dir = dir.getParentFile(); - } - long storeLimit = usage.getStoreUsage().getLimit(); - long storeCurrent = usage.getStoreUsage().getUsage(); - long dirFreeSpace = dir.getUsableSpace(); - if (storeLimit > (dirFreeSpace + storeCurrent)) { - LOG.warn("Store limit is " + storeLimit / (1024 * 1024) + - " mb (current store usage is " + storeCurrent / (1024 * 1024) + - " mb). The data directory: " + dir.getAbsolutePath() + - " only has " + dirFreeSpace / (1024 * 1024) + - " mb of usable space - resetting to maximum available disk space: " + - (dirFreeSpace + storeCurrent) / (1024 * 1024) + " mb"); - usage.getStoreUsage().setLimit(dirFreeSpace + storeCurrent); - } - } + checkUsageLimit(adapter.getDirectory(), usage.getStoreUsage(), usage.getStoreUsage().getPercentLimit()); long maxJournalFileSize = 0; long storeLimit = usage.getStoreUsage().getLimit(); @@ -2015,28 +1996,9 @@ public class BrokerService implements Service { final SystemUsage usage = getSystemUsage(); File tmpDir = getTmpDataDirectory(); + if (tmpDir != null) { - - String tmpDirPath = tmpDir.getAbsolutePath(); - if (!tmpDir.isAbsolute()) { - tmpDir = new File(tmpDirPath); - } - - long storeLimit = usage.getTempUsage().getLimit(); - long storeCurrent = usage.getTempUsage().getUsage(); - while (tmpDir != null && !tmpDir.isDirectory()) { - tmpDir = tmpDir.getParentFile(); - } - long dirFreeSpace = tmpDir.getUsableSpace(); - if (storeLimit > (dirFreeSpace + storeCurrent)) { - LOG.warn("Temporary Store limit is " + storeLimit / (1024 * 1024) + - " mb (current temporary store usage is " + storeCurrent / (1024 * 1024) + - " mb). The temporary data directory: " + tmpDir.getAbsolutePath() + - " only has " + dirFreeSpace / (1024 * 1024) + - " mb of usable space - resetting to maximum available disk space: " + - (dirFreeSpace + storeCurrent) / (1024 * 1024) + " mb"); - usage.getTempUsage().setLimit(dirFreeSpace + storeCurrent); - } + checkUsageLimit(tmpDir, usage.getTempUsage(), usage.getTempUsage().getPercentLimit()); if (isPersistent()) { long maxJournalFileSize; @@ -2047,6 +2009,7 @@ public class BrokerService implements Service { } else { maxJournalFileSize = DEFAULT_MAX_FILE_LENGTH; } + long storeLimit = usage.getTempUsage().getLimit(); if (storeLimit < maxJournalFileSize) { LOG.error("Temporary Store limit is " + storeLimit / (1024 * 1024) + @@ -2058,6 +2021,60 @@ public class BrokerService implements Service { } } + protected void checkUsageLimit(File dir, Usage storeUsage, int percentLimit) { + if (dir != null) { + dir = StoreUtil.findParentDirectory(dir); + String storeName = storeUsage instanceof StoreUsage ? "Store" : "Temporary Store"; + long storeLimit = storeUsage.getLimit(); + long storeCurrent = storeUsage.getUsage(); + long usableSpace = dir.getUsableSpace(); + long totalSpace = dir.getTotalSpace(); + long totalUsableSpace = dir.getUsableSpace() + storeCurrent; + //compute byte value of the percent limit + long bytePercentLimit = totalSpace * percentLimit / 100; + int oneMeg = 1024 * 1024; + + //Check if the store limit is less than the percent Limit that was set and also + //the usable space...this means we can grow the store larger + //Changes in partition size (total space) as well as changes in usable space should + //be detected here + if (diskUsageCheckRegrowThreshold > -1 && percentLimit > 0 + && storeLimit < bytePercentLimit && storeLimit < totalUsableSpace){ + + // set the limit to be bytePercentLimit or usableSpace if + // usableSpace is less than the percentLimit + long newLimit = bytePercentLimit > totalUsableSpace ? totalUsableSpace : bytePercentLimit; + + //To prevent changing too often, check threshold + if (newLimit - storeLimit >= diskUsageCheckRegrowThreshold) { + LOG.info("Usable disk space has been increased, attempting to regrow " + storeName + " limit to " + + percentLimit + "% of the partition size."); + storeUsage.setLimit(newLimit); + LOG.info(storeName + " limit has been increased to " + newLimit * 100 / totalSpace + + "% (" + newLimit / oneMeg + " mb) of the partition size."); + } + + //check if the limit is too large for the amount of usable space + } else if (storeLimit > totalUsableSpace) { + if (percentLimit > 0) { + LOG.warn(storeName + " limit has been set to " + + percentLimit + "% (" + storeLimit / oneMeg + " mb)" + + " of the partition size but there is not enough usable space." + + " Only " + totalUsableSpace * 100 / totalSpace + "% (" + totalUsableSpace / oneMeg + " mb)" + + " is available"); + } + + LOG.warn(storeName + " limit is " + storeLimit / oneMeg + + " mb (current store usage is " + storeCurrent / oneMeg + + " mb). The data directory: " + dir.getAbsolutePath() + + " only has " + usableSpace / oneMeg + + " mb of usable space - resetting to maximum available disk space: " + + totalUsableSpace / oneMeg + " mb"); + storeUsage.setLimit(totalUsableSpace); + } + } + } + /** * Schedules a periodic task based on schedulePeriodForDiskLimitCheck to * update store and temporary store limits if the amount of available space @@ -2892,11 +2909,26 @@ public class BrokerService implements Service { this.schedulePeriodForDestinationPurge = schedulePeriodForDestinationPurge; } + /** + * @param schedulePeriodForDiskUsageCheck + */ public void setSchedulePeriodForDiskUsageCheck( int schedulePeriodForDiskUsageCheck) { this.schedulePeriodForDiskUsageCheck = schedulePeriodForDiskUsageCheck; } + public int getDiskUsageCheckRegrowThreshold() { + return diskUsageCheckRegrowThreshold; + } + + /** + * @param diskUsageCheckRegrowThreshold + * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor" + */ + public void setDiskUsageCheckRegrowThreshold(int diskUsageCheckRegrowThreshold) { + this.diskUsageCheckRegrowThreshold = diskUsageCheckRegrowThreshold; + } + public int getMaxPurgedDestinationsPerSweep() { return this.maxPurgedDestinationsPerSweep; } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java.orig b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java.orig new file mode 100644 index 0000000000..e27279c746 --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java.orig @@ -0,0 +1,3147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.UnknownHostException; +import java.security.Provider; +import java.security.Security; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; + +import org.apache.activemq.ActiveMQConnectionMetaData; +import org.apache.activemq.ConfigurationException; +import org.apache.activemq.Service; +import org.apache.activemq.advisory.AdvisoryBroker; +import org.apache.activemq.broker.cluster.ConnectionSplitBroker; +import org.apache.activemq.broker.jmx.AnnotatedMBean; +import org.apache.activemq.broker.jmx.BrokerMBeanSupport; +import org.apache.activemq.broker.jmx.BrokerView; +import org.apache.activemq.broker.jmx.ConnectorView; +import org.apache.activemq.broker.jmx.ConnectorViewMBean; +import org.apache.activemq.broker.jmx.HealthView; +import org.apache.activemq.broker.jmx.HealthViewMBean; +import org.apache.activemq.broker.jmx.JmsConnectorView; +import org.apache.activemq.broker.jmx.JobSchedulerView; +import org.apache.activemq.broker.jmx.JobSchedulerViewMBean; +import org.apache.activemq.broker.jmx.Log4JConfigView; +import org.apache.activemq.broker.jmx.ManagedRegionBroker; +import org.apache.activemq.broker.jmx.ManagementContext; +import org.apache.activemq.broker.jmx.NetworkConnectorView; +import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean; +import org.apache.activemq.broker.jmx.ProxyConnectorView; +import org.apache.activemq.broker.region.CompositeDestinationInterceptor; +import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.broker.region.DestinationFactory; +import org.apache.activemq.broker.region.DestinationFactoryImpl; +import org.apache.activemq.broker.region.DestinationInterceptor; +import org.apache.activemq.broker.region.RegionBroker; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.broker.region.virtual.MirroredQueue; +import org.apache.activemq.broker.region.virtual.VirtualDestination; +import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor; +import org.apache.activemq.broker.region.virtual.VirtualTopic; +import org.apache.activemq.broker.scheduler.JobSchedulerStore; +import org.apache.activemq.broker.scheduler.SchedulerBroker; +import org.apache.activemq.broker.scheduler.memory.InMemoryJobSchedulerStore; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.BrokerId; +import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.filter.DestinationFilter; +import org.apache.activemq.network.ConnectionFilter; +import org.apache.activemq.network.DiscoveryNetworkConnector; +import org.apache.activemq.network.NetworkConnector; +import org.apache.activemq.network.jms.JmsConnector; +import org.apache.activemq.openwire.OpenWireFormat; +import org.apache.activemq.proxy.ProxyConnector; +import org.apache.activemq.security.MessageAuthorizationPolicy; +import org.apache.activemq.selector.SelectorParser; +import org.apache.activemq.store.JournaledStore; +import org.apache.activemq.store.PListStore; +import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.store.PersistenceAdapterFactory; +import org.apache.activemq.store.memory.MemoryPersistenceAdapter; +import org.apache.activemq.thread.Scheduler; +import org.apache.activemq.thread.TaskRunnerFactory; +import org.apache.activemq.transport.TransportFactorySupport; +import org.apache.activemq.transport.TransportServer; +import org.apache.activemq.transport.vm.VMTransportFactory; +import org.apache.activemq.usage.SystemUsage; +import org.apache.activemq.util.BrokerSupport; +import org.apache.activemq.util.DefaultIOExceptionHandler; +import org.apache.activemq.util.IOExceptionHandler; +import org.apache.activemq.util.IOExceptionSupport; +import org.apache.activemq.util.IOHelper; +import org.apache.activemq.util.InetAddressUtil; +import org.apache.activemq.util.ServiceStopper; +import org.apache.activemq.util.StoreUtil; +import org.apache.activemq.util.ThreadPoolUtils; +import org.apache.activemq.util.TimeUtils; +import org.apache.activemq.util.URISupport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; + +/** + * Manages the life-cycle of an ActiveMQ Broker. A BrokerService consists of a + * number of transport connectors, network connectors and a bunch of properties + * which can be used to configure the broker as its lazily created. + * + * @org.apache.xbean.XBean + */ +public class BrokerService implements Service { + public static final String DEFAULT_PORT = "61616"; + public static final String LOCAL_HOST_NAME; + public static final String BROKER_VERSION; + public static final String DEFAULT_BROKER_NAME = "localhost"; + public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32; + public static final long DEFAULT_START_TIMEOUT = 600000L; + + private static final Logger LOG = LoggerFactory.getLogger(BrokerService.class); + + @SuppressWarnings("unused") + private static final long serialVersionUID = 7353129142305630237L; + + private boolean useJmx = true; + private boolean enableStatistics = true; + private boolean persistent = true; + private boolean populateJMSXUserID; + private boolean useAuthenticatedPrincipalForJMSXUserID; + private boolean populateUserNameInMBeans; + private long mbeanInvocationTimeout = 0; + + private boolean useShutdownHook = true; + private boolean useLoggingForShutdownErrors; + private boolean shutdownOnMasterFailure; + private boolean shutdownOnSlaveFailure; + private boolean waitForSlave; + private long waitForSlaveTimeout = DEFAULT_START_TIMEOUT; + private boolean passiveSlave; + private String brokerName = DEFAULT_BROKER_NAME; + private File dataDirectoryFile; + private File tmpDataDirectory; + private Broker broker; + private BrokerView adminView; + private ManagementContext managementContext; + private ObjectName brokerObjectName; + private TaskRunnerFactory taskRunnerFactory; + private TaskRunnerFactory persistenceTaskRunnerFactory; + private SystemUsage systemUsage; + private SystemUsage producerSystemUsage; + private SystemUsage consumerSystemUsaage; + private PersistenceAdapter persistenceAdapter; + private PersistenceAdapterFactory persistenceFactory; + protected DestinationFactory destinationFactory; + private MessageAuthorizationPolicy messageAuthorizationPolicy; + private final List transportConnectors = new CopyOnWriteArrayList(); + private final List networkConnectors = new CopyOnWriteArrayList(); + private final List proxyConnectors = new CopyOnWriteArrayList(); + private final List jmsConnectors = new CopyOnWriteArrayList(); + private final List services = new ArrayList(); + private transient Thread shutdownHook; + private String[] transportConnectorURIs; + private String[] networkConnectorURIs; + private JmsConnector[] jmsBridgeConnectors; // these are Jms to Jms bridges + // to other jms messaging systems + private boolean deleteAllMessagesOnStartup; + private boolean advisorySupport = true; + private URI vmConnectorURI; + private String defaultSocketURIString; + private PolicyMap destinationPolicy; + private final AtomicBoolean started = new AtomicBoolean(false); + private final AtomicBoolean stopped = new AtomicBoolean(false); + private final AtomicBoolean stopping = new AtomicBoolean(false); + private BrokerPlugin[] plugins; + private boolean keepDurableSubsActive = true; + private boolean useVirtualTopics = true; + private boolean useMirroredQueues = false; + private boolean useTempMirroredQueues = true; + private BrokerId brokerId; + private volatile DestinationInterceptor[] destinationInterceptors; + private ActiveMQDestination[] destinations; + private PListStore tempDataStore; + private int persistenceThreadPriority = Thread.MAX_PRIORITY; + private boolean useLocalHostBrokerName; + private final CountDownLatch stoppedLatch = new CountDownLatch(1); + private final CountDownLatch startedLatch = new CountDownLatch(1); + private Broker regionBroker; + private int producerSystemUsagePortion = 60; + private int consumerSystemUsagePortion = 40; + private boolean splitSystemUsageForProducersConsumers; + private boolean monitorConnectionSplits = false; + private int taskRunnerPriority = Thread.NORM_PRIORITY; + private boolean dedicatedTaskRunner; + private boolean cacheTempDestinations = false;// useful for failover + private int timeBeforePurgeTempDestinations = 5000; + private final List shutdownHooks = new ArrayList(); + private boolean systemExitOnShutdown; + private int systemExitOnShutdownExitCode; + private SslContext sslContext; + private boolean forceStart = false; + private IOExceptionHandler ioExceptionHandler; + private boolean schedulerSupport = false; + private File schedulerDirectoryFile; + private Scheduler scheduler; + private ThreadPoolExecutor executor; + private int schedulePeriodForDestinationPurge= 0; + private int maxPurgedDestinationsPerSweep = 0; + private int schedulePeriodForDiskUsageCheck = 0; + private boolean diskUsageCheckRegrowPercentChange = true; + private BrokerContext brokerContext; + private boolean networkConnectorStartAsync = false; + private boolean allowTempAutoCreationOnSend; + private JobSchedulerStore jobSchedulerStore; + private final AtomicLong totalConnections = new AtomicLong(); + private final AtomicInteger currentConnections = new AtomicInteger(); + + private long offlineDurableSubscriberTimeout = -1; + private long offlineDurableSubscriberTaskSchedule = 300000; + private DestinationFilter virtualConsumerDestinationFilter; + + private final AtomicBoolean persistenceAdapterStarted = new AtomicBoolean(false); + private Throwable startException = null; + private boolean startAsync = false; + private Date startDate; + private boolean slave = true; + + private boolean restartAllowed = true; + private boolean restartRequested = false; + private boolean rejectDurableConsumers = false; + + private int storeOpenWireVersion = OpenWireFormat.DEFAULT_STORE_VERSION; + + static { + + try { + ClassLoader loader = BrokerService.class.getClassLoader(); + Class clazz = loader.loadClass("org.bouncycastle.jce.provider.BouncyCastleProvider"); + Provider bouncycastle = (Provider) clazz.newInstance(); + Security.insertProviderAt(bouncycastle, 2); + LOG.info("Loaded the Bouncy Castle security provider."); + } catch(Throwable e) { + // No BouncyCastle found so we use the default Java Security Provider + } + + String localHostName = "localhost"; + try { + localHostName = InetAddressUtil.getLocalHostName(); + } catch (UnknownHostException e) { + LOG.error("Failed to resolve localhost"); + } + LOCAL_HOST_NAME = localHostName; + + String version = null; + try(InputStream in = BrokerService.class.getResourceAsStream("/org/apache/activemq/version.txt")) { + if (in != null) { + try(InputStreamReader isr = new InputStreamReader(in); + BufferedReader reader = new BufferedReader(isr)) { + version = reader.readLine(); + } + } + } catch (IOException ie) { + LOG.warn("Error reading broker version ", ie); + } + BROKER_VERSION = version; + } + + @Override + public String toString() { + return "BrokerService[" + getBrokerName() + "]"; + } + + private String getBrokerVersion() { + String version = ActiveMQConnectionMetaData.PROVIDER_VERSION; + if (version == null) { + version = BROKER_VERSION; + } + + return version; + } + + /** + * Adds a new transport connector for the given bind address + * + * @return the newly created and added transport connector + * @throws Exception + */ + public TransportConnector addConnector(String bindAddress) throws Exception { + return addConnector(new URI(bindAddress)); + } + + /** + * Adds a new transport connector for the given bind address + * + * @return the newly created and added transport connector + * @throws Exception + */ + public TransportConnector addConnector(URI bindAddress) throws Exception { + return addConnector(createTransportConnector(bindAddress)); + } + + /** + * Adds a new transport connector for the given TransportServer transport + * + * @return the newly created and added transport connector + * @throws Exception + */ + public TransportConnector addConnector(TransportServer transport) throws Exception { + return addConnector(new TransportConnector(transport)); + } + + /** + * Adds a new transport connector + * + * @return the transport connector + * @throws Exception + */ + public TransportConnector addConnector(TransportConnector connector) throws Exception { + transportConnectors.add(connector); + return connector; + } + + /** + * Stops and removes a transport connector from the broker. + * + * @param connector + * @return true if the connector has been previously added to the broker + * @throws Exception + */ + public boolean removeConnector(TransportConnector connector) throws Exception { + boolean rc = transportConnectors.remove(connector); + if (rc) { + unregisterConnectorMBean(connector); + } + return rc; + } + + /** + * Adds a new network connector using the given discovery address + * + * @return the newly created and added network connector + * @throws Exception + */ + public NetworkConnector addNetworkConnector(String discoveryAddress) throws Exception { + return addNetworkConnector(new URI(discoveryAddress)); + } + + /** + * Adds a new proxy connector using the given bind address + * + * @return the newly created and added network connector + * @throws Exception + */ + public ProxyConnector addProxyConnector(String bindAddress) throws Exception { + return addProxyConnector(new URI(bindAddress)); + } + + /** + * Adds a new network connector using the given discovery address + * + * @return the newly created and added network connector + * @throws Exception + */ + public NetworkConnector addNetworkConnector(URI discoveryAddress) throws Exception { + NetworkConnector connector = new DiscoveryNetworkConnector(discoveryAddress); + return addNetworkConnector(connector); + } + + /** + * Adds a new proxy connector using the given bind address + * + * @return the newly created and added network connector + * @throws Exception + */ + public ProxyConnector addProxyConnector(URI bindAddress) throws Exception { + ProxyConnector connector = new ProxyConnector(); + connector.setBind(bindAddress); + connector.setRemote(new URI("fanout:multicast://default")); + return addProxyConnector(connector); + } + + /** + * Adds a new network connector to connect this broker to a federated + * network + */ + public NetworkConnector addNetworkConnector(NetworkConnector connector) throws Exception { + connector.setBrokerService(this); + connector.setLocalUri(getVmConnectorURI()); + // Set a connection filter so that the connector does not establish loop + // back connections. + connector.setConnectionFilter(new ConnectionFilter() { + @Override + public boolean connectTo(URI location) { + List transportConnectors = getTransportConnectors(); + for (Iterator iter = transportConnectors.iterator(); iter.hasNext();) { + try { + TransportConnector tc = iter.next(); + if (location.equals(tc.getConnectUri())) { + return false; + } + } catch (Throwable e) { + } + } + return true; + } + }); + networkConnectors.add(connector); + return connector; + } + + /** + * Removes the given network connector without stopping it. The caller + * should call {@link NetworkConnector#stop()} to close the connector + */ + public boolean removeNetworkConnector(NetworkConnector connector) { + boolean answer = networkConnectors.remove(connector); + if (answer) { + unregisterNetworkConnectorMBean(connector); + } + return answer; + } + + public ProxyConnector addProxyConnector(ProxyConnector connector) throws Exception { + URI uri = getVmConnectorURI(); + connector.setLocalUri(uri); + proxyConnectors.add(connector); + if (isUseJmx()) { + registerProxyConnectorMBean(connector); + } + return connector; + } + + public JmsConnector addJmsConnector(JmsConnector connector) throws Exception { + connector.setBrokerService(this); + jmsConnectors.add(connector); + if (isUseJmx()) { + registerJmsConnectorMBean(connector); + } + return connector; + } + + public JmsConnector removeJmsConnector(JmsConnector connector) { + if (jmsConnectors.remove(connector)) { + return connector; + } + return null; + } + + public void masterFailed() { + if (shutdownOnMasterFailure) { + LOG.error("The Master has failed ... shutting down"); + try { + stop(); + } catch (Exception e) { + LOG.error("Failed to stop for master failure", e); + } + } else { + LOG.warn("Master Failed - starting all connectors"); + try { + startAllConnectors(); + broker.nowMasterBroker(); + } catch (Exception e) { + LOG.error("Failed to startAllConnectors", e); + } + } + } + + public String getUptime() { + long delta = getUptimeMillis(); + + if (delta == 0) { + return "not started"; + } + + return TimeUtils.printDuration(delta); + } + + public long getUptimeMillis() { + if (startDate == null) { + return 0; + } + + return new Date().getTime() - startDate.getTime(); + } + + public boolean isStarted() { + return started.get() && startedLatch.getCount() == 0; + } + + /** + * Forces a start of the broker. + * By default a BrokerService instance that was + * previously stopped using BrokerService.stop() cannot be restarted + * using BrokerService.start(). + * This method enforces a restart. + * It is not recommended to force a restart of the broker and will not work + * for most but some very trivial broker configurations. + * For restarting a broker instance we recommend to first call stop() on + * the old instance and then recreate a new BrokerService instance. + * + * @param force - if true enforces a restart. + * @throws Exception + */ + public void start(boolean force) throws Exception { + forceStart = force; + stopped.set(false); + started.set(false); + start(); + } + + // Service interface + // ------------------------------------------------------------------------- + + protected boolean shouldAutostart() { + return true; + } + + /** + * JSR-250 callback wrapper; converts checked exceptions to runtime exceptions + * + * delegates to autoStart, done to prevent backwards incompatible signature change + */ + @PostConstruct + private void postConstruct() { + try { + autoStart(); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + /** + * + * @throws Exception + * @org. apache.xbean.InitMethod + */ + public void autoStart() throws Exception { + if(shouldAutostart()) { + start(); + } + } + + @Override + public void start() throws Exception { + if (stopped.get() || !started.compareAndSet(false, true)) { + // lets just ignore redundant start() calls + // as its way too easy to not be completely sure if start() has been + // called or not with the gazillion of different configuration + // mechanisms + // throw new IllegalStateException("Already started."); + return; + } + + stopping.set(false); + startDate = new Date(); + MDC.put("activemq.broker", brokerName); + + try { + if (systemExitOnShutdown && useShutdownHook) { + throw new ConfigurationException("'useShutdownHook' property cannot be be used with 'systemExitOnShutdown', please turn it off (useShutdownHook=false)"); + } + processHelperProperties(); + if (isUseJmx()) { + // need to remove MDC during starting JMX, as that would otherwise causes leaks, as spawned threads inheirt the MDC and + // we cannot cleanup clear that during shutdown of the broker. + MDC.remove("activemq.broker"); + try { + startManagementContext(); + for (NetworkConnector connector : getNetworkConnectors()) { + registerNetworkConnectorMBean(connector); + } + } finally { + MDC.put("activemq.broker", brokerName); + } + } + + // in jvm master slave, lets not publish over existing broker till we get the lock + final BrokerRegistry brokerRegistry = BrokerRegistry.getInstance(); + if (brokerRegistry.lookup(getBrokerName()) == null) { + brokerRegistry.bind(getBrokerName(), BrokerService.this); + } + startPersistenceAdapter(startAsync); + startBroker(startAsync); + brokerRegistry.bind(getBrokerName(), BrokerService.this); + } catch (Exception e) { + LOG.error("Failed to start Apache ActiveMQ ({}, {})", new Object[]{ getBrokerName(), brokerId }, e); + try { + if (!stopped.get()) { + stop(); + } + } catch (Exception ex) { + LOG.warn("Failed to stop broker after failure in start. This exception will be ignored.", ex); + } + throw e; + } finally { + MDC.remove("activemq.broker"); + } + } + + private void startPersistenceAdapter(boolean async) throws Exception { + if (async) { + new Thread("Persistence Adapter Starting Thread") { + @Override + public void run() { + try { + doStartPersistenceAdapter(); + } catch (Throwable e) { + startException = e; + } finally { + synchronized (persistenceAdapterStarted) { + persistenceAdapterStarted.set(true); + persistenceAdapterStarted.notifyAll(); + } + } + } + }.start(); + } else { + doStartPersistenceAdapter(); + } + } + + private void doStartPersistenceAdapter() throws Exception { + getPersistenceAdapter().setUsageManager(getProducerSystemUsage()); + getPersistenceAdapter().setBrokerName(getBrokerName()); + LOG.info("Using Persistence Adapter: {}", getPersistenceAdapter()); + if (deleteAllMessagesOnStartup) { + deleteAllMessages(); + } + getPersistenceAdapter().start(); + + getJobSchedulerStore(); + if (jobSchedulerStore != null) { + try { + jobSchedulerStore.start(); + } catch (Exception e) { + RuntimeException exception = new RuntimeException( + "Failed to start job scheduler store: " + jobSchedulerStore, e); + LOG.error(exception.getLocalizedMessage(), e); + throw exception; + } + } + } + + private void startBroker(boolean async) throws Exception { + if (async) { + new Thread("Broker Starting Thread") { + @Override + public void run() { + try { + synchronized (persistenceAdapterStarted) { + if (!persistenceAdapterStarted.get()) { + persistenceAdapterStarted.wait(); + } + } + doStartBroker(); + } catch (Throwable t) { + startException = t; + } + } + }.start(); + } else { + doStartBroker(); + } + } + + private void doStartBroker() throws Exception { + if (startException != null) { + return; + } + startDestinations(); + addShutdownHook(); + + broker = getBroker(); + brokerId = broker.getBrokerId(); + + // need to log this after creating the broker so we have its id and name + LOG.info("Apache ActiveMQ {} ({}, {}) is starting", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId }); + broker.start(); + + if (isUseJmx()) { + if (getManagementContext().isCreateConnector() && !getManagementContext().isConnectorStarted()) { + // try to restart management context + // typical for slaves that use the same ports as master + managementContext.stop(); + startManagementContext(); + } + ManagedRegionBroker managedBroker = (ManagedRegionBroker) regionBroker; + managedBroker.setContextBroker(broker); + adminView.setBroker(managedBroker); + } + + if (ioExceptionHandler == null) { + setIoExceptionHandler(new DefaultIOExceptionHandler()); + } + + if (isUseJmx() && Log4JConfigView.isLog4JAvailable()) { + ObjectName objectName = BrokerMBeanSupport.createLog4JConfigViewName(getBrokerObjectName().toString()); + Log4JConfigView log4jConfigView = new Log4JConfigView(); + AnnotatedMBean.registerMBean(getManagementContext(), log4jConfigView, objectName); + } + + startAllConnectors(); + + LOG.info("Apache ActiveMQ {} ({}, {}) started", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId}); + LOG.info("For help or more information please see: http://activemq.apache.org"); + + getBroker().brokerServiceStarted(); + checkSystemUsageLimits(); + startedLatch.countDown(); + getBroker().nowMasterBroker(); + } + + /** + * JSR-250 callback wrapper; converts checked exceptions to runtime exceptions + * + * delegates to stop, done to prevent backwards incompatible signature change + */ + @PreDestroy + private void preDestroy () { + try { + stop(); + } catch (Exception ex) { + throw new RuntimeException(); + } + } + + /** + * + * @throws Exception + * @org.apache .xbean.DestroyMethod + */ + @Override + public void stop() throws Exception { + if (!stopping.compareAndSet(false, true)) { + LOG.trace("Broker already stopping/stopped"); + return; + } + + MDC.put("activemq.broker", brokerName); + + if (systemExitOnShutdown) { + new Thread() { + @Override + public void run() { + System.exit(systemExitOnShutdownExitCode); + } + }.start(); + } + + LOG.info("Apache ActiveMQ {} ({}, {}) is shutting down", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId} ); + + removeShutdownHook(); + if (this.scheduler != null) { + this.scheduler.stop(); + this.scheduler = null; + } + ServiceStopper stopper = new ServiceStopper(); + if (services != null) { + for (Service service : services) { + stopper.stop(service); + } + } + stopAllConnectors(stopper); + this.slave = true; + // remove any VMTransports connected + // this has to be done after services are stopped, + // to avoid timing issue with discovery (spinning up a new instance) + BrokerRegistry.getInstance().unbind(getBrokerName()); + VMTransportFactory.stopped(getBrokerName()); + if (broker != null) { + stopper.stop(broker); + broker = null; + } + + if (jobSchedulerStore != null) { + jobSchedulerStore.stop(); + jobSchedulerStore = null; + } + if (tempDataStore != null) { + tempDataStore.stop(); + tempDataStore = null; + } + try { + stopper.stop(persistenceAdapter); + persistenceAdapter = null; + if (isUseJmx()) { + stopper.stop(getManagementContext()); + managementContext = null; + } + // Clear SelectorParser cache to free memory + SelectorParser.clearCache(); + } finally { + started.set(false); + stopped.set(true); + stoppedLatch.countDown(); + } + + if (this.taskRunnerFactory != null) { + this.taskRunnerFactory.shutdown(); + this.taskRunnerFactory = null; + } + if (this.executor != null) { + ThreadPoolUtils.shutdownNow(executor); + this.executor = null; + } + + this.destinationInterceptors = null; + this.destinationFactory = null; + + if (startDate != null) { + LOG.info("Apache ActiveMQ {} ({}, {}) uptime {}", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId, getUptime()}); + } + LOG.info("Apache ActiveMQ {} ({}, {}) is shutdown", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId}); + + synchronized (shutdownHooks) { + for (Runnable hook : shutdownHooks) { + try { + hook.run(); + } catch (Throwable e) { + stopper.onException(hook, e); + } + } + } + + MDC.remove("activemq.broker"); + + // and clear start date + startDate = null; + + stopper.throwFirstException(); + } + + public boolean checkQueueSize(String queueName) { + long count = 0; + long queueSize = 0; + Map destinationMap = regionBroker.getDestinationMap(); + for (Map.Entry entry : destinationMap.entrySet()) { + if (entry.getKey().isQueue()) { + if (entry.getValue().getName().matches(queueName)) { + queueSize = entry.getValue().getDestinationStatistics().getMessages().getCount(); + count += queueSize; + if (queueSize > 0) { + LOG.info("Queue has pending message: {} queueSize is: {}", entry.getValue().getName(), queueSize); + } + } + } + } + return count == 0; + } + + /** + * This method (both connectorName and queueName are using regex to match) + * 1. stop the connector (supposed the user input the connector which the + * clients connect to) 2. to check whether there is any pending message on + * the queues defined by queueName 3. supposedly, after stop the connector, + * client should failover to other broker and pending messages should be + * forwarded. if no pending messages, the method finally call stop to stop + * the broker. + * + * @param connectorName + * @param queueName + * @param timeout + * @param pollInterval + * @throws Exception + */ + public void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval) throws Exception { + if (isUseJmx()) { + if (connectorName == null || queueName == null || timeout <= 0) { + throw new Exception( + "connectorName and queueName cannot be null and timeout should be >0 for stopGracefully."); + } + if (pollInterval <= 0) { + pollInterval = 30; + } + LOG.info("Stop gracefully with connectorName: {} queueName: {} timeout: {} pollInterval: {}", new Object[]{ + connectorName, queueName, timeout, pollInterval + }); + TransportConnector connector; + for (int i = 0; i < transportConnectors.size(); i++) { + connector = transportConnectors.get(i); + if (connector != null && connector.getName() != null && connector.getName().matches(connectorName)) { + connector.stop(); + } + } + long start = System.currentTimeMillis(); + while (System.currentTimeMillis() - start < timeout * 1000) { + // check quesize until it gets zero + if (checkQueueSize(queueName)) { + stop(); + break; + } else { + Thread.sleep(pollInterval * 1000); + } + } + if (stopped.get()) { + LOG.info("Successfully stop the broker."); + } else { + LOG.info("There is still pending message on the queue. Please check and stop the broker manually."); + } + } + } + + /** + * A helper method to block the caller thread until the broker has been + * stopped + */ + public void waitUntilStopped() { + while (isStarted() && !stopped.get()) { + try { + stoppedLatch.await(); + } catch (InterruptedException e) { + // ignore + } + } + } + + public boolean isStopped() { + return stopped.get(); + } + + /** + * A helper method to block the caller thread until the broker has fully started + * @return boolean true if wait succeeded false if broker was not started or was stopped + */ + public boolean waitUntilStarted() { + return waitUntilStarted(DEFAULT_START_TIMEOUT); + } + + /** + * A helper method to block the caller thread until the broker has fully started + * + * @param timeout + * the amount of time to wait before giving up and returning false. + * + * @return boolean true if wait succeeded false if broker was not started or was stopped + */ + public boolean waitUntilStarted(long timeout) { + boolean waitSucceeded = isStarted(); + long expiration = Math.max(0, timeout + System.currentTimeMillis()); + while (!isStarted() && !stopped.get() && !waitSucceeded && expiration > System.currentTimeMillis()) { + try { + if (startException != null) { + return waitSucceeded; + } + waitSucceeded = startedLatch.await(100L, TimeUnit.MILLISECONDS); + } catch (InterruptedException ignore) { + } + } + return waitSucceeded; + } + + // Properties + // ------------------------------------------------------------------------- + /** + * Returns the message broker + */ + public Broker getBroker() throws Exception { + if (broker == null) { + broker = createBroker(); + } + return broker; + } + + /** + * Returns the administration view of the broker; used to create and destroy + * resources such as queues and topics. Note this method returns null if JMX + * is disabled. + */ + public BrokerView getAdminView() throws Exception { + if (adminView == null) { + // force lazy creation + getBroker(); + } + return adminView; + } + + public void setAdminView(BrokerView adminView) { + this.adminView = adminView; + } + + public String getBrokerName() { + return brokerName; + } + + /** + * Sets the name of this broker; which must be unique in the network + * + * @param brokerName + */ + public void setBrokerName(String brokerName) { + if (brokerName == null) { + throw new NullPointerException("The broker name cannot be null"); + } + String str = brokerName.replaceAll("[^a-zA-Z0-9\\.\\_\\-\\:]", "_"); + if (!str.equals(brokerName)) { + LOG.error("Broker Name: {} contained illegal characters - replaced with {}", brokerName, str); + } + this.brokerName = str.trim(); + } + + public PersistenceAdapterFactory getPersistenceFactory() { + return persistenceFactory; + } + + public File getDataDirectoryFile() { + if (dataDirectoryFile == null) { + dataDirectoryFile = new File(IOHelper.getDefaultDataDirectory()); + } + return dataDirectoryFile; + } + + public File getBrokerDataDirectory() { + String brokerDir = getBrokerName(); + return new File(getDataDirectoryFile(), brokerDir); + } + + /** + * Sets the directory in which the data files will be stored by default for + * the JDBC and Journal persistence adaptors. + * + * @param dataDirectory + * the directory to store data files + */ + public void setDataDirectory(String dataDirectory) { + setDataDirectoryFile(new File(dataDirectory)); + } + + /** + * Sets the directory in which the data files will be stored by default for + * the JDBC and Journal persistence adaptors. + * + * @param dataDirectoryFile + * the directory to store data files + */ + public void setDataDirectoryFile(File dataDirectoryFile) { + this.dataDirectoryFile = dataDirectoryFile; + } + + /** + * @return the tmpDataDirectory + */ + public File getTmpDataDirectory() { + if (tmpDataDirectory == null) { + tmpDataDirectory = new File(getBrokerDataDirectory(), "tmp_storage"); + } + return tmpDataDirectory; + } + + /** + * @param tmpDataDirectory + * the tmpDataDirectory to set + */ + public void setTmpDataDirectory(File tmpDataDirectory) { + this.tmpDataDirectory = tmpDataDirectory; + } + + public void setPersistenceFactory(PersistenceAdapterFactory persistenceFactory) { + this.persistenceFactory = persistenceFactory; + } + + public void setDestinationFactory(DestinationFactory destinationFactory) { + this.destinationFactory = destinationFactory; + } + + public boolean isPersistent() { + return persistent; + } + + /** + * Sets whether or not persistence is enabled or disabled. + * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" + */ + public void setPersistent(boolean persistent) { + this.persistent = persistent; + } + + public boolean isPopulateJMSXUserID() { + return populateJMSXUserID; + } + + /** + * Sets whether or not the broker should populate the JMSXUserID header. + */ + public void setPopulateJMSXUserID(boolean populateJMSXUserID) { + this.populateJMSXUserID = populateJMSXUserID; + } + + public SystemUsage getSystemUsage() { + try { + if (systemUsage == null) { + + systemUsage = new SystemUsage("Main", getPersistenceAdapter(), getTempDataStore(), getJobSchedulerStore()); + systemUsage.setExecutor(getExecutor()); + systemUsage.getMemoryUsage().setLimit(1024L * 1024 * 1024 * 1); // 1 GB + systemUsage.getTempUsage().setLimit(1024L * 1024 * 1024 * 50); // 50 GB + systemUsage.getStoreUsage().setLimit(1024L * 1024 * 1024 * 100); // 100 GB + systemUsage.getJobSchedulerUsage().setLimit(1024L * 1024 * 1024 * 50); // 50 GB + addService(this.systemUsage); + } + return systemUsage; + } catch (IOException e) { + LOG.error("Cannot create SystemUsage", e); + throw new RuntimeException("Fatally failed to create SystemUsage" + e.getMessage(), e); + } + } + + public void setSystemUsage(SystemUsage memoryManager) { + if (this.systemUsage != null) { + removeService(this.systemUsage); + } + this.systemUsage = memoryManager; + if (this.systemUsage.getExecutor()==null) { + this.systemUsage.setExecutor(getExecutor()); + } + addService(this.systemUsage); + } + + /** + * @return the consumerUsageManager + * @throws IOException + */ + public SystemUsage getConsumerSystemUsage() throws IOException { + if (this.consumerSystemUsaage == null) { + if (splitSystemUsageForProducersConsumers) { + this.consumerSystemUsaage = new SystemUsage(getSystemUsage(), "Consumer"); + float portion = consumerSystemUsagePortion / 100f; + this.consumerSystemUsaage.getMemoryUsage().setUsagePortion(portion); + addService(this.consumerSystemUsaage); + } else { + consumerSystemUsaage = getSystemUsage(); + } + } + return this.consumerSystemUsaage; + } + + /** + * @param consumerSystemUsaage + * the storeSystemUsage to set + */ + public void setConsumerSystemUsage(SystemUsage consumerSystemUsaage) { + if (this.consumerSystemUsaage != null) { + removeService(this.consumerSystemUsaage); + } + this.consumerSystemUsaage = consumerSystemUsaage; + addService(this.consumerSystemUsaage); + } + + /** + * @return the producerUsageManager + * @throws IOException + */ + public SystemUsage getProducerSystemUsage() throws IOException { + if (producerSystemUsage == null) { + if (splitSystemUsageForProducersConsumers) { + producerSystemUsage = new SystemUsage(getSystemUsage(), "Producer"); + float portion = producerSystemUsagePortion / 100f; + producerSystemUsage.getMemoryUsage().setUsagePortion(portion); + addService(producerSystemUsage); + } else { + producerSystemUsage = getSystemUsage(); + } + } + return producerSystemUsage; + } + + /** + * @param producerUsageManager + * the producerUsageManager to set + */ + public void setProducerSystemUsage(SystemUsage producerUsageManager) { + if (this.producerSystemUsage != null) { + removeService(this.producerSystemUsage); + } + this.producerSystemUsage = producerUsageManager; + addService(this.producerSystemUsage); + } + + public PersistenceAdapter getPersistenceAdapter() throws IOException { + if (persistenceAdapter == null) { + persistenceAdapter = createPersistenceAdapter(); + configureService(persistenceAdapter); + this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter); + } + return persistenceAdapter; + } + + /** + * Sets the persistence adaptor implementation to use for this broker + * + * @throws IOException + */ + public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) throws IOException { + if (!isPersistent() && ! (persistenceAdapter instanceof MemoryPersistenceAdapter)) { + LOG.warn("persistent=\"false\", ignoring configured persistenceAdapter: {}", persistenceAdapter); + return; + } + this.persistenceAdapter = persistenceAdapter; + configureService(this.persistenceAdapter); + this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter); + } + + public TaskRunnerFactory getTaskRunnerFactory() { + if (this.taskRunnerFactory == null) { + this.taskRunnerFactory = new TaskRunnerFactory("ActiveMQ BrokerService["+getBrokerName()+"] Task", getTaskRunnerPriority(), true, 1000, + isDedicatedTaskRunner()); + this.taskRunnerFactory.setThreadClassLoader(this.getClass().getClassLoader()); + } + return this.taskRunnerFactory; + } + + public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) { + this.taskRunnerFactory = taskRunnerFactory; + } + + public TaskRunnerFactory getPersistenceTaskRunnerFactory() { + if (taskRunnerFactory == null) { + persistenceTaskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", persistenceThreadPriority, + true, 1000, isDedicatedTaskRunner()); + } + return persistenceTaskRunnerFactory; + } + + public void setPersistenceTaskRunnerFactory(TaskRunnerFactory persistenceTaskRunnerFactory) { + this.persistenceTaskRunnerFactory = persistenceTaskRunnerFactory; + } + + public boolean isUseJmx() { + return useJmx; + } + + public boolean isEnableStatistics() { + return enableStatistics; + } + + /** + * Sets whether or not the Broker's services enable statistics or not. + */ + public void setEnableStatistics(boolean enableStatistics) { + this.enableStatistics = enableStatistics; + } + + /** + * Sets whether or not the Broker's services should be exposed into JMX or + * not. + * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" + */ + public void setUseJmx(boolean useJmx) { + this.useJmx = useJmx; + } + + public ObjectName getBrokerObjectName() throws MalformedObjectNameException { + if (brokerObjectName == null) { + brokerObjectName = createBrokerObjectName(); + } + return brokerObjectName; + } + + /** + * Sets the JMX ObjectName for this broker + */ + public void setBrokerObjectName(ObjectName brokerObjectName) { + this.brokerObjectName = brokerObjectName; + } + + public ManagementContext getManagementContext() { + if (managementContext == null) { + managementContext = new ManagementContext(); + } + return managementContext; + } + + public void setManagementContext(ManagementContext managementContext) { + this.managementContext = managementContext; + } + + public NetworkConnector getNetworkConnectorByName(String connectorName) { + for (NetworkConnector connector : networkConnectors) { + if (connector.getName().equals(connectorName)) { + return connector; + } + } + return null; + } + + public String[] getNetworkConnectorURIs() { + return networkConnectorURIs; + } + + public void setNetworkConnectorURIs(String[] networkConnectorURIs) { + this.networkConnectorURIs = networkConnectorURIs; + } + + public TransportConnector getConnectorByName(String connectorName) { + for (TransportConnector connector : transportConnectors) { + if (connector.getName().equals(connectorName)) { + return connector; + } + } + return null; + } + + public Map getTransportConnectorURIsAsMap() { + Map answer = new HashMap(); + for (TransportConnector connector : transportConnectors) { + try { + URI uri = connector.getConnectUri(); + if (uri != null) { + String scheme = uri.getScheme(); + if (scheme != null) { + answer.put(scheme.toLowerCase(Locale.ENGLISH), uri.toString()); + } + } + } catch (Exception e) { + LOG.debug("Failed to read URI to build transportURIsAsMap", e); + } + } + return answer; + } + + public ProducerBrokerExchange getProducerBrokerExchange(ProducerInfo producerInfo){ + ProducerBrokerExchange result = null; + + for (TransportConnector connector : transportConnectors) { + for (TransportConnection tc: connector.getConnections()){ + result = tc.getProducerBrokerExchangeIfExists(producerInfo); + if (result !=null){ + return result; + } + } + } + return result; + } + + public String[] getTransportConnectorURIs() { + return transportConnectorURIs; + } + + public void setTransportConnectorURIs(String[] transportConnectorURIs) { + this.transportConnectorURIs = transportConnectorURIs; + } + + /** + * @return Returns the jmsBridgeConnectors. + */ + public JmsConnector[] getJmsBridgeConnectors() { + return jmsBridgeConnectors; + } + + /** + * @param jmsConnectors + * The jmsBridgeConnectors to set. + */ + public void setJmsBridgeConnectors(JmsConnector[] jmsConnectors) { + this.jmsBridgeConnectors = jmsConnectors; + } + + public Service[] getServices() { + return services.toArray(new Service[0]); + } + + /** + * Sets the services associated with this broker. + */ + public void setServices(Service[] services) { + this.services.clear(); + if (services != null) { + for (int i = 0; i < services.length; i++) { + this.services.add(services[i]); + } + } + } + + /** + * Adds a new service so that it will be started as part of the broker + * lifecycle + */ + public void addService(Service service) { + services.add(service); + } + + public void removeService(Service service) { + services.remove(service); + } + + public boolean isUseLoggingForShutdownErrors() { + return useLoggingForShutdownErrors; + } + + /** + * Sets whether or not we should use commons-logging when reporting errors + * when shutting down the broker + */ + public void setUseLoggingForShutdownErrors(boolean useLoggingForShutdownErrors) { + this.useLoggingForShutdownErrors = useLoggingForShutdownErrors; + } + + public boolean isUseShutdownHook() { + return useShutdownHook; + } + + /** + * Sets whether or not we should use a shutdown handler to close down the + * broker cleanly if the JVM is terminated. It is recommended you leave this + * enabled. + */ + public void setUseShutdownHook(boolean useShutdownHook) { + this.useShutdownHook = useShutdownHook; + } + + public boolean isAdvisorySupport() { + return advisorySupport; + } + + /** + * Allows the support of advisory messages to be disabled for performance + * reasons. + * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" + */ + public void setAdvisorySupport(boolean advisorySupport) { + this.advisorySupport = advisorySupport; + } + + public List getTransportConnectors() { + return new ArrayList(transportConnectors); + } + + /** + * Sets the transport connectors which this broker will listen on for new + * clients + * + * @org.apache.xbean.Property + * nestedType="org.apache.activemq.broker.TransportConnector" + */ + public void setTransportConnectors(List transportConnectors) throws Exception { + for (TransportConnector connector : transportConnectors) { + addConnector(connector); + } + } + + public TransportConnector getTransportConnectorByName(String name){ + for (TransportConnector transportConnector : transportConnectors){ + if (name.equals(transportConnector.getName())){ + return transportConnector; + } + } + return null; + } + + public TransportConnector getTransportConnectorByScheme(String scheme){ + for (TransportConnector transportConnector : transportConnectors){ + if (scheme.equals(transportConnector.getUri().getScheme())){ + return transportConnector; + } + } + return null; + } + + public List getNetworkConnectors() { + return new ArrayList(networkConnectors); + } + + public List getProxyConnectors() { + return new ArrayList(proxyConnectors); + } + + /** + * Sets the network connectors which this broker will use to connect to + * other brokers in a federated network + * + * @org.apache.xbean.Property + * nestedType="org.apache.activemq.network.NetworkConnector" + */ + public void setNetworkConnectors(List networkConnectors) throws Exception { + for (Object connector : networkConnectors) { + addNetworkConnector((NetworkConnector) connector); + } + } + + /** + * Sets the network connectors which this broker will use to connect to + * other brokers in a federated network + */ + public void setProxyConnectors(List proxyConnectors) throws Exception { + for (Object connector : proxyConnectors) { + addProxyConnector((ProxyConnector) connector); + } + } + + public PolicyMap getDestinationPolicy() { + return destinationPolicy; + } + + /** + * Sets the destination specific policies available either for exact + * destinations or for wildcard areas of destinations. + */ + public void setDestinationPolicy(PolicyMap policyMap) { + this.destinationPolicy = policyMap; + } + + public BrokerPlugin[] getPlugins() { + return plugins; + } + + /** + * Sets a number of broker plugins to install such as for security + * authentication or authorization + */ + public void setPlugins(BrokerPlugin[] plugins) { + this.plugins = plugins; + } + + public MessageAuthorizationPolicy getMessageAuthorizationPolicy() { + return messageAuthorizationPolicy; + } + + /** + * Sets the policy used to decide if the current connection is authorized to + * consume a given message + */ + public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) { + this.messageAuthorizationPolicy = messageAuthorizationPolicy; + } + + /** + * Delete all messages from the persistent store + * + * @throws IOException + */ + public void deleteAllMessages() throws IOException { + getPersistenceAdapter().deleteAllMessages(); + } + + public boolean isDeleteAllMessagesOnStartup() { + return deleteAllMessagesOnStartup; + } + + /** + * Sets whether or not all messages are deleted on startup - mostly only + * useful for testing. + * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" + */ + public void setDeleteAllMessagesOnStartup(boolean deletePersistentMessagesOnStartup) { + this.deleteAllMessagesOnStartup = deletePersistentMessagesOnStartup; + } + + public URI getVmConnectorURI() { + if (vmConnectorURI == null) { + try { + vmConnectorURI = new URI("vm://" + getBrokerName().replaceAll("[^a-zA-Z0-9\\.\\_\\-]", "_")); + } catch (URISyntaxException e) { + LOG.error("Badly formed URI from {}", getBrokerName(), e); + } + } + return vmConnectorURI; + } + + public void setVmConnectorURI(URI vmConnectorURI) { + this.vmConnectorURI = vmConnectorURI; + } + + public String getDefaultSocketURIString() { + if (started.get()) { + if (this.defaultSocketURIString == null) { + for (TransportConnector tc:this.transportConnectors) { + String result = null; + try { + result = tc.getPublishableConnectString(); + } catch (Exception e) { + LOG.warn("Failed to get the ConnectURI for {}", tc, e); + } + if (result != null) { + // find first publishable uri + if (tc.isUpdateClusterClients() || tc.isRebalanceClusterClients()) { + this.defaultSocketURIString = result; + break; + } else { + // or use the first defined + if (this.defaultSocketURIString == null) { + this.defaultSocketURIString = result; + } + } + } + } + + } + return this.defaultSocketURIString; + } + return null; + } + + /** + * @return Returns the shutdownOnMasterFailure. + */ + public boolean isShutdownOnMasterFailure() { + return shutdownOnMasterFailure; + } + + /** + * @param shutdownOnMasterFailure + * The shutdownOnMasterFailure to set. + */ + public void setShutdownOnMasterFailure(boolean shutdownOnMasterFailure) { + this.shutdownOnMasterFailure = shutdownOnMasterFailure; + } + + public boolean isKeepDurableSubsActive() { + return keepDurableSubsActive; + } + + public void setKeepDurableSubsActive(boolean keepDurableSubsActive) { + this.keepDurableSubsActive = keepDurableSubsActive; + } + + public boolean isUseVirtualTopics() { + return useVirtualTopics; + } + + /** + * Sets whether or not Virtual + * Topics should be supported by default if they have not been + * explicitly configured. + */ + public void setUseVirtualTopics(boolean useVirtualTopics) { + this.useVirtualTopics = useVirtualTopics; + } + + public DestinationInterceptor[] getDestinationInterceptors() { + return destinationInterceptors; + } + + public boolean isUseMirroredQueues() { + return useMirroredQueues; + } + + /** + * Sets whether or not Mirrored + * Queues should be supported by default if they have not been + * explicitly configured. + */ + public void setUseMirroredQueues(boolean useMirroredQueues) { + this.useMirroredQueues = useMirroredQueues; + } + + /** + * Sets the destination interceptors to use + */ + public void setDestinationInterceptors(DestinationInterceptor[] destinationInterceptors) { + this.destinationInterceptors = destinationInterceptors; + } + + public ActiveMQDestination[] getDestinations() { + return destinations; + } + + /** + * Sets the destinations which should be loaded/created on startup + */ + public void setDestinations(ActiveMQDestination[] destinations) { + this.destinations = destinations; + } + + /** + * @return the tempDataStore + */ + public synchronized PListStore getTempDataStore() { + if (tempDataStore == null) { + if (!isPersistent()) { + return null; + } + + try { + PersistenceAdapter pa = getPersistenceAdapter(); + if( pa!=null && pa instanceof PListStore) { + return (PListStore) pa; + } + } catch (IOException e) { + throw new RuntimeException(e); + } + + boolean result = true; + boolean empty = true; + try { + File directory = getTmpDataDirectory(); + if (directory.exists() && directory.isDirectory()) { + File[] files = directory.listFiles(); + if (files != null && files.length > 0) { + empty = false; + for (int i = 0; i < files.length; i++) { + File file = files[i]; + if (!file.isDirectory()) { + result &= file.delete(); + } + } + } + } + if (!empty) { + String str = result ? "Successfully deleted" : "Failed to delete"; + LOG.info("{} temporary storage", str); + } + + String clazz = "org.apache.activemq.store.kahadb.plist.PListStoreImpl"; + this.tempDataStore = (PListStore) getClass().getClassLoader().loadClass(clazz).newInstance(); + this.tempDataStore.setDirectory(getTmpDataDirectory()); + configureService(tempDataStore); + this.tempDataStore.start(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + return tempDataStore; + } + + /** + * @param tempDataStore + * the tempDataStore to set + */ + public void setTempDataStore(PListStore tempDataStore) { + this.tempDataStore = tempDataStore; + configureService(tempDataStore); + try { + tempDataStore.start(); + } catch (Exception e) { + RuntimeException exception = new RuntimeException("Failed to start provided temp data store: " + tempDataStore, e); + LOG.error(exception.getLocalizedMessage(), e); + throw exception; + } + } + + public int getPersistenceThreadPriority() { + return persistenceThreadPriority; + } + + public void setPersistenceThreadPriority(int persistenceThreadPriority) { + this.persistenceThreadPriority = persistenceThreadPriority; + } + + /** + * @return the useLocalHostBrokerName + */ + public boolean isUseLocalHostBrokerName() { + return this.useLocalHostBrokerName; + } + + /** + * @param useLocalHostBrokerName + * the useLocalHostBrokerName to set + */ + public void setUseLocalHostBrokerName(boolean useLocalHostBrokerName) { + this.useLocalHostBrokerName = useLocalHostBrokerName; + if (useLocalHostBrokerName && !started.get() && brokerName == null || brokerName == DEFAULT_BROKER_NAME) { + brokerName = LOCAL_HOST_NAME; + } + } + + /** + * Looks up and lazily creates if necessary the destination for the given + * JMS name + */ + public Destination getDestination(ActiveMQDestination destination) throws Exception { + return getBroker().addDestination(getAdminConnectionContext(), destination,false); + } + + public void removeDestination(ActiveMQDestination destination) throws Exception { + getBroker().removeDestination(getAdminConnectionContext(), destination, 0); + } + + public int getProducerSystemUsagePortion() { + return producerSystemUsagePortion; + } + + public void setProducerSystemUsagePortion(int producerSystemUsagePortion) { + this.producerSystemUsagePortion = producerSystemUsagePortion; + } + + public int getConsumerSystemUsagePortion() { + return consumerSystemUsagePortion; + } + + public void setConsumerSystemUsagePortion(int consumerSystemUsagePortion) { + this.consumerSystemUsagePortion = consumerSystemUsagePortion; + } + + public boolean isSplitSystemUsageForProducersConsumers() { + return splitSystemUsageForProducersConsumers; + } + + public void setSplitSystemUsageForProducersConsumers(boolean splitSystemUsageForProducersConsumers) { + this.splitSystemUsageForProducersConsumers = splitSystemUsageForProducersConsumers; + } + + public boolean isMonitorConnectionSplits() { + return monitorConnectionSplits; + } + + public void setMonitorConnectionSplits(boolean monitorConnectionSplits) { + this.monitorConnectionSplits = monitorConnectionSplits; + } + + public int getTaskRunnerPriority() { + return taskRunnerPriority; + } + + public void setTaskRunnerPriority(int taskRunnerPriority) { + this.taskRunnerPriority = taskRunnerPriority; + } + + public boolean isDedicatedTaskRunner() { + return dedicatedTaskRunner; + } + + public void setDedicatedTaskRunner(boolean dedicatedTaskRunner) { + this.dedicatedTaskRunner = dedicatedTaskRunner; + } + + public boolean isCacheTempDestinations() { + return cacheTempDestinations; + } + + public void setCacheTempDestinations(boolean cacheTempDestinations) { + this.cacheTempDestinations = cacheTempDestinations; + } + + public int getTimeBeforePurgeTempDestinations() { + return timeBeforePurgeTempDestinations; + } + + public void setTimeBeforePurgeTempDestinations(int timeBeforePurgeTempDestinations) { + this.timeBeforePurgeTempDestinations = timeBeforePurgeTempDestinations; + } + + public boolean isUseTempMirroredQueues() { + return useTempMirroredQueues; + } + + public void setUseTempMirroredQueues(boolean useTempMirroredQueues) { + this.useTempMirroredQueues = useTempMirroredQueues; + } + + public synchronized JobSchedulerStore getJobSchedulerStore() { + + // If support is off don't allow any scheduler even is user configured their own. + if (!isSchedulerSupport()) { + return null; + } + + // If the user configured their own we use it even if persistence is disabled since + // we don't know anything about their implementation. + if (jobSchedulerStore == null) { + + if (!isPersistent()) { + this.jobSchedulerStore = new InMemoryJobSchedulerStore(); + configureService(jobSchedulerStore); + return this.jobSchedulerStore; + } + + try { + PersistenceAdapter pa = getPersistenceAdapter(); + if (pa != null) { + this.jobSchedulerStore = pa.createJobSchedulerStore(); + jobSchedulerStore.setDirectory(getSchedulerDirectoryFile()); + configureService(jobSchedulerStore); + return this.jobSchedulerStore; + } + } catch (IOException e) { + throw new RuntimeException(e); + } catch (UnsupportedOperationException ex) { + // It's ok if the store doesn't implement a scheduler. + } catch (Exception e) { + throw new RuntimeException(e); + } + + try { + PersistenceAdapter pa = getPersistenceAdapter(); + if (pa != null && pa instanceof JobSchedulerStore) { + this.jobSchedulerStore = (JobSchedulerStore) pa; + configureService(jobSchedulerStore); + return this.jobSchedulerStore; + } + } catch (IOException e) { + throw new RuntimeException(e); + } + + // Load the KahaDB store as a last resort, this only works if KahaDB is + // included at runtime, otherwise this will fail. User should disable + // scheduler support if this fails. + try { + String clazz = "org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter"; + PersistenceAdapter adaptor = (PersistenceAdapter)getClass().getClassLoader().loadClass(clazz).newInstance(); + jobSchedulerStore = adaptor.createJobSchedulerStore(); + jobSchedulerStore.setDirectory(getSchedulerDirectoryFile()); + configureService(jobSchedulerStore); + LOG.info("JobScheduler using directory: {}", getSchedulerDirectoryFile()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + return jobSchedulerStore; + } + + public void setJobSchedulerStore(JobSchedulerStore jobSchedulerStore) { + this.jobSchedulerStore = jobSchedulerStore; + configureService(jobSchedulerStore); + } + + // + // Implementation methods + // ------------------------------------------------------------------------- + /** + * Handles any lazy-creation helper properties which are added to make + * things easier to configure inside environments such as Spring + * + * @throws Exception + */ + protected void processHelperProperties() throws Exception { + if (transportConnectorURIs != null) { + for (int i = 0; i < transportConnectorURIs.length; i++) { + String uri = transportConnectorURIs[i]; + addConnector(uri); + } + } + if (networkConnectorURIs != null) { + for (int i = 0; i < networkConnectorURIs.length; i++) { + String uri = networkConnectorURIs[i]; + addNetworkConnector(uri); + } + } + if (jmsBridgeConnectors != null) { + for (int i = 0; i < jmsBridgeConnectors.length; i++) { + addJmsConnector(jmsBridgeConnectors[i]); + } + } + } + + public static int t = 0; + /** + * Check that the store usage limit is not greater than max usable + * space and adjust if it is + */ + protected void checkStoreUsageLimits() throws IOException { + final SystemUsage usage = getSystemUsage(); + + if (getPersistenceAdapter() != null) { + PersistenceAdapter adapter = getPersistenceAdapter(); + File dir = adapter.getDirectory(); + + if (dir != null) { + dir = StoreUtil.findParentDirectory(dir); + + int usagePercent = usage.getStoreUsage().getPercentLimit(); + long storeLimit = usage.getStoreUsage().getLimit(); + long storeCurrent = usage.getStoreUsage().getUsage(); + long dirFreeSpace = dir.getUsableSpace(); + + if (diskUsageCheckRegrowPercentChange && usagePercent > 0 && + storeLimit < (dirFreeSpace + storeCurrent)) { + LOG.info("Usable disk space has been increased, attempting to regrow store limit to " + + + usagePercent + "% of the partition size."); + + //trigger the recomputation of the absolute value by + //calling setPercentLimit again + usage.getStoreUsage().setPercentLimit(usagePercent); + storeLimit = usage.getStoreUsage().getLimit(); + storeCurrent = usage.getStoreUsage().getUsage(); + } + + if (storeLimit > (dirFreeSpace + storeCurrent)) { + if (usagePercent > 0) { + LOG.warn("Store limit has been set to " + + usagePercent + "% of the partition size but " + + "there is not enough usable space."); + } + + LOG.warn("Store limit is " + storeLimit / (1024 * 1024) + + " mb (current store usage is " + storeCurrent / (1024 * 1024) + + " mb). The data directory: " + dir.getAbsolutePath() + + " only has " + dirFreeSpace / (1024 * 1024) + + " mb of usable space - resetting to maximum available disk space: " + + (dirFreeSpace + storeCurrent) / (1024 * 1024) + " mb"); + usage.getStoreUsage().setLimit(dirFreeSpace + storeCurrent); + } + } + + long maxJournalFileSize = 0; + long storeLimit = usage.getStoreUsage().getLimit(); + + if (adapter instanceof JournaledStore) { + maxJournalFileSize = ((JournaledStore) adapter).getJournalMaxFileLength(); + } + + if (storeLimit < maxJournalFileSize) { + LOG.error("Store limit is " + storeLimit / (1024 * 1024) + + " mb, whilst the max journal file size for the store is: " + + maxJournalFileSize / (1024 * 1024) + " mb, " + + "the store will not accept any data when used."); + + } + } + } + + /** + * Check that temporary usage limit is not greater than max usable + * space and adjust if it is + */ + protected void checkTmpStoreUsageLimits() throws IOException { + final SystemUsage usage = getSystemUsage(); + + File tmpDir = getTmpDataDirectory(); + + if (tmpDir != null) { + tmpDir = StoreUtil.findParentDirectory(tmpDir); + + long storeLimit = usage.getTempUsage().getLimit(); +<<<<<<< 4f8d56aaf60f99abe643e79c6c4940a571289a86 + long storeCurrent = usage.getTempUsage().getUsage(); + while (tmpDir != null && !tmpDir.isDirectory()) { + tmpDir = tmpDir.getParentFile(); + } + long dirFreeSpace = tmpDir.getUsableSpace(); + if (storeLimit > (dirFreeSpace + storeCurrent)) { + LOG.warn("Temporary Store limit is " + storeLimit / (1024 * 1024) + + " mb (current temporary store usage is " + storeCurrent / (1024 * 1024) + + " mb). The temporary data directory: " + tmpDir.getAbsolutePath() + + " only has " + dirFreeSpace / (1024 * 1024) + + " mb of usable space - resetting to maximum available disk space: " + + (dirFreeSpace + storeCurrent) / (1024 * 1024) + " mb"); + usage.getTempUsage().setLimit(dirFreeSpace + storeCurrent); +======= + long dirFreeSpace = tmpDir.getUsableSpace(); + int usagePercent = usage.getTempUsage().getPercentLimit(); + +// if (diskUsageCheckRegrowPercentChange && usagePercent > 0 && +// storeLimit < (dirFreeSpace + storeCurrent)) { +// LOG.info("Usable disk space has been increased, attempting to regrow store limit to " + +// + usagePercent + "% of the partition size."); +// +// //trigger the recomputation of the absolute value by +// //calling setPercentLimit again +// usage.getStoreUsage().setPercentLimit(usagePercent); +// storeLimit = usage.getStoreUsage().getLimit(); +// storeCurrent = usage.getStoreUsage().getUsage(); +// } + + if (storeLimit > dirFreeSpace) { + if (usagePercent > 0) { + LOG.warn("Temporary Store limit has been set to " + + usagePercent + "% of the partition size but " + + "there is not enough usable space."); + } + + LOG.warn("Temporary Store limit is " + storeLimit / (1024 * 1024) + + " mb, whilst the temporary data directory: " + tmpDir.getAbsolutePath() + + " only has " + dirFreeSpace / (1024 * 1024) + " mb of usable space - resetting to maximum available " + + dirFreeSpace / (1024 * 1024) + " mb."); + usage.getTempUsage().setLimit(dirFreeSpace); +>>>>>>> https://issues.apache.org/jira/browse/AMQ-5965 + } + + if (isPersistent()) { + long maxJournalFileSize; + + PListStore store = usage.getTempUsage().getStore(); + if (store != null && store instanceof JournaledStore) { + maxJournalFileSize = ((JournaledStore) store).getJournalMaxFileLength(); + } else { + maxJournalFileSize = DEFAULT_MAX_FILE_LENGTH; + } + + if (storeLimit < maxJournalFileSize) { + LOG.error("Temporary Store limit is " + storeLimit / (1024 * 1024) + + " mb, whilst the max journal file size for the temporary store is: " + + maxJournalFileSize / (1024 * 1024) + " mb, " + + "the temp store will not accept any data when used."); + } + } + } + } + + /** + * Schedules a periodic task based on schedulePeriodForDiskLimitCheck to + * update store and temporary store limits if the amount of available space + * plus current store size is less than the existin configured limit + */ + protected void scheduleDiskUsageLimitsCheck() throws IOException { + if (schedulePeriodForDiskUsageCheck > 0 && + (getPersistenceAdapter() != null || getTmpDataDirectory() != null)) { + Runnable diskLimitCheckTask = new Runnable() { + @Override + public void run() { + try { + checkStoreUsageLimits(); + } catch (IOException e) { + LOG.error("Failed to check persistent disk usage limits", e); + } + + try { + checkTmpStoreUsageLimits(); + } catch (IOException e) { + LOG.error("Failed to check temporary store usage limits", e); + } + } + }; + scheduler.executePeriodically(diskLimitCheckTask, schedulePeriodForDiskUsageCheck); + } + } + + protected void checkSystemUsageLimits() throws IOException { + final SystemUsage usage = getSystemUsage(); + long memLimit = usage.getMemoryUsage().getLimit(); + long jvmLimit = Runtime.getRuntime().maxMemory(); + + if (memLimit > jvmLimit) { + usage.getMemoryUsage().setPercentOfJvmHeap(70); + LOG.warn("Memory Usage for the Broker (" + memLimit / (1024 * 1024) + + " mb) is more than the maximum available for the JVM: " + + jvmLimit / (1024 * 1024) + " mb - resetting to 70% of maximum available: " + (usage.getMemoryUsage().getLimit() / (1024 * 1024)) + " mb"); + } + + //Check the persistent store and temp store limits if they exist + //and schedule a periodic check to update disk limits if + //schedulePeriodForDiskLimitCheck is set + checkStoreUsageLimits(); + checkTmpStoreUsageLimits(); + scheduleDiskUsageLimitsCheck(); + + if (getJobSchedulerStore() != null) { + JobSchedulerStore scheduler = getJobSchedulerStore(); + File schedulerDir = scheduler.getDirectory(); + if (schedulerDir != null) { + + String schedulerDirPath = schedulerDir.getAbsolutePath(); + if (!schedulerDir.isAbsolute()) { + schedulerDir = new File(schedulerDirPath); + } + + while (schedulerDir != null && !schedulerDir.isDirectory()) { + schedulerDir = schedulerDir.getParentFile(); + } + long schedulerLimit = usage.getJobSchedulerUsage().getLimit(); + long dirFreeSpace = schedulerDir.getUsableSpace(); + if (schedulerLimit > dirFreeSpace) { + LOG.warn("Job Scheduler Store limit is " + schedulerLimit / (1024 * 1024) + + " mb, whilst the data directory: " + schedulerDir.getAbsolutePath() + + " only has " + dirFreeSpace / (1024 * 1024) + " mb of usable space - resetting to " + + dirFreeSpace / (1024 * 1024) + " mb."); + usage.getJobSchedulerUsage().setLimit(dirFreeSpace); + } + } + } + } + + public void stopAllConnectors(ServiceStopper stopper) { + for (Iterator iter = getNetworkConnectors().iterator(); iter.hasNext();) { + NetworkConnector connector = iter.next(); + unregisterNetworkConnectorMBean(connector); + stopper.stop(connector); + } + for (Iterator iter = getProxyConnectors().iterator(); iter.hasNext();) { + ProxyConnector connector = iter.next(); + stopper.stop(connector); + } + for (Iterator iter = jmsConnectors.iterator(); iter.hasNext();) { + JmsConnector connector = iter.next(); + stopper.stop(connector); + } + for (Iterator iter = getTransportConnectors().iterator(); iter.hasNext();) { + TransportConnector connector = iter.next(); + try { + unregisterConnectorMBean(connector); + } catch (IOException e) { + } + stopper.stop(connector); + } + } + + protected TransportConnector registerConnectorMBean(TransportConnector connector) throws IOException { + try { + ObjectName objectName = createConnectorObjectName(connector); + connector = connector.asManagedConnector(getManagementContext(), objectName); + ConnectorViewMBean view = new ConnectorView(connector); + AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); + return connector; + } catch (Throwable e) { + throw IOExceptionSupport.create("Transport Connector could not be registered in JMX: " + e, e); + } + } + + protected void unregisterConnectorMBean(TransportConnector connector) throws IOException { + if (isUseJmx()) { + try { + ObjectName objectName = createConnectorObjectName(connector); + getManagementContext().unregisterMBean(objectName); + } catch (Throwable e) { + throw IOExceptionSupport.create( + "Transport Connector could not be unregistered in JMX: " + e.getMessage(), e); + } + } + } + + protected PersistenceAdapter registerPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException { + return adaptor; + } + + protected void unregisterPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException { + if (isUseJmx()) {} + } + + private ObjectName createConnectorObjectName(TransportConnector connector) throws MalformedObjectNameException { + return BrokerMBeanSupport.createConnectorName(getBrokerObjectName(), "clientConnectors", connector.getName()); + } + + public void registerNetworkConnectorMBean(NetworkConnector connector) throws IOException { + NetworkConnectorViewMBean view = new NetworkConnectorView(connector); + try { + ObjectName objectName = createNetworkConnectorObjectName(connector); + connector.setObjectName(objectName); + AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); + } catch (Throwable e) { + throw IOExceptionSupport.create("Network Connector could not be registered in JMX: " + e.getMessage(), e); + } + } + + protected ObjectName createNetworkConnectorObjectName(NetworkConnector connector) throws MalformedObjectNameException { + return BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "networkConnectors", connector.getName()); + } + + public ObjectName createDuplexNetworkConnectorObjectName(String transport) throws MalformedObjectNameException { + return BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "duplexNetworkConnectors", transport); + } + + protected void unregisterNetworkConnectorMBean(NetworkConnector connector) { + if (isUseJmx()) { + try { + ObjectName objectName = createNetworkConnectorObjectName(connector); + getManagementContext().unregisterMBean(objectName); + } catch (Exception e) { + LOG.warn("Network Connector could not be unregistered from JMX due " + e.getMessage() + ". This exception is ignored.", e); + } + } + } + + protected void registerProxyConnectorMBean(ProxyConnector connector) throws IOException { + ProxyConnectorView view = new ProxyConnectorView(connector); + try { + ObjectName objectName = BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "proxyConnectors", connector.getName()); + AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); + } catch (Throwable e) { + throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e); + } + } + + protected void registerJmsConnectorMBean(JmsConnector connector) throws IOException { + JmsConnectorView view = new JmsConnectorView(connector); + try { + ObjectName objectName = BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "jmsConnectors", connector.getName()); + AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); + } catch (Throwable e) { + throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e); + } + } + + /** + * Factory method to create a new broker + * + * @throws Exception + */ + protected Broker createBroker() throws Exception { + regionBroker = createRegionBroker(); + Broker broker = addInterceptors(regionBroker); + // Add a filter that will stop access to the broker once stopped + broker = new MutableBrokerFilter(broker) { + Broker old; + + @Override + public void stop() throws Exception { + old = this.next.getAndSet(new ErrorBroker("Broker has been stopped: " + this) { + // Just ignore additional stop actions. + @Override + public void stop() throws Exception { + } + }); + old.stop(); + } + + @Override + public void start() throws Exception { + if (forceStart && old != null) { + this.next.set(old); + } + getNext().start(); + } + }; + return broker; + } + + /** + * Factory method to create the core region broker onto which interceptors + * are added + * + * @throws Exception + */ + protected Broker createRegionBroker() throws Exception { + if (destinationInterceptors == null) { + destinationInterceptors = createDefaultDestinationInterceptor(); + } + configureServices(destinationInterceptors); + DestinationInterceptor destinationInterceptor = new CompositeDestinationInterceptor(destinationInterceptors); + if (destinationFactory == null) { + destinationFactory = new DestinationFactoryImpl(this, getTaskRunnerFactory(), getPersistenceAdapter()); + } + return createRegionBroker(destinationInterceptor); + } + + protected Broker createRegionBroker(DestinationInterceptor destinationInterceptor) throws IOException { + RegionBroker regionBroker; + if (isUseJmx()) { + try { + regionBroker = new ManagedRegionBroker(this, getManagementContext(), getBrokerObjectName(), + getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, destinationInterceptor,getScheduler(),getExecutor()); + } catch(MalformedObjectNameException me){ + LOG.warn("Cannot create ManagedRegionBroker due " + me.getMessage(), me); + throw new IOException(me); + } + } else { + regionBroker = new RegionBroker(this, getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, + destinationInterceptor,getScheduler(),getExecutor()); + } + destinationFactory.setRegionBroker(regionBroker); + regionBroker.setKeepDurableSubsActive(keepDurableSubsActive); + regionBroker.setBrokerName(getBrokerName()); + regionBroker.getDestinationStatistics().setEnabled(enableStatistics); + regionBroker.setAllowTempAutoCreationOnSend(isAllowTempAutoCreationOnSend()); + if (brokerId != null) { + regionBroker.setBrokerId(brokerId); + } + return regionBroker; + } + + /** + * Create the default destination interceptor + */ + protected DestinationInterceptor[] createDefaultDestinationInterceptor() { + List answer = new ArrayList(); + if (isUseVirtualTopics()) { + VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor(); + VirtualTopic virtualTopic = new VirtualTopic(); + virtualTopic.setName("VirtualTopic.>"); + VirtualDestination[] virtualDestinations = { virtualTopic }; + interceptor.setVirtualDestinations(virtualDestinations); + answer.add(interceptor); + } + if (isUseMirroredQueues()) { + MirroredQueue interceptor = new MirroredQueue(); + answer.add(interceptor); + } + DestinationInterceptor[] array = new DestinationInterceptor[answer.size()]; + answer.toArray(array); + return array; + } + + /** + * Strategy method to add interceptors to the broker + * + * @throws IOException + */ + protected Broker addInterceptors(Broker broker) throws Exception { + if (isSchedulerSupport()) { + SchedulerBroker sb = new SchedulerBroker(this, broker, getJobSchedulerStore()); + if (isUseJmx()) { + JobSchedulerViewMBean view = new JobSchedulerView(sb.getJobScheduler()); + try { + ObjectName objectName = BrokerMBeanSupport.createJobSchedulerServiceName(getBrokerObjectName()); + AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); + this.adminView.setJMSJobScheduler(objectName); + } catch (Throwable e) { + throw IOExceptionSupport.create("JobScheduler could not be registered in JMX: " + + e.getMessage(), e); + } + } + broker = sb; + } + if (isUseJmx()) { + HealthViewMBean statusView = new HealthView((ManagedRegionBroker)getRegionBroker()); + try { + ObjectName objectName = BrokerMBeanSupport.createHealthServiceName(getBrokerObjectName()); + AnnotatedMBean.registerMBean(getManagementContext(), statusView, objectName); + } catch (Throwable e) { + throw IOExceptionSupport.create("Status MBean could not be registered in JMX: " + + e.getMessage(), e); + } + } + if (isAdvisorySupport()) { + broker = new AdvisoryBroker(broker); + } + broker = new CompositeDestinationBroker(broker); + broker = new TransactionBroker(broker, getPersistenceAdapter().createTransactionStore()); + if (isPopulateJMSXUserID()) { + UserIDBroker userIDBroker = new UserIDBroker(broker); + userIDBroker.setUseAuthenticatePrincipal(isUseAuthenticatedPrincipalForJMSXUserID()); + broker = userIDBroker; + } + if (isMonitorConnectionSplits()) { + broker = new ConnectionSplitBroker(broker); + } + if (plugins != null) { + for (int i = 0; i < plugins.length; i++) { + BrokerPlugin plugin = plugins[i]; + broker = plugin.installPlugin(broker); + } + } + return broker; + } + + protected PersistenceAdapter createPersistenceAdapter() throws IOException { + if (isPersistent()) { + PersistenceAdapterFactory fac = getPersistenceFactory(); + if (fac != null) { + return fac.createPersistenceAdapter(); + } else { + try { + String clazz = "org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter"; + PersistenceAdapter adaptor = (PersistenceAdapter)getClass().getClassLoader().loadClass(clazz).newInstance(); + File dir = new File(getBrokerDataDirectory(),"KahaDB"); + adaptor.setDirectory(dir); + return adaptor; + } catch (Throwable e) { + throw IOExceptionSupport.create(e); + } + } + } else { + return new MemoryPersistenceAdapter(); + } + } + + protected ObjectName createBrokerObjectName() throws MalformedObjectNameException { + return BrokerMBeanSupport.createBrokerObjectName(getManagementContext().getJmxDomainName(), getBrokerName()); + } + + protected TransportConnector createTransportConnector(URI brokerURI) throws Exception { + TransportServer transport = TransportFactorySupport.bind(this, brokerURI); + return new TransportConnector(transport); + } + + /** + * Extracts the port from the options + */ + protected Object getPort(Map options) { + Object port = options.get("port"); + if (port == null) { + port = DEFAULT_PORT; + LOG.warn("No port specified so defaulting to: {}", port); + } + return port; + } + + protected void addShutdownHook() { + if (useShutdownHook) { + shutdownHook = new Thread("ActiveMQ ShutdownHook") { + @Override + public void run() { + containerShutdown(); + } + }; + Runtime.getRuntime().addShutdownHook(shutdownHook); + } + } + + protected void removeShutdownHook() { + if (shutdownHook != null) { + try { + Runtime.getRuntime().removeShutdownHook(shutdownHook); + } catch (Exception e) { + LOG.debug("Caught exception, must be shutting down. This exception is ignored.", e); + } + } + } + + /** + * Sets hooks to be executed when broker shut down + * + * @org.apache.xbean.Property + */ + public void setShutdownHooks(List hooks) throws Exception { + for (Runnable hook : hooks) { + addShutdownHook(hook); + } + } + + /** + * Causes a clean shutdown of the container when the VM is being shut down + */ + protected void containerShutdown() { + try { + stop(); + } catch (IOException e) { + Throwable linkedException = e.getCause(); + if (linkedException != null) { + logError("Failed to shut down: " + e + ". Reason: " + linkedException, linkedException); + } else { + logError("Failed to shut down: " + e, e); + } + if (!useLoggingForShutdownErrors) { + e.printStackTrace(System.err); + } + } catch (Exception e) { + logError("Failed to shut down: " + e, e); + } + } + + protected void logError(String message, Throwable e) { + if (useLoggingForShutdownErrors) { + LOG.error("Failed to shut down: " + e); + } else { + System.err.println("Failed to shut down: " + e); + } + } + + /** + * Starts any configured destinations on startup + */ + protected void startDestinations() throws Exception { + if (destinations != null) { + ConnectionContext adminConnectionContext = getAdminConnectionContext(); + for (int i = 0; i < destinations.length; i++) { + ActiveMQDestination destination = destinations[i]; + getBroker().addDestination(adminConnectionContext, destination,true); + } + } + if (isUseVirtualTopics()) { + startVirtualConsumerDestinations(); + } + } + + /** + * Returns the broker's administration connection context used for + * configuring the broker at startup + */ + public ConnectionContext getAdminConnectionContext() throws Exception { + return BrokerSupport.getConnectionContext(getBroker()); + } + + protected void startManagementContext() throws Exception { + getManagementContext().setBrokerName(brokerName); + getManagementContext().start(); + adminView = new BrokerView(this, null); + ObjectName objectName = getBrokerObjectName(); + AnnotatedMBean.registerMBean(getManagementContext(), adminView, objectName); + } + + /** + * Start all transport and network connections, proxies and bridges + * + * @throws Exception + */ + public void startAllConnectors() throws Exception { + Set durableDestinations = getBroker().getDurableDestinations(); + List al = new ArrayList(); + for (Iterator iter = getTransportConnectors().iterator(); iter.hasNext();) { + TransportConnector connector = iter.next(); + al.add(startTransportConnector(connector)); + } + if (al.size() > 0) { + // let's clear the transportConnectors list and replace it with + // the started transportConnector instances + this.transportConnectors.clear(); + setTransportConnectors(al); + } + this.slave = false; + URI uri = getVmConnectorURI(); + Map map = new HashMap(URISupport.parseParameters(uri)); + map.put("async", "false"); + uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map)); + + if (!stopped.get()) { + ThreadPoolExecutor networkConnectorStartExecutor = null; + if (isNetworkConnectorStartAsync()) { + // spin up as many threads as needed + networkConnectorStartExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, + 10, TimeUnit.SECONDS, new SynchronousQueue(), + new ThreadFactory() { + int count=0; + @Override + public Thread newThread(Runnable runnable) { + Thread thread = new Thread(runnable, "NetworkConnector Start Thread-" +(count++)); + thread.setDaemon(true); + return thread; + } + }); + } + + for (Iterator iter = getNetworkConnectors().iterator(); iter.hasNext();) { + final NetworkConnector connector = iter.next(); + connector.setLocalUri(uri); + connector.setBrokerName(getBrokerName()); + connector.setDurableDestinations(durableDestinations); + if (getDefaultSocketURIString() != null) { + connector.setBrokerURL(getDefaultSocketURIString()); + } + if (networkConnectorStartExecutor != null) { + networkConnectorStartExecutor.execute(new Runnable() { + @Override + public void run() { + try { + LOG.info("Async start of {}", connector); + connector.start(); + } catch(Exception e) { + LOG.error("Async start of network connector: {} failed", connector, e); + } + } + }); + } else { + connector.start(); + } + } + if (networkConnectorStartExecutor != null) { + // executor done when enqueued tasks are complete + ThreadPoolUtils.shutdown(networkConnectorStartExecutor); + } + + for (Iterator iter = getProxyConnectors().iterator(); iter.hasNext();) { + ProxyConnector connector = iter.next(); + connector.start(); + } + for (Iterator iter = jmsConnectors.iterator(); iter.hasNext();) { + JmsConnector connector = iter.next(); + connector.start(); + } + for (Service service : services) { + configureService(service); + service.start(); + } + } + } + + public TransportConnector startTransportConnector(TransportConnector connector) throws Exception { + connector.setBrokerService(this); + connector.setTaskRunnerFactory(getTaskRunnerFactory()); + MessageAuthorizationPolicy policy = getMessageAuthorizationPolicy(); + if (policy != null) { + connector.setMessageAuthorizationPolicy(policy); + } + if (isUseJmx()) { + connector = registerConnectorMBean(connector); + } + connector.getStatistics().setEnabled(enableStatistics); + connector.start(); + return connector; + } + + /** + * Perform any custom dependency injection + */ + protected void configureServices(Object[] services) { + for (Object service : services) { + configureService(service); + } + } + + /** + * Perform any custom dependency injection + */ + protected void configureService(Object service) { + if (service instanceof BrokerServiceAware) { + BrokerServiceAware serviceAware = (BrokerServiceAware) service; + serviceAware.setBrokerService(this); + } + } + + public void handleIOException(IOException exception) { + if (ioExceptionHandler != null) { + ioExceptionHandler.handle(exception); + } else { + LOG.info("No IOExceptionHandler registered, ignoring IO exception", exception); + } + } + + protected void startVirtualConsumerDestinations() throws Exception { + ConnectionContext adminConnectionContext = getAdminConnectionContext(); + Set destinations = destinationFactory.getDestinations(); + DestinationFilter filter = getVirtualTopicConsumerDestinationFilter(); + if (!destinations.isEmpty()) { + for (ActiveMQDestination destination : destinations) { + if (filter.matches(destination) == true) { + broker.addDestination(adminConnectionContext, destination, false); + } + } + } + } + + private DestinationFilter getVirtualTopicConsumerDestinationFilter() { + // created at startup, so no sync needed + if (virtualConsumerDestinationFilter == null) { + Set consumerDestinations = new HashSet(); + if (destinationInterceptors != null) { + for (DestinationInterceptor interceptor : destinationInterceptors) { + if (interceptor instanceof VirtualDestinationInterceptor) { + VirtualDestinationInterceptor virtualDestinationInterceptor = (VirtualDestinationInterceptor) interceptor; + for (VirtualDestination virtualDestination: virtualDestinationInterceptor.getVirtualDestinations()) { + if (virtualDestination instanceof VirtualTopic) { + consumerDestinations.add(new ActiveMQQueue(((VirtualTopic) virtualDestination).getPrefix() + DestinationFilter.ANY_DESCENDENT)); + } + } + } + } + } + ActiveMQQueue filter = new ActiveMQQueue(); + filter.setCompositeDestinations(consumerDestinations.toArray(new ActiveMQDestination[]{})); + virtualConsumerDestinationFilter = DestinationFilter.parseFilter(filter); + } + return virtualConsumerDestinationFilter; + } + + protected synchronized ThreadPoolExecutor getExecutor() { + if (this.executor == null) { + this.executor = new ThreadPoolExecutor(1, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue(), new ThreadFactory() { + + private long i = 0; + + @Override + public Thread newThread(Runnable runnable) { + this.i++; + Thread thread = new Thread(runnable, "ActiveMQ BrokerService.worker." + this.i); + thread.setDaemon(true); + thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { + @Override + public void uncaughtException(final Thread t, final Throwable e) { + LOG.error("Error in thread '{}'", t.getName(), e); + } + }); + return thread; + } + }, new RejectedExecutionHandler() { + @Override + public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) { + try { + executor.getQueue().offer(r, 60, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker"); + } + + throw new RejectedExecutionException("Timed Out while attempting to enqueue Task."); + } + }); + } + return this.executor; + } + + public synchronized Scheduler getScheduler() { + if (this.scheduler==null) { + this.scheduler = new Scheduler("ActiveMQ Broker["+getBrokerName()+"] Scheduler"); + try { + this.scheduler.start(); + } catch (Exception e) { + LOG.error("Failed to start Scheduler", e); + } + } + return this.scheduler; + } + + public Broker getRegionBroker() { + return regionBroker; + } + + public void setRegionBroker(Broker regionBroker) { + this.regionBroker = regionBroker; + } + + public void addShutdownHook(Runnable hook) { + synchronized (shutdownHooks) { + shutdownHooks.add(hook); + } + } + + public void removeShutdownHook(Runnable hook) { + synchronized (shutdownHooks) { + shutdownHooks.remove(hook); + } + } + + public boolean isSystemExitOnShutdown() { + return systemExitOnShutdown; + } + + /** + * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" + */ + public void setSystemExitOnShutdown(boolean systemExitOnShutdown) { + this.systemExitOnShutdown = systemExitOnShutdown; + } + + public int getSystemExitOnShutdownExitCode() { + return systemExitOnShutdownExitCode; + } + + public void setSystemExitOnShutdownExitCode(int systemExitOnShutdownExitCode) { + this.systemExitOnShutdownExitCode = systemExitOnShutdownExitCode; + } + + public SslContext getSslContext() { + return sslContext; + } + + public void setSslContext(SslContext sslContext) { + this.sslContext = sslContext; + } + + public boolean isShutdownOnSlaveFailure() { + return shutdownOnSlaveFailure; + } + + /** + * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" + */ + public void setShutdownOnSlaveFailure(boolean shutdownOnSlaveFailure) { + this.shutdownOnSlaveFailure = shutdownOnSlaveFailure; + } + + public boolean isWaitForSlave() { + return waitForSlave; + } + + /** + * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" + */ + public void setWaitForSlave(boolean waitForSlave) { + this.waitForSlave = waitForSlave; + } + + public long getWaitForSlaveTimeout() { + return this.waitForSlaveTimeout; + } + + public void setWaitForSlaveTimeout(long waitForSlaveTimeout) { + this.waitForSlaveTimeout = waitForSlaveTimeout; + } + + /** + * Get the passiveSlave + * @return the passiveSlave + */ + public boolean isPassiveSlave() { + return this.passiveSlave; + } + + /** + * Set the passiveSlave + * @param passiveSlave the passiveSlave to set + * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" + */ + public void setPassiveSlave(boolean passiveSlave) { + this.passiveSlave = passiveSlave; + } + + /** + * override the Default IOException handler, called when persistence adapter + * has experiences File or JDBC I/O Exceptions + * + * @param ioExceptionHandler + */ + public void setIoExceptionHandler(IOExceptionHandler ioExceptionHandler) { + configureService(ioExceptionHandler); + this.ioExceptionHandler = ioExceptionHandler; + } + + public IOExceptionHandler getIoExceptionHandler() { + return ioExceptionHandler; + } + + /** + * @return the schedulerSupport + */ + public boolean isSchedulerSupport() { + return this.schedulerSupport; + } + + /** + * @param schedulerSupport the schedulerSupport to set + * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" + */ + public void setSchedulerSupport(boolean schedulerSupport) { + this.schedulerSupport = schedulerSupport; + } + + /** + * @return the schedulerDirectory + */ + public File getSchedulerDirectoryFile() { + if (this.schedulerDirectoryFile == null) { + this.schedulerDirectoryFile = new File(getBrokerDataDirectory(), "scheduler"); + } + return schedulerDirectoryFile; + } + + /** + * @param schedulerDirectory the schedulerDirectory to set + */ + public void setSchedulerDirectoryFile(File schedulerDirectory) { + this.schedulerDirectoryFile = schedulerDirectory; + } + + public void setSchedulerDirectory(String schedulerDirectory) { + setSchedulerDirectoryFile(new File(schedulerDirectory)); + } + + public int getSchedulePeriodForDestinationPurge() { + return this.schedulePeriodForDestinationPurge; + } + + public void setSchedulePeriodForDestinationPurge(int schedulePeriodForDestinationPurge) { + this.schedulePeriodForDestinationPurge = schedulePeriodForDestinationPurge; + } + + public void setSchedulePeriodForDiskUsageCheck( + int schedulePeriodForDiskUsageCheck) { + this.schedulePeriodForDiskUsageCheck = schedulePeriodForDiskUsageCheck; + } + + public int getMaxPurgedDestinationsPerSweep() { + return this.maxPurgedDestinationsPerSweep; + } + + public void setMaxPurgedDestinationsPerSweep(int maxPurgedDestinationsPerSweep) { + this.maxPurgedDestinationsPerSweep = maxPurgedDestinationsPerSweep; + } + + public BrokerContext getBrokerContext() { + return brokerContext; + } + + public void setBrokerContext(BrokerContext brokerContext) { + this.brokerContext = brokerContext; + } + + public void setBrokerId(String brokerId) { + this.brokerId = new BrokerId(brokerId); + } + + public boolean isUseAuthenticatedPrincipalForJMSXUserID() { + return useAuthenticatedPrincipalForJMSXUserID; + } + + public void setUseAuthenticatedPrincipalForJMSXUserID(boolean useAuthenticatedPrincipalForJMSXUserID) { + this.useAuthenticatedPrincipalForJMSXUserID = useAuthenticatedPrincipalForJMSXUserID; + } + + /** + * Should MBeans that support showing the Authenticated User Name information have this + * value filled in or not. + * + * @return true if user names should be exposed in MBeans + */ + public boolean isPopulateUserNameInMBeans() { + return this.populateUserNameInMBeans; + } + + /** + * Sets whether Authenticated User Name information is shown in MBeans that support this field. + * @param value if MBeans should expose user name information. + */ + public void setPopulateUserNameInMBeans(boolean value) { + this.populateUserNameInMBeans = value; + } + + /** + * Gets the time in Milliseconds that an invocation of an MBean method will wait before + * failing. The default value is to wait forever (zero). + * + * @return timeout in milliseconds before MBean calls fail, (default is 0 or no timeout). + */ + public long getMbeanInvocationTimeout() { + return mbeanInvocationTimeout; + } + + /** + * Gets the time in Milliseconds that an invocation of an MBean method will wait before + * failing. The default value is to wait forever (zero). + * + * @param mbeanInvocationTimeout + * timeout in milliseconds before MBean calls fail, (default is 0 or no timeout). + */ + public void setMbeanInvocationTimeout(long mbeanInvocationTimeout) { + this.mbeanInvocationTimeout = mbeanInvocationTimeout; + } + + public boolean isNetworkConnectorStartAsync() { + return networkConnectorStartAsync; + } + + public void setNetworkConnectorStartAsync(boolean networkConnectorStartAsync) { + this.networkConnectorStartAsync = networkConnectorStartAsync; + } + + public boolean isAllowTempAutoCreationOnSend() { + return allowTempAutoCreationOnSend; + } + + /** + * enable if temp destinations need to be propagated through a network when + * advisorySupport==false. This is used in conjunction with the policy + * gcInactiveDestinations for matching temps so they can get removed + * when inactive + * + * @param allowTempAutoCreationOnSend + */ + public void setAllowTempAutoCreationOnSend(boolean allowTempAutoCreationOnSend) { + this.allowTempAutoCreationOnSend = allowTempAutoCreationOnSend; + } + + public long getOfflineDurableSubscriberTimeout() { + return offlineDurableSubscriberTimeout; + } + + public void setOfflineDurableSubscriberTimeout(long offlineDurableSubscriberTimeout) { + this.offlineDurableSubscriberTimeout = offlineDurableSubscriberTimeout; + } + + public long getOfflineDurableSubscriberTaskSchedule() { + return offlineDurableSubscriberTaskSchedule; + } + + public void setOfflineDurableSubscriberTaskSchedule(long offlineDurableSubscriberTaskSchedule) { + this.offlineDurableSubscriberTaskSchedule = offlineDurableSubscriberTaskSchedule; + } + + public boolean shouldRecordVirtualDestination(ActiveMQDestination destination) { + return isUseVirtualTopics() && destination.isQueue() && + getVirtualTopicConsumerDestinationFilter().matches(destination); + } + + public Throwable getStartException() { + return startException; + } + + public boolean isStartAsync() { + return startAsync; + } + + public void setStartAsync(boolean startAsync) { + this.startAsync = startAsync; + } + + public boolean isSlave() { + return this.slave; + } + + public boolean isStopping() { + return this.stopping.get(); + } + + /** + * @return true if the broker allowed to restart on shutdown. + */ + public boolean isRestartAllowed() { + return restartAllowed; + } + + /** + * Sets if the broker allowed to restart on shutdown. + */ + public void setRestartAllowed(boolean restartAllowed) { + this.restartAllowed = restartAllowed; + } + + /** + * A lifecycle manager of the BrokerService should + * inspect this property after a broker shutdown has occurred + * to find out if the broker needs to be re-created and started + * again. + * + * @return true if the broker wants to be restarted after it shuts down. + */ + public boolean isRestartRequested() { + return restartRequested; + } + + public void requestRestart() { + this.restartRequested = true; + } + + public int getStoreOpenWireVersion() { + return storeOpenWireVersion; + } + + public void setStoreOpenWireVersion(int storeOpenWireVersion) { + this.storeOpenWireVersion = storeOpenWireVersion; + } + + /** + * @return the current number of connections on this Broker. + */ + public int getCurrentConnections() { + return this.currentConnections.get(); + } + + /** + * @return the total number of connections this broker has handled since startup. + */ + public long getTotalConnections() { + return this.totalConnections.get(); + } + + public void incrementCurrentConnections() { + this.currentConnections.incrementAndGet(); + } + + public void decrementCurrentConnections() { + this.currentConnections.decrementAndGet(); + } + + public void incrementTotalConnections() { + this.totalConnections.incrementAndGet(); + } + + public boolean isRejectDurableConsumers() { + return rejectDurableConsumers; + } + + public void setRejectDurableConsumers(boolean rejectDurableConsumers) { + this.rejectDurableConsumers = rejectDurableConsumers; + } +} diff --git a/activemq-broker/src/main/java/org/apache/activemq/usage/PercentLimitUsage.java b/activemq-broker/src/main/java/org/apache/activemq/usage/PercentLimitUsage.java new file mode 100644 index 0000000000..c08fa00e43 --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/usage/PercentLimitUsage.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.usage; + + +public abstract class PercentLimitUsage extends Usage { + + protected int percentLimit = 0; + + /** + * @param parent + * @param name + * @param portion + */ + public PercentLimitUsage(T parent, String name, float portion) { + super(parent, name, portion); + } + + public void setPercentLimit(int percentLimit) { + usageLock.writeLock().lock(); + try { + this.percentLimit = percentLimit; + updateLimitBasedOnPercent(); + } finally { + usageLock.writeLock().unlock(); + } + } + + public int getPercentLimit() { + usageLock.readLock().lock(); + try { + return percentLimit; + } finally { + usageLock.readLock().unlock(); + } + } + + protected abstract void updateLimitBasedOnPercent(); +} diff --git a/activemq-broker/src/main/java/org/apache/activemq/usage/StoreUsage.java b/activemq-broker/src/main/java/org/apache/activemq/usage/StoreUsage.java index b8083888f0..4646551f89 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/usage/StoreUsage.java +++ b/activemq-broker/src/main/java/org/apache/activemq/usage/StoreUsage.java @@ -16,7 +16,10 @@ */ package org.apache.activemq.usage; +import java.io.File; + import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.util.StoreUtil; /** * Used to keep track of how much of something is being used so that a @@ -26,7 +29,7 @@ import org.apache.activemq.store.PersistenceAdapter; * @org.apache.xbean.XBean * */ -public class StoreUsage extends Usage { +public class StoreUsage extends PercentLimitUsage { private PersistenceAdapter store; @@ -37,11 +40,13 @@ public class StoreUsage extends Usage { public StoreUsage(String name, PersistenceAdapter store) { super(null, name, 1.0f); this.store = store; + updateLimitBasedOnPercent(); } public StoreUsage(StoreUsage parent, String name) { super(parent, name, 1.0f); this.store = parent.store; + updateLimitBasedOnPercent(); } @Override @@ -57,7 +62,12 @@ public class StoreUsage extends Usage { public void setStore(PersistenceAdapter store) { this.store = store; - onLimitChange(); + if (percentLimit > 0 && store != null) { + //will trigger onLimitChange + updateLimitBasedOnPercent(); + } else { + onLimitChange(); + } } @Override @@ -81,4 +91,21 @@ public class StoreUsage extends Usage { return super.waitForSpace(timeout, highWaterMark); } + + @Override + protected void updateLimitBasedOnPercent() { + usageLock.writeLock().lock(); + try { + + if (percentLimit > 0 && store != null) { + File dir = StoreUtil.findParentDirectory(store.getDirectory()); + + if (dir != null) { + this.setLimit(dir.getTotalSpace() * percentLimit / 100); + } + } + } finally { + usageLock.writeLock().unlock(); + } + } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/usage/TempUsage.java b/activemq-broker/src/main/java/org/apache/activemq/usage/TempUsage.java index 7ce91b391b..f068dbe854 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/usage/TempUsage.java +++ b/activemq-broker/src/main/java/org/apache/activemq/usage/TempUsage.java @@ -17,17 +17,20 @@ package org.apache.activemq.usage; +import java.io.File; + import org.apache.activemq.store.PListStore; +import org.apache.activemq.util.StoreUtil; /** * Used to keep track of how much of something is being used so that a * productive working set usage can be controlled. Main use case is manage * memory usage. - * + * * @org.apache.xbean.XBean - * + * */ -public class TempUsage extends Usage { +public class TempUsage extends PercentLimitUsage { private PListStore store; @@ -38,11 +41,13 @@ public class TempUsage extends Usage { public TempUsage(String name, PListStore store) { super(null, name, 1.0f); this.store = store; + updateLimitBasedOnPercent(); } public TempUsage(TempUsage parent, String name) { super(parent, name, 1.0f); this.store = parent.store; + updateLimitBasedOnPercent(); } @Override @@ -59,6 +64,27 @@ public class TempUsage extends Usage { public void setStore(PListStore store) { this.store = store; - onLimitChange(); + if (percentLimit > 0 && store != null) { + //will trigger onLimitChange + updateLimitBasedOnPercent(); + } else { + onLimitChange(); + } + } + + @Override + protected void updateLimitBasedOnPercent() { + usageLock.writeLock().lock(); + try { + if (percentLimit > 0 && store != null) { + File dir = StoreUtil.findParentDirectory(store.getDirectory()); + + if (dir != null) { + this.setLimit(dir.getTotalSpace() * percentLimit / 100); + } + } + } finally { + usageLock.writeLock().unlock(); + } } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/util/StoreUtil.java b/activemq-broker/src/main/java/org/apache/activemq/util/StoreUtil.java new file mode 100644 index 0000000000..77aafa044f --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/util/StoreUtil.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.util; + +import java.io.File; + +public class StoreUtil { + + /** + * Utility method to help find the root directory of the store + * + * @param dir + * @return + */ + public static File findParentDirectory(File dir) { + if (dir != null) { + String dirPath = dir.getAbsolutePath(); + if (!dir.isAbsolute()) { + dir = new File(dirPath); + } + + while (dir != null && !dir.isDirectory()) { + dir = dir.getParentFile(); + } + } + return dir; + } +} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usage/PercentDiskUsageLimitTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usage/PercentDiskUsageLimitTest.java new file mode 100644 index 0000000000..5eb830cf5f --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usage/PercentDiskUsageLimitTest.java @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.usage; + +import static org.junit.Assert.assertEquals; + +import java.io.File; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.util.StoreUtil; +import org.apache.commons.io.FileUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + * This test is for AMQ-5393 and will check that schedulePeriodForDiskLimitCheck + * properly schedules a task that will update disk limits if the amount of usable disk space drops + * because another process uses up disk space. + * + */ +public class PercentDiskUsageLimitTest { + protected static final Logger LOG = LoggerFactory + .getLogger(PercentDiskUsageLimitTest.class); + + @Rule + public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target")); + + private BrokerService broker; + private PersistenceAdapter adapter; + private TempUsage tempUsage; + private StoreUsage storeUsage; + private File storeDir; + + @Before + public void setUpBroker() throws Exception { + broker = new BrokerService(); + broker.setPersistent(true); + broker.setDataDirectoryFile(dataFileDir.getRoot()); + broker.setDeleteAllMessagesOnStartup(true); + adapter = broker.getPersistenceAdapter(); + + FileUtils.forceMkdir(adapter.getDirectory()); + FileUtils.forceMkdir(broker.getTempDataStore().getDirectory()); + storeDir = StoreUtil.findParentDirectory(adapter.getDirectory()); + + final SystemUsage systemUsage = broker.getSystemUsage(); + tempUsage = systemUsage.getTempUsage(); + storeUsage = systemUsage.getStoreUsage(); + } + + protected void startBroker() throws Exception { + broker.start(); + broker.waitUntilStarted(); + } + + @After + public void stopBroker() throws Exception { + broker.stop(); + broker.waitUntilStopped(); + } + + + /** + * + */ + @Test(timeout=30000) + public void testDiskLimit() throws Exception { + int freePercent = getFreePercentage(); + + if (freePercent >= 2) { + int maxUsage = freePercent / 2; + + //Set max usage to less than free space so we know that all space can be allocated + storeUsage.setPercentLimit(maxUsage); + tempUsage.setPercentLimit(maxUsage); + startBroker(); + + long diskLimit = broker.getSystemUsage().getStoreUsage().getLimit(); + + //assert the disk limit is the same as the max usage percent * total available space + //within 1 mb + assertEquals(diskLimit, storeDir.getTotalSpace() * maxUsage / 100, 1000000); + } + } + + @Test(timeout=30000) + public void testDiskLimitOverMaxFree() throws Exception { + int freePercent = getFreePercentage(); + + if (freePercent > 1) { + storeUsage.setPercentLimit(freePercent + 1); + startBroker(); + + long diskLimit = broker.getSystemUsage().getStoreUsage().getLimit(); + + //assert the disk limit is the same as the usable space + //within 1 mb + assertEquals(diskLimit, storeDir.getUsableSpace(), 1000000); + } + } + + @Test(timeout=30000) + public void testDiskLimitOver100Percent() throws Exception { + int freePercent = getFreePercentage(); + + if (freePercent > 1) { + storeUsage.setPercentLimit(110); + startBroker(); + + long diskLimit = broker.getSystemUsage().getStoreUsage().getLimit(); + + //assert the disk limit is the same as the available space + //within 1 mb + assertEquals(diskLimit, storeDir.getUsableSpace(), 1000000); + } + } + + protected int getFreePercentage() { + File storeDir = StoreUtil.findParentDirectory(adapter.getDirectory()); + return (int) (((double)storeDir.getUsableSpace() / storeDir.getTotalSpace()) * 100); + } + +} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usage/PeriodicDiskUsageLimitTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usage/PeriodicDiskUsageLimitTest.java index bd3687b6f0..48c6c4b96d 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usage/PeriodicDiskUsageLimitTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usage/PeriodicDiskUsageLimitTest.java @@ -22,19 +22,21 @@ import static org.junit.Assert.assertTrue; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.net.URI; import java.util.Random; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.store.PersistenceAdapter; -import org.apache.activemq.usage.StoreUsage; -import org.apache.activemq.usage.SystemUsage; -import org.apache.activemq.usage.TempUsage; +import org.apache.activemq.util.StoreUtil; import org.apache.activemq.util.Wait; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; import org.junit.After; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,21 +51,31 @@ public class PeriodicDiskUsageLimitTest { protected static final Logger LOG = LoggerFactory .getLogger(PeriodicDiskUsageLimitTest.class); - File dataFileDir = new File("target/test-amq-5393/datadb"); - File testfile = new File("target/test-amq-5393/testfile"); + @Rule + public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target")); + File testfile; private BrokerService broker; private PersistenceAdapter adapter; private TempUsage tempUsage; private StoreUsage storeUsage; + protected URI brokerConnectURI; @Before public void setUpBroker() throws Exception { broker = new BrokerService(); broker.setPersistent(true); - broker.setDataDirectoryFile(dataFileDir); + testfile = dataFileDir.newFile(); + broker.setDataDirectoryFile(dataFileDir.getRoot()); broker.setDeleteAllMessagesOnStartup(true); adapter = broker.getPersistenceAdapter(); + TransportConnector connector = broker + .addConnector(new TransportConnector()); + connector.setUri(new URI("tcp://0.0.0.0:8000")); + connector.setName("tcp"); + + brokerConnectURI = broker.getConnectorByName("tcp").getConnectUri(); + FileUtils.deleteQuietly(testfile); FileUtils.forceMkdir(adapter.getDirectory()); FileUtils.forceMkdir(broker.getTempDataStore().getDirectory()); @@ -82,8 +94,6 @@ public class PeriodicDiskUsageLimitTest { public void stopBroker() throws Exception { broker.stop(); broker.waitUntilStopped(); - FileUtils.deleteQuietly(testfile); - FileUtils.deleteQuietly(dataFileDir); } /** @@ -102,8 +112,8 @@ public class PeriodicDiskUsageLimitTest { final long originalDisk = broker.getSystemUsage().getStoreUsage().getLimit(); final long originalTmp = broker.getSystemUsage().getTempUsage().getLimit(); - //write a 1 meg file to the file system - writeTestFile(1024 * 1024); + //write a 2 meg file to the file system + writeTestFile(2 * 1024 * 1024); //Assert that the usage limits have been decreased because some free space was used //up by a file @@ -122,6 +132,67 @@ public class PeriodicDiskUsageLimitTest { })); } + /** + * This test will show that if a file is written to take away free space, and + * if the usage limit is now less than the store size plus remaining free space, then + * the usage limits will adjust lower. Then test that size regrows when file is deleted. + */ + @Test(timeout=30000) + public void testDiskUsageAdjustLowerAndHigherUsingPercent() throws Exception { + //set the limit to max space so that if a file is added to eat up free space then + //the broker should adjust the usage limit..add 5% above free space + tempUsage.setPercentLimit(getFreePercentage(broker.getTempDataStore().getDirectory()) + 5); + storeUsage.setPercentLimit(getFreePercentage(adapter.getDirectory()) + 5); + + //set threshold to 1 megabyte + broker.setDiskUsageCheckRegrowThreshold(1024 * 1024); + broker.setSchedulePeriodForDiskUsageCheck(4000); + startBroker(); + + final long originalDisk = broker.getSystemUsage().getStoreUsage().getLimit(); + final long originalTmp = broker.getSystemUsage().getTempUsage().getLimit(); + + //write a 2 meg file to the file system + writeTestFile(2 * 1024 * 1024); + + //Assert that the usage limits have been decreased because some free space was used + //up by a file + assertTrue("Store Usage should ramp down.", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return broker.getSystemUsage().getStoreUsage().getLimit() < originalDisk; + } + })); + + assertTrue("Temp Usage should ramp down.", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return broker.getSystemUsage().getTempUsage().getLimit() < originalTmp; + } + })); + + //get the limits and then delete the test file to free up space + final long storeLimit = broker.getSystemUsage().getStoreUsage().getLimit(); + final long tmpLimit = broker.getSystemUsage().getTempUsage().getLimit(); + FileUtils.deleteQuietly(testfile); + + //regrow + assertTrue("Store Usage should ramp up.", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return broker.getSystemUsage().getStoreUsage().getLimit() > storeLimit; + } + })); + + //regrow + assertTrue("Temp Usage should ramp up.", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return broker.getSystemUsage().getTempUsage().getLimit() > tmpLimit; + } + })); + } + /** * This test shows that the usage limits will not change if the * schedulePeriodForDiskLimitCheck property is not set because no task will run @@ -143,6 +214,28 @@ public class PeriodicDiskUsageLimitTest { assertEquals(originalTmp, broker.getSystemUsage().getTempUsage().getLimit()); } + /** + * This test shows that the usage limits will not change if the + * schedulePeriodForDiskLimitCheck property is not set because no task will run + */ + @Test(timeout=30000) + public void testDiskLimitCheckNotSetUsingPercent() throws Exception { + tempUsage.setPercentLimit(getFreePercentage(broker.getTempDataStore().getDirectory()) + 5); + storeUsage.setPercentLimit(getFreePercentage(adapter.getDirectory()) + 5); + startBroker(); + + long originalDisk = broker.getSystemUsage().getStoreUsage().getLimit(); + long originalTmp = broker.getSystemUsage().getTempUsage().getLimit(); + + //write a 2 meg file to the file system + writeTestFile(2 * 1024 * 1024); + Thread.sleep(3000); + + //assert that the usage limits have not changed because a task should not have run + assertEquals(originalDisk, broker.getSystemUsage().getStoreUsage().getLimit()); + assertEquals(originalTmp, broker.getSystemUsage().getTempUsage().getLimit()); + } + /** * This test will show that if a file is written to take away free space, but * if the limit is greater than the store size and the remaining free space, then @@ -169,6 +262,38 @@ public class PeriodicDiskUsageLimitTest { assertEquals(originalTmp, broker.getSystemUsage().getTempUsage().getLimit()); } + /** + * This test will show that if a file is written to take away free space, but + * if the limit is greater than the store size and the remaining free space, then + * the usage limits will not adjust. + */ + @Test(timeout=30000) + public void testDiskUsageStaySameUsingPercent() throws Exception { + //set a limit lower than max available space and set the period to 5 seconds + //only run if at least 4 percent disk space free + int tempFreePercent = getFreePercentage(broker.getTempDataStore().getDirectory()); + int freePercent = getFreePercentage(adapter.getDirectory()); + if (freePercent >= 4 && tempFreePercent >= 4) { + tempUsage.setPercentLimit(freePercent / 2); + storeUsage.setPercentLimit(tempFreePercent / 2); + + broker.setSchedulePeriodForDiskUsageCheck(2000); + startBroker(); + + long originalDisk = broker.getSystemUsage().getStoreUsage().getLimit(); + long originalTmp = broker.getSystemUsage().getTempUsage().getLimit(); + + //write a 2 meg file to the file system + writeTestFile(2 * 1024 * 1024); + Thread.sleep(5000); + + //Assert that the usage limits have not changed because writing a 2 meg file + //did not decrease the the free space below the already set limit + assertEquals(originalDisk, broker.getSystemUsage().getStoreUsage().getLimit()); + assertEquals(originalTmp, broker.getSystemUsage().getTempUsage().getLimit()); + } + } + protected void setLimitMaxSpace() { //Configure store limits to be the max usable space on startup tempUsage.setLimit(broker.getTempDataStore().getDirectory().getUsableSpace()); @@ -179,7 +304,14 @@ public class PeriodicDiskUsageLimitTest { final byte[] data = new byte[size]; final Random rng = new Random(); rng.nextBytes(data); - IOUtils.write(data, new FileOutputStream(testfile)); + try(FileOutputStream stream = new FileOutputStream(testfile)) { + IOUtils.write(data, stream); + } + } + + protected int getFreePercentage(File directory) { + File storeDir = StoreUtil.findParentDirectory(directory); + return (int) (((double)storeDir.getUsableSpace() / storeDir.getTotalSpace()) * 100); } }