mirror of https://github.com/apache/activemq.git
an implementation of an exclusive lock in SQL to ensure that only one JDBC message store is run against a database at once to fix AMQ-831. For documentation on this feature see: http://activemq.org/site/jdbc-master-slave.html
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@424328 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
e2f0ef3c5e
commit
90d0c4b38b
|
@ -558,6 +558,7 @@ public class BrokerService implements Service, Serializable {
|
||||||
public PersistenceAdapter getPersistenceAdapter() throws IOException {
|
public PersistenceAdapter getPersistenceAdapter() throws IOException {
|
||||||
if (persistenceAdapter == null) {
|
if (persistenceAdapter == null) {
|
||||||
persistenceAdapter = createPersistenceAdapter();
|
persistenceAdapter = createPersistenceAdapter();
|
||||||
|
configureService(persistenceAdapter);
|
||||||
}
|
}
|
||||||
return persistenceAdapter;
|
return persistenceAdapter;
|
||||||
}
|
}
|
||||||
|
@ -771,6 +772,54 @@ public class BrokerService implements Service, Serializable {
|
||||||
getPersistenceAdapter().deleteAllMessages();
|
getPersistenceAdapter().deleteAllMessages();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isDeleteAllMessagesOnStartup() {
|
||||||
|
return deleteAllMessagesOnStartup;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets whether or not all messages are deleted on startup - mostly only
|
||||||
|
* useful for testing.
|
||||||
|
*/
|
||||||
|
public void setDeleteAllMessagesOnStartup(boolean deletePersistentMessagesOnStartup) {
|
||||||
|
this.deleteAllMessagesOnStartup = deletePersistentMessagesOnStartup;
|
||||||
|
}
|
||||||
|
|
||||||
|
public URI getVmConnectorURI() {
|
||||||
|
if (vmConnectorURI == null) {
|
||||||
|
try {
|
||||||
|
vmConnectorURI = new URI("vm://" + getBrokerName());
|
||||||
|
}
|
||||||
|
catch (URISyntaxException e) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return vmConnectorURI;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setVmConnectorURI(URI vmConnectorURI) {
|
||||||
|
this.vmConnectorURI = vmConnectorURI;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return Returns the shutdownOnMasterFailure.
|
||||||
|
*/
|
||||||
|
public boolean isShutdownOnMasterFailure(){
|
||||||
|
return shutdownOnMasterFailure;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param shutdownOnMasterFailure The shutdownOnMasterFailure to set.
|
||||||
|
*/
|
||||||
|
public void setShutdownOnMasterFailure(boolean shutdownOnMasterFailure){
|
||||||
|
this.shutdownOnMasterFailure=shutdownOnMasterFailure;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isKeepDurableSubsActive() {
|
||||||
|
return keepDurableSubsActive;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
|
||||||
|
this.keepDurableSubsActive = keepDurableSubsActive;
|
||||||
|
}
|
||||||
// Implementation methods
|
// Implementation methods
|
||||||
// -------------------------------------------------------------------------
|
// -------------------------------------------------------------------------
|
||||||
/**
|
/**
|
||||||
|
@ -1132,52 +1181,13 @@ public class BrokerService implements Service, Serializable {
|
||||||
connector.start();
|
connector.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isDeleteAllMessagesOnStartup() {
|
|
||||||
return deleteAllMessagesOnStartup;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sets whether or not all messages are deleted on startup - mostly only
|
* Perform any custom dependency injection
|
||||||
* useful for testing.
|
|
||||||
*/
|
*/
|
||||||
public void setDeleteAllMessagesOnStartup(boolean deletePersistentMessagesOnStartup) {
|
protected void configureService(Object service) {
|
||||||
this.deleteAllMessagesOnStartup = deletePersistentMessagesOnStartup;
|
if (service instanceof BrokerServiceAware) {
|
||||||
}
|
BrokerServiceAware serviceAware = (BrokerServiceAware) service;
|
||||||
|
serviceAware.setBrokerService(this);
|
||||||
public URI getVmConnectorURI() {
|
|
||||||
if (vmConnectorURI == null) {
|
|
||||||
try {
|
|
||||||
vmConnectorURI = new URI("vm://" + getBrokerName());
|
|
||||||
}
|
|
||||||
catch (URISyntaxException e) {
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return vmConnectorURI;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setVmConnectorURI(URI vmConnectorURI) {
|
|
||||||
this.vmConnectorURI = vmConnectorURI;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @return Returns the shutdownOnMasterFailure.
|
|
||||||
*/
|
|
||||||
public boolean isShutdownOnMasterFailure(){
|
|
||||||
return shutdownOnMasterFailure;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @param shutdownOnMasterFailure The shutdownOnMasterFailure to set.
|
|
||||||
*/
|
|
||||||
public void setShutdownOnMasterFailure(boolean shutdownOnMasterFailure){
|
|
||||||
this.shutdownOnMasterFailure=shutdownOnMasterFailure;
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean isKeepDurableSubsActive() {
|
|
||||||
return keepDurableSubsActive;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
|
|
||||||
this.keepDurableSubsActive = keepDurableSubsActive;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,27 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2005-2006 The Apache Software Foundation.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.broker;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An interface used to represent a component that wants the {@link BrokerService}
|
||||||
|
* to be injected
|
||||||
|
*
|
||||||
|
* @version $Revision: $
|
||||||
|
*/
|
||||||
|
public interface BrokerServiceAware {
|
||||||
|
|
||||||
|
public void setBrokerService(BrokerService brokerService);
|
||||||
|
}
|
|
@ -0,0 +1,33 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2005-2006 The Apache Software Foundation.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.store.jdbc;
|
||||||
|
|
||||||
|
import org.apache.activemq.Service;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Represents some kind of lock service to ensure that a broker is the only master
|
||||||
|
*
|
||||||
|
* @version $Revision: $
|
||||||
|
*/
|
||||||
|
public interface DatabaseLocker extends Service {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Used by a timer to keep alive the lock.
|
||||||
|
* If the method returns false the broker should be terminated
|
||||||
|
*/
|
||||||
|
public boolean keepAlive();
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,92 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2005-2006 The Apache Software Foundation.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.store.jdbc;
|
||||||
|
|
||||||
|
import org.apache.activemq.Service;
|
||||||
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
|
import javax.sql.DataSource;
|
||||||
|
|
||||||
|
import java.sql.Connection;
|
||||||
|
import java.sql.PreparedStatement;
|
||||||
|
import java.sql.SQLException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Represents an exclusive lock on a database to avoid multiple brokers
|
||||||
|
* running against the same logical database.
|
||||||
|
*
|
||||||
|
* @version $Revision: $
|
||||||
|
*/
|
||||||
|
public class DefaultDatabaseLocker implements DatabaseLocker {
|
||||||
|
private static final Log log = LogFactory.getLog(DefaultDatabaseLocker.class);
|
||||||
|
|
||||||
|
private final DataSource dataSource;
|
||||||
|
private final Statements statements;
|
||||||
|
private long sleepTime = 1000;
|
||||||
|
private Connection connection;
|
||||||
|
|
||||||
|
public DefaultDatabaseLocker(DataSource dataSource, Statements statements) {
|
||||||
|
this.dataSource = dataSource;
|
||||||
|
this.statements = statements;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void start() throws Exception {
|
||||||
|
log.debug("Attempting to acquire exclusive lock on the database");
|
||||||
|
|
||||||
|
connection = dataSource.getConnection();
|
||||||
|
connection.setAutoCommit(false);
|
||||||
|
|
||||||
|
PreparedStatement statement = connection.prepareStatement(statements.getLockCreateStatement());
|
||||||
|
while (true) {
|
||||||
|
try {
|
||||||
|
boolean answer = statement.execute();
|
||||||
|
if (answer) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
log.error("Failed to acquire lock: " + e, e);
|
||||||
|
}
|
||||||
|
log.info("Sleeping for " + sleepTime + " milli(s) before trying again to get the lock...");
|
||||||
|
Thread.sleep(sleepTime);
|
||||||
|
}
|
||||||
|
|
||||||
|
log.info("Becoming the master on dataSource: " + dataSource);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void stop() throws Exception {
|
||||||
|
if (connection != null) {
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean keepAlive() {
|
||||||
|
try {
|
||||||
|
PreparedStatement statement = connection.prepareStatement(statements.getLockUpdateStatement());
|
||||||
|
statement.setLong(1, System.currentTimeMillis());
|
||||||
|
int rows = statement.executeUpdate();
|
||||||
|
if (rows == 1) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
log.error("Failed to update database lock: " + e, e);
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
|
@ -16,15 +16,15 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.store.jdbc;
|
package org.apache.activemq.store.jdbc;
|
||||||
|
|
||||||
import java.io.IOException;
|
import edu.emory.mathcs.backport.java.util.concurrent.ScheduledFuture;
|
||||||
import java.sql.SQLException;
|
import edu.emory.mathcs.backport.java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||||
import java.util.Collections;
|
import edu.emory.mathcs.backport.java.util.concurrent.ThreadFactory;
|
||||||
import java.util.Set;
|
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import javax.sql.DataSource;
|
|
||||||
|
|
||||||
import org.apache.activeio.command.WireFormat;
|
import org.apache.activeio.command.WireFormat;
|
||||||
import org.apache.activeio.util.FactoryFinder;
|
import org.apache.activeio.util.FactoryFinder;
|
||||||
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.apache.activemq.broker.BrokerServiceAware;
|
||||||
import org.apache.activemq.broker.ConnectionContext;
|
import org.apache.activemq.broker.ConnectionContext;
|
||||||
import org.apache.activemq.command.ActiveMQQueue;
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
import org.apache.activemq.command.ActiveMQTopic;
|
import org.apache.activemq.command.ActiveMQTopic;
|
||||||
|
@ -40,10 +40,12 @@ import org.apache.activemq.util.IOExceptionSupport;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
import edu.emory.mathcs.backport.java.util.concurrent.ScheduledFuture;
|
import javax.sql.DataSource;
|
||||||
import edu.emory.mathcs.backport.java.util.concurrent.ScheduledThreadPoolExecutor;
|
|
||||||
import edu.emory.mathcs.backport.java.util.concurrent.ThreadFactory;
|
import java.io.IOException;
|
||||||
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
|
import java.sql.SQLException;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A {@link PersistenceAdapter} implementation using JDBC for persistence
|
* A {@link PersistenceAdapter} implementation using JDBC for persistence
|
||||||
|
@ -57,19 +59,23 @@ import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
|
||||||
*
|
*
|
||||||
* @version $Revision: 1.9 $
|
* @version $Revision: 1.9 $
|
||||||
*/
|
*/
|
||||||
public class JDBCPersistenceAdapter extends DataSourceSupport implements PersistenceAdapter {
|
public class JDBCPersistenceAdapter extends DataSourceSupport implements PersistenceAdapter, BrokerServiceAware {
|
||||||
|
|
||||||
private static final Log log = LogFactory.getLog(JDBCPersistenceAdapter.class);
|
private static final Log log = LogFactory.getLog(JDBCPersistenceAdapter.class);
|
||||||
private static FactoryFinder factoryFinder = new FactoryFinder("META-INF/services/org/apache/activemq/store/jdbc/");
|
private static FactoryFinder factoryFinder = new FactoryFinder("META-INF/services/org/apache/activemq/store/jdbc/");
|
||||||
|
|
||||||
private WireFormat wireFormat = new OpenWireFormat();
|
private WireFormat wireFormat = new OpenWireFormat();
|
||||||
|
private BrokerService brokerService;
|
||||||
private Statements statements;
|
private Statements statements;
|
||||||
private JDBCAdapter adapter;
|
private JDBCAdapter adapter;
|
||||||
private MemoryTransactionStore transactionStore;
|
private MemoryTransactionStore transactionStore;
|
||||||
private ScheduledThreadPoolExecutor clockDaemon;
|
private ScheduledThreadPoolExecutor clockDaemon;
|
||||||
private ScheduledFuture clockTicket;
|
private ScheduledFuture clockTicket;
|
||||||
int cleanupPeriod = 1000 * 60 * 5;
|
private int cleanupPeriod = 1000 * 60 * 5;
|
||||||
private boolean useExternalMessageReferences;
|
private boolean useExternalMessageReferences;
|
||||||
|
private boolean useDatabaseLock = true;
|
||||||
|
private int lockKeepAlivePeriod = 0;
|
||||||
|
private DatabaseLocker databaseLocker;
|
||||||
|
|
||||||
public JDBCPersistenceAdapter() {
|
public JDBCPersistenceAdapter() {
|
||||||
}
|
}
|
||||||
|
@ -156,6 +162,16 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
|
||||||
} finally {
|
} finally {
|
||||||
transactionContext.commit();
|
transactionContext.commit();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (isUseDatabaseLock()) {
|
||||||
|
DatabaseLocker service = getDatabaseLocker();
|
||||||
|
if (service == null) {
|
||||||
|
log.warn("No databaseLocker configured for the JDBC Persistence Adapter");
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
service.start();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
cleanup();
|
cleanup();
|
||||||
|
|
||||||
|
@ -175,6 +191,10 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
|
||||||
clockTicket = null;
|
clockTicket = null;
|
||||||
clockDaemon.shutdown();
|
clockDaemon.shutdown();
|
||||||
}
|
}
|
||||||
|
DatabaseLocker service = getDatabaseLocker();
|
||||||
|
if (service != null) {
|
||||||
|
service.stop();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void cleanup() {
|
public void cleanup() {
|
||||||
|
@ -227,6 +247,36 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
|
||||||
return adapter;
|
return adapter;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public DatabaseLocker getDatabaseLocker() throws IOException {
|
||||||
|
if (databaseLocker == null) {
|
||||||
|
databaseLocker = createDatabaseLocker();
|
||||||
|
if (lockKeepAlivePeriod > 0) {
|
||||||
|
getScheduledThreadPoolExecutor().scheduleAtFixedRate(new Runnable() {
|
||||||
|
public void run() {
|
||||||
|
databaseLockKeepAlive();
|
||||||
|
}
|
||||||
|
}, lockKeepAlivePeriod, lockKeepAlivePeriod, TimeUnit.MILLISECONDS);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return databaseLocker;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the database locker strategy to use to lock the database on startup
|
||||||
|
*/
|
||||||
|
public void setDatabaseLocker(DatabaseLocker databaseLocker) {
|
||||||
|
this.databaseLocker = databaseLocker;
|
||||||
|
}
|
||||||
|
|
||||||
|
public BrokerService getBrokerService() {
|
||||||
|
return brokerService;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setBrokerService(BrokerService brokerService) {
|
||||||
|
this.brokerService = brokerService;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
|
@ -342,6 +392,15 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
|
||||||
this.useExternalMessageReferences = useExternalMessageReferences;
|
this.useExternalMessageReferences = useExternalMessageReferences;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public boolean isUseDatabaseLock() {
|
||||||
|
return useDatabaseLock;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setUseDatabaseLock(boolean useDatabaseLock) {
|
||||||
|
this.useDatabaseLock = useDatabaseLock;
|
||||||
|
}
|
||||||
|
|
||||||
static public void log(String msg, SQLException e) {
|
static public void log(String msg, SQLException e) {
|
||||||
String s = msg+e.getMessage();
|
String s = msg+e.getMessage();
|
||||||
while( e.getNextException() != null ) {
|
while( e.getNextException() != null ) {
|
||||||
|
@ -368,4 +427,37 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
|
||||||
public void setUsageManager(UsageManager usageManager) {
|
public void setUsageManager(UsageManager usageManager) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
protected void databaseLockKeepAlive() {
|
||||||
|
boolean stop = false;
|
||||||
|
try {
|
||||||
|
DatabaseLocker locker = getDatabaseLocker();
|
||||||
|
if (locker != null) {
|
||||||
|
if (!locker.keepAlive()) {
|
||||||
|
stop = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (IOException e) {
|
||||||
|
log.error("Failed to get database when trying keepalive: " + e, e);
|
||||||
|
}
|
||||||
|
if (stop) {
|
||||||
|
stopBroker();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void stopBroker() {
|
||||||
|
// we can no longer keep the lock so lets fail
|
||||||
|
log.info("No longer able to keep the exclusive lock so giving up being a master");
|
||||||
|
try {
|
||||||
|
brokerService.stop();
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
log.warn("Failed to stop broker");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected DatabaseLocker createDatabaseLocker() throws IOException {
|
||||||
|
return new DefaultDatabaseLocker(getDataSource(), getStatements());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,6 +27,7 @@ public class Statements {
|
||||||
private String tablePrefix = "";
|
private String tablePrefix = "";
|
||||||
protected String messageTableName = "ACTIVEMQ_MSGS";
|
protected String messageTableName = "ACTIVEMQ_MSGS";
|
||||||
protected String durableSubAcksTableName = "ACTIVEMQ_ACKS";
|
protected String durableSubAcksTableName = "ACTIVEMQ_ACKS";
|
||||||
|
protected String lockTableName = "ACTIVEMQ_LOCK";
|
||||||
|
|
||||||
protected String binaryDataType = "BLOB";
|
protected String binaryDataType = "BLOB";
|
||||||
protected String containerNameDataType = "VARCHAR(250)";
|
protected String containerNameDataType = "VARCHAR(250)";
|
||||||
|
@ -57,6 +58,9 @@ public class Statements {
|
||||||
private String deleteOldMessagesStatement;
|
private String deleteOldMessagesStatement;
|
||||||
private String[] createSchemaStatements;
|
private String[] createSchemaStatements;
|
||||||
private String[] dropSchemaStatements;
|
private String[] dropSchemaStatements;
|
||||||
|
private String lockCreateStatement;
|
||||||
|
private String lockUpdateStatement;
|
||||||
|
private boolean useLockCreateWhereClause;
|
||||||
|
|
||||||
public String[] getCreateSchemaStatements() {
|
public String[] getCreateSchemaStatements() {
|
||||||
if (createSchemaStatements == null) {
|
if (createSchemaStatements == null) {
|
||||||
|
@ -75,7 +79,11 @@ public class Statements {
|
||||||
"CREATE TABLE " + getFullAckTableName() + "(" + "CONTAINER " + containerNameDataType + " NOT NULL"
|
"CREATE TABLE " + getFullAckTableName() + "(" + "CONTAINER " + containerNameDataType + " NOT NULL"
|
||||||
+ ", CLIENT_ID " + stringIdDataType + " NOT NULL" + ", SUB_NAME " + stringIdDataType
|
+ ", CLIENT_ID " + stringIdDataType + " NOT NULL" + ", SUB_NAME " + stringIdDataType
|
||||||
+ " NOT NULL" + ", SELECTOR " + stringIdDataType + ", LAST_ACKED_ID " + sequenceDataType
|
+ " NOT NULL" + ", SELECTOR " + stringIdDataType + ", LAST_ACKED_ID " + sequenceDataType
|
||||||
+ ", PRIMARY KEY ( CONTAINER, CLIENT_ID, SUB_NAME))", };
|
+ ", PRIMARY KEY ( CONTAINER, CLIENT_ID, SUB_NAME))",
|
||||||
|
"CREATE TABLE " + getFullLockTableName() + "( ID " + longDataType + ", TIME " + longDataType
|
||||||
|
+ ", BROKER_NAME " + stringIdDataType + ", PRIMARY KEY (ID) )",
|
||||||
|
"INSERT INTO " + getFullLockTableName() + "(ID) VALUES (1)",
|
||||||
|
};
|
||||||
}
|
}
|
||||||
return createSchemaStatements;
|
return createSchemaStatements;
|
||||||
}
|
}
|
||||||
|
@ -220,12 +228,30 @@ public class Statements {
|
||||||
public String getDeleteOldMessagesStatement() {
|
public String getDeleteOldMessagesStatement() {
|
||||||
if (deleteOldMessagesStatement == null) {
|
if (deleteOldMessagesStatement == null) {
|
||||||
deleteOldMessagesStatement = "DELETE FROM " + getFullMessageTableName()
|
deleteOldMessagesStatement = "DELETE FROM " + getFullMessageTableName()
|
||||||
+ " WHERE ( EXPIRATION<>0 AND EXPIRATION<?) OR ID <= " + "( SELECT min(" + getFullAckTableName()
|
+ " WHERE ( EXPIRATION<>0 AND EXPIRATION<?) OR ID <= " + "( SELECT min(" + getFullAckTableName()
|
||||||
+ ".LAST_ACKED_ID) " + "FROM " + getFullAckTableName() + " WHERE " + getFullAckTableName()
|
+ ".LAST_ACKED_ID) " + "FROM " + getFullAckTableName() + " WHERE " + getFullAckTableName()
|
||||||
+ ".CONTAINER=" + getFullMessageTableName() + ".CONTAINER)";
|
+ ".CONTAINER=" + getFullMessageTableName() + ".CONTAINER)";
|
||||||
}
|
}
|
||||||
return deleteOldMessagesStatement;
|
return deleteOldMessagesStatement;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String getLockCreateStatement() {
|
||||||
|
if (lockCreateStatement == null) {
|
||||||
|
lockCreateStatement = "SELECT * FROM " + getFullLockTableName();
|
||||||
|
if (useLockCreateWhereClause) {
|
||||||
|
lockCreateStatement += " WHERE ID = 1";
|
||||||
|
}
|
||||||
|
lockCreateStatement += " FOR UPDATE";
|
||||||
|
}
|
||||||
|
return lockCreateStatement;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getLockUpdateStatement() {
|
||||||
|
if (lockUpdateStatement == null) {
|
||||||
|
lockUpdateStatement = "UPDATE " + getFullLockTableName() + " SET time = ? WHERE ID = 1";
|
||||||
|
}
|
||||||
|
return lockUpdateStatement;
|
||||||
|
}
|
||||||
|
|
||||||
public String getFullMessageTableName() {
|
public String getFullMessageTableName() {
|
||||||
return getTablePrefix() + getMessageTableName();
|
return getTablePrefix() + getMessageTableName();
|
||||||
|
@ -234,7 +260,12 @@ public class Statements {
|
||||||
public String getFullAckTableName() {
|
public String getFullAckTableName() {
|
||||||
return getTablePrefix() + getDurableSubAcksTableName();
|
return getTablePrefix() + getDurableSubAcksTableName();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String getFullLockTableName() {
|
||||||
|
return getTablePrefix() + getLockTableName();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return Returns the containerNameDataType.
|
* @return Returns the containerNameDataType.
|
||||||
*/
|
*/
|
||||||
|
@ -339,6 +370,14 @@ public class Statements {
|
||||||
public void setDurableSubAcksTableName(String durableSubAcksTableName) {
|
public void setDurableSubAcksTableName(String durableSubAcksTableName) {
|
||||||
this.durableSubAcksTableName = durableSubAcksTableName;
|
this.durableSubAcksTableName = durableSubAcksTableName;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String getLockTableName() {
|
||||||
|
return lockTableName;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setLockTableName(String lockTableName) {
|
||||||
|
this.lockTableName = lockTableName;
|
||||||
|
}
|
||||||
|
|
||||||
public String getLongDataType() {
|
public String getLongDataType() {
|
||||||
return longDataType;
|
return longDataType;
|
||||||
|
@ -444,4 +483,19 @@ public class Statements {
|
||||||
this.updateMessageStatement = updateMessageStatment;
|
this.updateMessageStatement = updateMessageStatment;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isUseLockCreateWhereClause() {
|
||||||
|
return useLockCreateWhereClause;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setUseLockCreateWhereClause(boolean useLockCreateWhereClause) {
|
||||||
|
this.useLockCreateWhereClause = useLockCreateWhereClause;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setLockCreateStatement(String lockCreateStatement) {
|
||||||
|
this.lockCreateStatement = lockCreateStatement;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setLockUpdateStatement(String lockUpdateStatement) {
|
||||||
|
this.lockUpdateStatement = lockUpdateStatement;
|
||||||
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue