From 3fe77a2cf8de036cdfcdf4c0fc6889e71265ac88 Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Mon, 4 Sep 2006 15:40:53 +0000 Subject: [PATCH] Use a different TaskFactory for PersistenceAdaptors - allowing the thread priority to be set differently for PersistenceAdaptors (e.g. Journal) than normal tasks. This is part of the work necessary for http://issues.apache.org/activemq/browse/AMQ-845 git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@440108 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/activemq/broker/BrokerService.java | 31 ++++++++++++++++++- .../DefaultPersistenceAdapterFactory.java | 13 +++++++- 2 files changed, 42 insertions(+), 2 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java index 20c45b4b40..bc5e1202d5 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -109,6 +109,7 @@ public class BrokerService implements Service, Serializable { private ManagementContext managementContext; private ObjectName brokerObjectName; private TaskRunnerFactory taskRunnerFactory; + private TaskRunnerFactory persistenceTaskRunnerFactory; private UsageManager memoryManager; private PersistenceAdapter persistenceAdapter; private PersistenceAdapterFactory persistenceFactory; @@ -139,6 +140,8 @@ public class BrokerService implements Service, Serializable { private DestinationInterceptor[] destinationInterceptors; private ActiveMQDestination[] destinations; private Store tempDataStore; + private int persistenceThreadPriority = Thread.MAX_PRIORITY; + /** * Adds a new transport connector for the given bind address @@ -617,6 +620,19 @@ public class BrokerService implements Service, Serializable { public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) { this.taskRunnerFactory = taskRunnerFactory; } + + + public TaskRunnerFactory getPersistenceTaskRunnerFactory(){ + if (taskRunnerFactory == null) { + persistenceTaskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", persistenceThreadPriority, true, 1000); + } + return persistenceTaskRunnerFactory; + } + + + public void setPersistenceTaskRunnerFactory(TaskRunnerFactory persistenceTaskRunnerFactory){ + this.persistenceTaskRunnerFactory=persistenceTaskRunnerFactory; + } public boolean isUseJmx() { return useJmx; @@ -947,6 +963,14 @@ public class BrokerService implements Service, Serializable { public void setTempDataStore(Store tempDataStore){ this.tempDataStore=tempDataStore; } + + public int getPersistenceThreadPriority(){ + return persistenceThreadPriority; + } + + public void setPersistenceThreadPriority(int persistenceThreadPriority){ + this.persistenceThreadPriority=persistenceThreadPriority; + } // Implementation methods // ------------------------------------------------------------------------- @@ -1223,7 +1247,7 @@ public class BrokerService implements Service, Serializable { protected DefaultPersistenceAdapterFactory createPersistenceFactory() { DefaultPersistenceAdapterFactory factory = new DefaultPersistenceAdapterFactory(); factory.setDataDirectoryFile(getDataDirectory()); - factory.setTaskRunnerFactory(getTaskRunnerFactory()); + factory.setTaskRunnerFactory(getPersistenceTaskRunnerFactory()); return factory; } @@ -1428,5 +1452,10 @@ public class BrokerService implements Service, Serializable { } } + + + + + } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/DefaultPersistenceAdapterFactory.java b/activemq-core/src/main/java/org/apache/activemq/store/DefaultPersistenceAdapterFactory.java index 9cbacf72e2..fe1ad24c2c 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/DefaultPersistenceAdapterFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/DefaultPersistenceAdapterFactory.java @@ -53,6 +53,7 @@ public class DefaultPersistenceAdapterFactory extends DataSourceSupport implemen private boolean useQuickJournal=false; private File journalArchiveDirectory; private boolean failIfJournalIsLocked=false; + private int journalThreadPriority = Thread.MAX_PRIORITY; private JDBCPersistenceAdapter jdbcPersistenceAdapter = new JDBCPersistenceAdapter(); public PersistenceAdapter createPersistenceAdapter() throws IOException { @@ -107,7 +108,7 @@ public class DefaultPersistenceAdapterFactory extends DataSourceSupport implemen public TaskRunnerFactory getTaskRunnerFactory() { if( taskRunnerFactory == null ) { - taskRunnerFactory = new TaskRunnerFactory(); + taskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", journalThreadPriority, true, 1000); } return taskRunnerFactory; } @@ -177,6 +178,14 @@ public class DefaultPersistenceAdapterFactory extends DataSourceSupport implemen public void setCreateTablesOnStartup(boolean createTablesOnStartup) { jdbcPersistenceAdapter.setCreateTablesOnStartup(createTablesOnStartup); } + + public int getJournalThreadPriority(){ + return journalThreadPriority; + } + + public void setJournalThreadPriority(int journalThreadPriority){ + this.journalThreadPriority=journalThreadPriority; + } /** * @throws IOException @@ -201,4 +210,6 @@ public class DefaultPersistenceAdapterFactory extends DataSourceSupport implemen } } + + }