- Refactored the StementProvider and related classes to a simpler Statements class that's easier to configure though the xbean configuration system.

- Added test cases that show this in use.

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@378119 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2006-02-15 23:37:40 +00:00
parent a33f399004
commit 4843dd459c
19 changed files with 725 additions and 827 deletions

View File

@ -19,18 +19,19 @@ package org.apache.activemq.store;
import java.io.File;
import java.io.IOException;
import javax.sql.DataSource;
import org.activeio.journal.Journal;
import org.activeio.journal.active.JournalImpl;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.jdbc.JDBCAdapter;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.jdbc.Statements;
import org.apache.activemq.store.journal.JournalPersistenceAdapter;
import org.apache.activemq.store.journal.QuickJournalPersistenceAdapter;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.derby.jdbc.EmbeddedDataSource;
import javax.sql.DataSource;
/**
* Factory class that can create PersistenceAdapter objects.
*
@ -48,20 +49,20 @@ public class DefaultPersistenceAdapterFactory implements PersistenceAdapterFacto
private boolean useJournal=true;
private boolean useQuickJournal=false;
private File journalArchiveDirectory;
private JDBCPersistenceAdapter jdbcAdapter = new JDBCPersistenceAdapter();
private JDBCPersistenceAdapter jdbcPersistenceAdapter = new JDBCPersistenceAdapter();
public PersistenceAdapter createPersistenceAdapter() throws IOException {
File dataDirectory = getDataDirectory();
jdbcAdapter.setDataSource(getDataSource());
jdbcPersistenceAdapter.setDataSource(getDataSource());
if( !useJournal )
return jdbcAdapter;
return jdbcPersistenceAdapter;
// Setup the Journal
if( useQuickJournal ) {
return new QuickJournalPersistenceAdapter(getJournal(), jdbcAdapter, getMemManager(), getTaskRunnerFactory());
return new QuickJournalPersistenceAdapter(getJournal(), jdbcPersistenceAdapter, getMemManager(), getTaskRunnerFactory());
} else {
return new JournalPersistenceAdapter(getJournal(), jdbcAdapter, getMemManager(), getTaskRunnerFactory());
return new JournalPersistenceAdapter(getJournal(), jdbcPersistenceAdapter, getMemManager(), getTaskRunnerFactory());
}
}
@ -115,11 +116,11 @@ public class DefaultPersistenceAdapterFactory implements PersistenceAdapterFacto
}
public JDBCPersistenceAdapter getJdbcAdapter() {
return jdbcAdapter;
return jdbcPersistenceAdapter;
}
public void setJdbcAdapter(JDBCPersistenceAdapter jdbcAdapter) {
this.jdbcAdapter = jdbcAdapter;
this.jdbcPersistenceAdapter = jdbcAdapter;
}
public boolean isUseJournal() {
@ -173,19 +174,18 @@ public class DefaultPersistenceAdapterFactory implements PersistenceAdapterFacto
}
public JDBCAdapter getAdapter() throws IOException {
return jdbcAdapter.getAdapter();
}
public String getAdapterClass() {
return jdbcAdapter.getAdapterClass();
return jdbcPersistenceAdapter.getAdapter();
}
public void setAdapter(JDBCAdapter adapter) {
jdbcAdapter.setAdapter(adapter);
jdbcPersistenceAdapter.setAdapter(adapter);
}
public void setAdapterClass(String adapterClass) {
jdbcAdapter.setAdapterClass(adapterClass);
public Statements getStatements() {
return jdbcPersistenceAdapter.getStatements();
}
public void setStatements(Statements statements) {
jdbcPersistenceAdapter.setStatements(statements);
}
// Implementation methods

View File

@ -28,7 +28,9 @@ import org.apache.activemq.command.SubscriptionInfo;
* @version $Revision: 1.5 $
*/
public interface JDBCAdapter {
public void setStatements(Statements statementProvider);
public abstract void doCreateTables(TransactionContext c) throws SQLException, IOException;
public abstract void doDropTables(TransactionContext c) throws SQLException, IOException;

View File

@ -63,8 +63,8 @@ public class JDBCPersistenceAdapter implements PersistenceAdapter {
private WireFormat wireFormat = new OpenWireFormat(false);
private DataSource dataSource;
private Statements statements;
private JDBCAdapter adapter;
private String adapterClass;
private MemoryTransactionStore transactionStore;
private ScheduledThreadPoolExecutor clockDaemon;
private ScheduledFuture clockTicket;
@ -209,7 +209,7 @@ public class JDBCPersistenceAdapter implements PersistenceAdapter {
public JDBCAdapter getAdapter() throws IOException {
if (adapter == null) {
adapter = createAdapter();
setAdapter(createAdapter());
}
return adapter;
}
@ -222,39 +222,24 @@ public class JDBCPersistenceAdapter implements PersistenceAdapter {
TransactionContext c = getTransactionContext();
try {
// If the adapter class is not specified.. try to detect the right
// type by getting info from the database.
if (adapterClass == null) {
try {
// Make the filename file system safe.
String dirverName = c.getConnection().getMetaData().getDriverName();
dirverName = dirverName.replaceAll("[^a-zA-Z0-9\\-]", "_").toLowerCase();
try {
// Make the filename file system safe.
String dirverName = c.getConnection().getMetaData().getDriverName();
dirverName = dirverName.replaceAll("[^a-zA-Z0-9\\-]", "_").toLowerCase();
try {
adapter = (DefaultJDBCAdapter) factoryFinder.newInstance(dirverName);
log.info("Database driver recognized: [" + dirverName + "]");
} catch (Throwable e) {
log.warn("Database driver NOT recognized: [" + dirverName
+ "]. Will use default JDBC implementation.");
}
} catch (SQLException e) {
log.warn("JDBC error occurred while trying to detect database type. Will use default JDBC implementation: "
+ e.getMessage());
JDBCPersistenceAdapter.log("Failure Details: ",e);
}
} else {
try {
Class clazz = JDBCPersistenceAdapter.class.getClassLoader().loadClass(adapterClass);
adapter = (DefaultJDBCAdapter) clazz.newInstance();
adapter = (DefaultJDBCAdapter) factoryFinder.newInstance(dirverName);
log.info("Database driver recognized: [" + dirverName + "]");
} catch (Throwable e) {
log.warn("Invalid JDBC adapter class class (" + adapterClass
+ "). Will use default JDBC implementation.");
log.debug("Reason: " + e, e);
log.warn("Database driver NOT recognized: [" + dirverName
+ "]. Will use default JDBC implementation.");
}
} catch (SQLException e) {
log.warn("JDBC error occurred while trying to detect database type. Will use default JDBC implementation: "
+ e.getMessage());
JDBCPersistenceAdapter.log("Failure Details: ",e);
}
// Use the default JDBC adapter if the
@ -271,6 +256,7 @@ public class JDBCPersistenceAdapter implements PersistenceAdapter {
public void setAdapter(JDBCAdapter adapter) {
this.adapter = adapter;
this.adapter.setStatements(getStatements());
}
public DataSource getDataSource() {
@ -321,21 +307,6 @@ public class JDBCPersistenceAdapter implements PersistenceAdapter {
transactionContext.rollback();
}
/**
* @return Returns the adapterClass.
*/
public String getAdapterClass() {
return adapterClass;
}
/**
* @param adapterClass
* The adapterClass to set.
*/
public void setAdapterClass(String adapterClass) {
this.adapterClass = adapterClass;
}
public int getCleanupPeriod() {
return cleanupPeriod;
}
@ -375,4 +346,15 @@ public class JDBCPersistenceAdapter implements PersistenceAdapter {
log.debug(s, e);
}
public Statements getStatements() {
if( statements == null ) {
statements = new Statements();
}
return statements;
}
public void setStatements(Statements statements) {
this.statements = statements;
}
}

View File

@ -1,52 +0,0 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.jdbc;
/**
* Generates the SQL statements that are used by the JDBCAdapter.
*
* @version $Revision: 1.4 $
*/
public interface StatementProvider {
public String[] getCreateSchemaStatments();
public String[] getDropSchemaStatments();
public String getAddMessageStatment();
public String getUpdateMessageStatment();
public String getRemoveMessageStatment();
public String getFindMessageSequenceIdStatment();
public String getFindMessageStatment();
public String getFindAllMessagesStatment();
public String getFindLastSequenceIdInMsgs();
public String getFindLastSequenceIdInAcks();
public String getCreateDurableSubStatment();
public String getFindDurableSubStatment();
public String getUpdateLastAckOfDurableSub();
public String getFindAllDurableSubMessagesStatment();
public String getRemoveAllMessagesStatment();
public String getRemoveAllSubscriptionsStatment();
public String getDeleteSubscriptionStatment();
public String getDeleteOldMessagesStatment();
public String getFindAllDestinationsStatment();
public void setUseExternalMessageReferences(boolean useExternalMessageReferences);
public boolean isUseExternalMessageReferences();
public String getFullMessageTableName();
public String getFindAllDurableSubsStatment();
}

View File

@ -0,0 +1,463 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.jdbc;
/**
* @version $Revision: 1.4 $
*
* @org.apache.xbean.XBean element="statements"
*
*/
public class Statements {
private String tablePrefix = "";
protected String messageTableName = "ACTIVEMQ_MSGS";
protected String durableSubAcksTableName = "ACTIVEMQ_ACKS";
protected String binaryDataType = "BLOB";
protected String containerNameDataType = "VARCHAR(250)";
protected String xidDataType = "VARCHAR(250)";
protected String msgIdDataType = "VARCHAR(250)";
protected String sequenceDataType = "INTEGER";
protected String longDataType = "BIGINT";
protected String stringIdDataType = "VARCHAR(250)";
protected boolean useExternalMessageReferences = false;
private String addMessageStatement;
private String updateMessageStatement;
private String removeMessageStatment;
private String findMessageSequenceIdStatement;
private String findMessageStatement;
private String findAllMessagesStatement;
private String findLastSequenceIdInMsgsStatement;
private String findLastSequenceIdInAcksStatement;
private String createDurableSubStatement;
private String findDurableSubStatement;
private String findAllDurableSubsStatement;
private String updateLastAckOfDurableSubStatement;
private String deleteSubscriptionStatement;
private String findAllDurableSubMessagesStatement;
private String findAllDestinationsStatement;
private String removeAllMessagesStatement;
private String removeAllSubscriptionsStatement;
private String deleteOldMessagesStatement;
private String[] createSchemaStatements;
private String[] dropSchemaStatements;
public String[] getCreateSchemaStatements() {
if (createSchemaStatements == null) {
createSchemaStatements = new String[] {
"CREATE TABLE " + getFullMessageTableName() + "(" + "ID " + sequenceDataType + " NOT NULL"
+ ", CONTAINER " + containerNameDataType + ", MSGID_PROD " + msgIdDataType + ", MSGID_SEQ "
+ sequenceDataType + ", EXPIRATION " + longDataType + ", MSG "
+ (useExternalMessageReferences ? stringIdDataType : binaryDataType)
+ ", PRIMARY KEY ( ID ) )",
"CREATE INDEX " + getFullMessageTableName() + "_MIDX ON " + getFullMessageTableName()
+ " (MSGID_PROD,MSGID_SEQ)",
"CREATE INDEX " + getFullMessageTableName() + "_CIDX ON " + getFullMessageTableName()
+ " (CONTAINER)",
"CREATE INDEX " + getFullMessageTableName() + "_EIDX ON " + getFullMessageTableName()
+ " (EXPIRATION)",
"CREATE TABLE " + getFullAckTableName() + "(" + "CONTAINER " + containerNameDataType + " NOT NULL"
+ ", CLIENT_ID " + stringIdDataType + " NOT NULL" + ", SUB_NAME " + stringIdDataType
+ " NOT NULL" + ", SELECTOR " + stringIdDataType + ", LAST_ACKED_ID " + sequenceDataType
+ ", PRIMARY KEY ( CONTAINER, CLIENT_ID, SUB_NAME))", };
}
return createSchemaStatements;
}
public String[] getDropSchemaStatements() {
if (dropSchemaStatements == null) {
dropSchemaStatements = new String[] { "DROP TABLE " + getFullAckTableName() + "",
"DROP TABLE " + getFullMessageTableName() + "", };
}
return dropSchemaStatements;
}
public String getAddMessageStatement() {
if (addMessageStatement == null) {
addMessageStatement = "INSERT INTO " + getFullMessageTableName()
+ "(ID, MSGID_PROD, MSGID_SEQ, CONTAINER, EXPIRATION, MSG) VALUES (?, ?, ?, ?, ?, ?)";
}
return addMessageStatement;
}
public String getUpdateMessageStatement() {
if (updateMessageStatement == null) {
updateMessageStatement = "UPDATE " + getFullMessageTableName() + " SET MSG=? WHERE ID=?";
}
return updateMessageStatement;
}
public String getRemoveMessageStatment() {
if (removeMessageStatment == null) {
removeMessageStatment = "DELETE FROM " + getFullMessageTableName() + " WHERE ID=?";
}
return removeMessageStatment;
}
public String getFindMessageSequenceIdStatement() {
if (findMessageSequenceIdStatement == null) {
findMessageSequenceIdStatement = "SELECT ID FROM " + getFullMessageTableName()
+ " WHERE MSGID_PROD=? AND MSGID_SEQ=?";
}
return findMessageSequenceIdStatement;
}
public String getFindMessageStatement() {
if (findMessageStatement == null) {
findMessageStatement = "SELECT MSG FROM " + getFullMessageTableName() + " WHERE ID=?";
}
return findMessageStatement;
}
public String getFindAllMessagesStatement() {
if (findAllMessagesStatement == null) {
findAllMessagesStatement = "SELECT ID, MSG FROM " + getFullMessageTableName()
+ " WHERE CONTAINER=? ORDER BY ID";
}
return findAllMessagesStatement;
}
public String getFindLastSequenceIdInMsgsStatement() {
if (findLastSequenceIdInMsgsStatement == null) {
findLastSequenceIdInMsgsStatement = "SELECT MAX(ID) FROM " + getFullMessageTableName();
}
return findLastSequenceIdInMsgsStatement;
}
public String getFindLastSequenceIdInAcksStatement() {
if (findLastSequenceIdInAcksStatement == null) {
findLastSequenceIdInAcksStatement = "SELECT MAX(LAST_ACKED_ID) FROM " + getFullAckTableName();
}
return findLastSequenceIdInAcksStatement;
}
public String getCreateDurableSubStatement() {
if (createDurableSubStatement == null) {
createDurableSubStatement = "INSERT INTO " + getFullAckTableName()
+ "(CONTAINER, CLIENT_ID, SUB_NAME, SELECTOR, LAST_ACKED_ID) " + "VALUES (?, ?, ?, ?, ?)";
}
return createDurableSubStatement;
}
public String getFindDurableSubStatement() {
if (findDurableSubStatement == null) {
findDurableSubStatement = "SELECT SELECTOR, SUB_NAME " + "FROM " + getFullAckTableName()
+ " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?";
}
return findDurableSubStatement;
}
public String getFindAllDurableSubsStatement() {
if (findAllDurableSubsStatement == null) {
findAllDurableSubsStatement = "SELECT SELECTOR, SUB_NAME, CLIENT_ID" + " FROM " + getFullAckTableName()
+ " WHERE CONTAINER=?";
}
return findAllDurableSubsStatement;
}
public String getUpdateLastAckOfDurableSubStatement() {
if (updateLastAckOfDurableSubStatement == null) {
updateLastAckOfDurableSubStatement = "UPDATE " + getFullAckTableName() + " SET LAST_ACKED_ID=?"
+ " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?";
}
return updateLastAckOfDurableSubStatement;
}
public String getDeleteSubscriptionStatement() {
if (deleteSubscriptionStatement == null) {
deleteSubscriptionStatement = "DELETE FROM " + getFullAckTableName()
+ " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?";
}
return deleteSubscriptionStatement;
}
public String getFindAllDurableSubMessagesStatement() {
if (findAllDurableSubMessagesStatement == null) {
findAllDurableSubMessagesStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M, "
+ getFullAckTableName() + " D " + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+ " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID" + " ORDER BY M.ID";
}
return findAllDurableSubMessagesStatement;
}
public String getFindAllDestinationsStatement() {
if (findAllDestinationsStatement == null) {
findAllDestinationsStatement = "SELECT DISTINCT CONTAINER FROM " + getFullMessageTableName();
}
return findAllDestinationsStatement;
}
public String getRemoveAllMessagesStatement() {
if (removeAllMessagesStatement == null) {
removeAllMessagesStatement = "DELETE FROM " + getFullMessageTableName() + " WHERE CONTAINER=?";
}
return removeAllMessagesStatement;
}
public String getRemoveAllSubscriptionsStatement() {
if (removeAllSubscriptionsStatement == null) {
removeAllSubscriptionsStatement = "DELETE FROM " + getFullAckTableName() + " WHERE CONTAINER=?";
}
return removeAllSubscriptionsStatement;
}
public String getDeleteOldMessagesStatement() {
if (deleteOldMessagesStatement == null) {
deleteOldMessagesStatement = "DELETE FROM " + getFullMessageTableName()
+ " WHERE ( EXPIRATION<>0 AND EXPIRATION<?) OR ID <= " + "( SELECT min(" + getFullAckTableName()
+ ".LAST_ACKED_ID) " + "FROM " + getFullAckTableName() + " WHERE " + getFullAckTableName()
+ ".CONTAINER=" + getFullMessageTableName() + ".CONTAINER)";
}
return deleteOldMessagesStatement;
}
public String getFullMessageTableName() {
return getTablePrefix() + getMessageTableName();
}
public String getFullAckTableName() {
return getTablePrefix() + getDurableSubAcksTableName();
}
/**
* @return Returns the containerNameDataType.
*/
public String getContainerNameDataType() {
return containerNameDataType;
}
/**
* @param containerNameDataType
* The containerNameDataType to set.
*/
public void setContainerNameDataType(String containerNameDataType) {
this.containerNameDataType = containerNameDataType;
}
/**
* @return Returns the messageDataType.
*/
public String getBinaryDataType() {
return binaryDataType;
}
/**
* @param messageDataType
* The messageDataType to set.
*/
public void setBinaryDataType(String messageDataType) {
this.binaryDataType = messageDataType;
}
/**
* @return Returns the messageTableName.
*/
public String getMessageTableName() {
return messageTableName;
}
/**
* @param messageTableName
* The messageTableName to set.
*/
public void setMessageTableName(String messageTableName) {
this.messageTableName = messageTableName;
}
/**
* @return Returns the msgIdDataType.
*/
public String getMsgIdDataType() {
return msgIdDataType;
}
/**
* @param msgIdDataType
* The msgIdDataType to set.
*/
public void setMsgIdDataType(String msgIdDataType) {
this.msgIdDataType = msgIdDataType;
}
/**
* @return Returns the sequenceDataType.
*/
public String getSequenceDataType() {
return sequenceDataType;
}
/**
* @param sequenceDataType
* The sequenceDataType to set.
*/
public void setSequenceDataType(String sequenceDataType) {
this.sequenceDataType = sequenceDataType;
}
/**
* @return Returns the tablePrefix.
*/
public String getTablePrefix() {
return tablePrefix;
}
/**
* @param tablePrefix
* The tablePrefix to set.
*/
public void setTablePrefix(String tablePrefix) {
this.tablePrefix = tablePrefix;
}
/**
* @return Returns the xidDataType.
*/
public String getXidDataType() {
return xidDataType;
}
/**
* @param xidDataType
* The xidDataType to set.
*/
public void setXidDataType(String xidDataType) {
this.xidDataType = xidDataType;
}
/**
* @return Returns the durableSubAcksTableName.
*/
public String getDurableSubAcksTableName() {
return durableSubAcksTableName;
}
/**
* @param durableSubAcksTableName
* The durableSubAcksTableName to set.
*/
public void setDurableSubAcksTableName(String durableSubAcksTableName) {
this.durableSubAcksTableName = durableSubAcksTableName;
}
public String getLongDataType() {
return longDataType;
}
public void setLongDataType(String longDataType) {
this.longDataType = longDataType;
}
public String getStringIdDataType() {
return stringIdDataType;
}
public void setStringIdDataType(String stringIdDataType) {
this.stringIdDataType = stringIdDataType;
}
public void setUseExternalMessageReferences(boolean useExternalMessageReferences) {
this.useExternalMessageReferences = useExternalMessageReferences;
}
public boolean isUseExternalMessageReferences() {
return useExternalMessageReferences;
}
public void setAddMessageStatement(String addMessageStatment) {
this.addMessageStatement = addMessageStatment;
}
public void setCreateDurableSubStatement(String createDurableSubStatment) {
this.createDurableSubStatement = createDurableSubStatment;
}
public void setCreateSchemaStatements(String[] createSchemaStatments) {
this.createSchemaStatements = createSchemaStatments;
}
public void setDeleteOldMessagesStatement(String deleteOldMessagesStatment) {
this.deleteOldMessagesStatement = deleteOldMessagesStatment;
}
public void setDeleteSubscriptionStatement(String deleteSubscriptionStatment) {
this.deleteSubscriptionStatement = deleteSubscriptionStatment;
}
public void setDropSchemaStatements(String[] dropSchemaStatments) {
this.dropSchemaStatements = dropSchemaStatments;
}
public void setFindAllDestinationsStatement(String findAllDestinationsStatment) {
this.findAllDestinationsStatement = findAllDestinationsStatment;
}
public void setFindAllDurableSubMessagesStatement(String findAllDurableSubMessagesStatment) {
this.findAllDurableSubMessagesStatement = findAllDurableSubMessagesStatment;
}
public void setFindAllDurableSubsStatement(String findAllDurableSubsStatment) {
this.findAllDurableSubsStatement = findAllDurableSubsStatment;
}
public void setFindAllMessagesStatement(String findAllMessagesStatment) {
this.findAllMessagesStatement = findAllMessagesStatment;
}
public void setFindDurableSubStatement(String findDurableSubStatment) {
this.findDurableSubStatement = findDurableSubStatment;
}
public void setFindLastSequenceIdInAcksStatement(String findLastSequenceIdInAcks) {
this.findLastSequenceIdInAcksStatement = findLastSequenceIdInAcks;
}
public void setFindLastSequenceIdInMsgsStatement(String findLastSequenceIdInMsgs) {
this.findLastSequenceIdInMsgsStatement = findLastSequenceIdInMsgs;
}
public void setFindMessageSequenceIdStatement(String findMessageSequenceIdStatment) {
this.findMessageSequenceIdStatement = findMessageSequenceIdStatment;
}
public void setFindMessageStatement(String findMessageStatment) {
this.findMessageStatement = findMessageStatment;
}
public void setRemoveAllMessagesStatement(String removeAllMessagesStatment) {
this.removeAllMessagesStatement = removeAllMessagesStatment;
}
public void setRemoveAllSubscriptionsStatement(String removeAllSubscriptionsStatment) {
this.removeAllSubscriptionsStatement = removeAllSubscriptionsStatment;
}
public void setRemoveMessageStatment(String removeMessageStatment) {
this.removeMessageStatment = removeMessageStatment;
}
public void setUpdateLastAckOfDurableSubStatement(String updateLastAckOfDurableSub) {
this.updateLastAckOfDurableSubStatement = updateLastAckOfDurableSub;
}
public void setUpdateMessageStatement(String updateMessageStatment) {
this.updateMessageStatement = updateMessageStatment;
}
}

View File

@ -16,7 +16,7 @@
*/
package org.apache.activemq.store.jdbc.adapter;
import org.apache.activemq.store.jdbc.StatementProvider;
import org.apache.activemq.store.jdbc.Statements;
/**
* Axion specific Adapter.
@ -25,51 +25,39 @@ import org.apache.activemq.store.jdbc.StatementProvider;
* - We cannot auto upgrade the schema was we roll out new versions of ActiveMQ
* - We cannot delete durable sub messages that have be acknowledged by all consumers.
*
* @org.apache.xbean.XBean element="axionJDBCAdapter"
* @version $Revision: 1.4 $
*/
public class AxionJDBCAdapter extends StreamJDBCAdapter {
public static StatementProvider createStatementProvider() {
DefaultStatementProvider answer = new DefaultStatementProvider() {
public String [] getCreateSchemaStatments() {
return new String[]{
"CREATE TABLE "+getTablePrefix()+messageTableName+"("
+"ID "+sequenceDataType+" NOT NULL"
+", CONTAINER "+containerNameDataType
+", MSGID_PROD "+msgIdDataType
+", MSGID_SEQ "+sequenceDataType
+", EXPIRATION "+longDataType
+", MSG "+(useExternalMessageReferences ? stringIdDataType : binaryDataType)
+", PRIMARY KEY ( ID ) )",
"CREATE INDEX "+getTablePrefix()+messageTableName+"_MIDX ON "+getTablePrefix()+messageTableName+" (MSGID_PROD,MSGID_SEQ)",
"CREATE INDEX "+getTablePrefix()+messageTableName+"_CIDX ON "+getTablePrefix()+messageTableName+" (CONTAINER)",
"CREATE INDEX "+getFullMessageTableName()+"_EIDX ON "+getFullMessageTableName()+" (EXPIRATION)",
"CREATE TABLE "+getTablePrefix()+durableSubAcksTableName+"("
+"CONTAINER "+containerNameDataType+" NOT NULL"
+", CLIENT_ID "+stringIdDataType+" NOT NULL"
+", SUB_NAME "+stringIdDataType+" NOT NULL"
+", SELECTOR "+stringIdDataType
+", LAST_ACKED_ID "+sequenceDataType
+", PRIMARY KEY ( CONTAINER, CLIENT_ID, SUB_NAME))",
};
}
public String getDeleteOldMessagesStatment() {
return "DELETE FROM "+getTablePrefix()+messageTableName+
" WHERE ( EXPIRATION<>0 AND EXPIRATION<?)";
}
};
answer.setLongDataType("LONG");
return answer;
public void setStatements(Statements statements) {
statements.setCreateSchemaStatements(
new String[]{
"CREATE TABLE "+statements.getFullMessageTableName()+"("
+"ID "+statements.getSequenceDataType()+" NOT NULL"
+", CONTAINER "+statements.getContainerNameDataType()
+", MSGID_PROD "+statements.getMsgIdDataType()
+", MSGID_SEQ "+statements.getSequenceDataType()
+", EXPIRATION "+statements.getLongDataType()
+", MSG "+(statements.isUseExternalMessageReferences() ? statements.getStringIdDataType() : statements.getBinaryDataType())
+", PRIMARY KEY ( ID ) )",
"CREATE INDEX "+statements.getFullMessageTableName()+"_MIDX ON "+statements.getFullMessageTableName()+" (MSGID_PROD,MSGID_SEQ)",
"CREATE INDEX "+statements.getFullMessageTableName()+"_CIDX ON "+statements.getFullMessageTableName()+" (CONTAINER)",
"CREATE INDEX "+statements.getFullMessageTableName()+"_EIDX ON "+statements.getFullMessageTableName()+" (EXPIRATION)",
"CREATE TABLE "+statements.getFullAckTableName()+"("
+"CONTAINER "+statements.getContainerNameDataType()+" NOT NULL"
+", CLIENT_ID "+statements.getStringIdDataType()+" NOT NULL"
+", SUB_NAME "+statements.getStringIdDataType()+" NOT NULL"
+", SELECTOR "+statements.getStringIdDataType()
+", LAST_ACKED_ID "+statements.getSequenceDataType()
+", PRIMARY KEY ( CONTAINER, CLIENT_ID, SUB_NAME))",
}
);
statements.setDeleteOldMessagesStatement("DELETE FROM "+statements.getFullMessageTableName()+ " WHERE ( EXPIRATION<>0 AND EXPIRATION<?)");
statements.setLongDataType("LONG");
super.setStatements(statements);
}
public AxionJDBCAdapter() {
this(createStatementProvider());
}
public AxionJDBCAdapter(StatementProvider provider) {
super(provider);
}
}

View File

@ -28,7 +28,6 @@ import java.sql.SQLException;
import javax.jms.JMSException;
import org.activeio.ByteArrayOutputStream;
import org.apache.activemq.store.jdbc.StatementProvider;
import org.apache.activemq.store.jdbc.TransactionContext;
@ -46,17 +45,11 @@ import org.apache.activemq.store.jdbc.TransactionContext;
* <li></li>
* </ul>
*
* @org.apache.xbean.XBean element="blobJDBCAdapter"
*
* @version $Revision: 1.2 $
*/
public class BlobJDBCAdapter extends DefaultJDBCAdapter {
public BlobJDBCAdapter() {
super();
}
public BlobJDBCAdapter(StatementProvider provider) {
super(provider);
}
public void doAddMessage(Connection c, long seq, String messageID, String destinationName, byte[] data) throws SQLException,
JMSException {
@ -65,7 +58,7 @@ public class BlobJDBCAdapter extends DefaultJDBCAdapter {
try {
// Add the Blob record.
s = c.prepareStatement(statementProvider.getAddMessageStatment());
s = c.prepareStatement(statements.getAddMessageStatement());
s.setLong(1, seq);
s.setString(2, destinationName);
s.setString(3, messageID);
@ -77,7 +70,7 @@ public class BlobJDBCAdapter extends DefaultJDBCAdapter {
s.close();
// Select the blob record so that we can update it.
s = c.prepareStatement(statementProvider.getFindMessageStatment());
s = c.prepareStatement(statements.getFindMessageStatement());
s.setLong(1, seq);
rs = s.executeQuery();
if (!rs.next())
@ -92,7 +85,7 @@ public class BlobJDBCAdapter extends DefaultJDBCAdapter {
s.close();
// Update the row with the updated blob
s = c.prepareStatement(statementProvider.getUpdateMessageStatment());
s = c.prepareStatement(statements.getUpdateMessageStatement());
s.setBlob(1, blob);
s.setLong(2, seq);
@ -115,7 +108,7 @@ public class BlobJDBCAdapter extends DefaultJDBCAdapter {
PreparedStatement s=null; ResultSet rs=null;
try {
s = c.getConnection().prepareStatement(statementProvider.getFindMessageStatment());
s = c.getConnection().prepareStatement(statements.getFindMessageStatement());
s.setLong(1, seq);
rs = s.executeQuery();

View File

@ -20,30 +20,19 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.activemq.store.jdbc.StatementProvider;
/**
* This JDBCAdapter inserts and extracts BLOB data using the
* setBytes()/getBytes() operations.
*
* The databases/JDBC drivers that use this adapter are:
* <ul>
* <li></li>
* </ul>
*
* @org.apache.xbean.XBean element="bytesJDBCAdapter"
*
* @version $Revision: 1.2 $
*/
public class BytesJDBCAdapter extends DefaultJDBCAdapter {
public BytesJDBCAdapter() {
super();
}
public BytesJDBCAdapter(StatementProvider provider) {
super(provider);
}
/**
* @see org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter#getBinaryData(java.sql.ResultSet, int)
*/

View File

@ -1,233 +0,0 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.jdbc.adapter;
import org.apache.activemq.store.jdbc.StatementProvider;
/**
* A StatementProvider filter that caches the responses
* of the filtered StatementProvider.
*
* @version $Revision: 1.4 $
*/
public class CachingStatementProvider implements StatementProvider {
final private StatementProvider statementProvider;
private String addMessageStatment;
private String[] createSchemaStatments;
private String[] dropSchemaStatments;
private String findAllMessagesStatment;
private String findLastSequenceIdInMsgs;
private String findMessageStatment;
private String removeMessageStatment;
private String updateMessageStatment;
private String createDurableSubStatment;
private String findDurableSubStatment;
private String findAllDurableSubMessagesStatment;
private String updateLastAckOfDurableSub;
private String findMessageSequenceIdStatment;
private String removeAllMessagesStatment;
private String removeAllSubscriptionsStatment;
private String deleteSubscriptionStatment;
private String deleteOldMessagesStatment;
private String findLastSequenceIdInAcks;
private String findAllDestinationsStatment;
private String findAllDurableSubsStatment;
public CachingStatementProvider(StatementProvider statementProvider) {
this.statementProvider = statementProvider;
}
public StatementProvider getNext() {
return statementProvider;
}
public String getAddMessageStatment() {
if (addMessageStatment == null) {
addMessageStatment = statementProvider.getAddMessageStatment();
}
return addMessageStatment;
}
public String[] getCreateSchemaStatments() {
if( createSchemaStatments==null ) {
createSchemaStatments = statementProvider.getCreateSchemaStatments();
}
return createSchemaStatments;
}
public String[] getDropSchemaStatments() {
if( dropSchemaStatments==null ) {
dropSchemaStatments = statementProvider.getDropSchemaStatments();
}
return dropSchemaStatments;
}
public String getFindAllMessagesStatment() {
if( findAllMessagesStatment==null ) {
findAllMessagesStatment = statementProvider.getFindAllMessagesStatment();
}
return findAllMessagesStatment;
}
public String getFindLastSequenceIdInMsgs() {
if( findLastSequenceIdInMsgs==null ) {
findLastSequenceIdInMsgs = statementProvider.getFindLastSequenceIdInMsgs();
}
return findLastSequenceIdInMsgs;
}
public String getFindLastSequenceIdInAcks() {
if( findLastSequenceIdInAcks==null ) {
findLastSequenceIdInAcks = statementProvider.getFindLastSequenceIdInAcks();
}
return findLastSequenceIdInAcks;
}
public String getFindMessageStatment() {
if( findMessageStatment==null ) {
findMessageStatment = statementProvider.getFindMessageStatment();
}
return findMessageStatment;
}
/**
* @return
*/
public String getRemoveMessageStatment() {
if( removeMessageStatment==null ) {
removeMessageStatment = statementProvider.getRemoveMessageStatment();
}
return removeMessageStatment;
}
public String getUpdateMessageStatment() {
if( updateMessageStatment==null ) {
updateMessageStatment = statementProvider.getUpdateMessageStatment();
}
return updateMessageStatment;
}
public String getCreateDurableSubStatment() {
if(createDurableSubStatment==null) {
createDurableSubStatment = statementProvider.getCreateDurableSubStatment();
}
return createDurableSubStatment;
}
public String getFindDurableSubStatment() {
if(findDurableSubStatment==null) {
findDurableSubStatment = statementProvider.getFindDurableSubStatment();
}
return findDurableSubStatment;
}
public String getFindAllDurableSubMessagesStatment() {
if(findAllDurableSubMessagesStatment==null) {
findAllDurableSubMessagesStatment = statementProvider.getFindAllDurableSubMessagesStatment();
}
return findAllDurableSubMessagesStatment;
}
public String getUpdateLastAckOfDurableSub() {
if(updateLastAckOfDurableSub==null) {
updateLastAckOfDurableSub = statementProvider.getUpdateLastAckOfDurableSub();
}
return updateLastAckOfDurableSub;
}
public String getFindMessageSequenceIdStatment() {
if ( findMessageSequenceIdStatment==null ) {
findMessageSequenceIdStatment = statementProvider.getFindMessageSequenceIdStatment();
}
return findMessageSequenceIdStatment;
}
public String getRemoveAllMessagesStatment() {
if ( removeAllMessagesStatment==null ) {
removeAllMessagesStatment = statementProvider.getRemoveAllMessagesStatment();
}
return removeAllMessagesStatment;
}
public String getRemoveAllSubscriptionsStatment() {
if ( removeAllSubscriptionsStatment==null ) {
removeAllSubscriptionsStatment = statementProvider.getRemoveAllSubscriptionsStatment();
}
return removeAllSubscriptionsStatment;
}
public String getDeleteSubscriptionStatment() {
if ( deleteSubscriptionStatment==null ) {
deleteSubscriptionStatment = statementProvider.getDeleteSubscriptionStatment();
}
return deleteSubscriptionStatment;
}
public String getDeleteOldMessagesStatment() {
if ( deleteOldMessagesStatment==null ) {
deleteOldMessagesStatment = statementProvider.getDeleteOldMessagesStatment();
}
return deleteOldMessagesStatment;
}
public String getFindAllDestinationsStatment() {
if ( findAllDestinationsStatment==null ) {
findAllDestinationsStatment = statementProvider.getFindAllDestinationsStatment();
}
return findAllDestinationsStatment;
}
public void setUseExternalMessageReferences(boolean useExternalMessageReferences) {
addMessageStatment=null;
createSchemaStatments=null;
dropSchemaStatments=null;
findAllMessagesStatment=null;
findLastSequenceIdInMsgs=null;
findMessageStatment=null;
removeMessageStatment=null;
updateMessageStatment=null;
createDurableSubStatment=null;
findDurableSubStatment=null;
findAllDurableSubMessagesStatment=null;
updateLastAckOfDurableSub=null;
findMessageSequenceIdStatment=null;
removeAllMessagesStatment=null;
removeAllSubscriptionsStatment=null;
deleteSubscriptionStatment=null;
deleteOldMessagesStatment=null;
findLastSequenceIdInAcks=null;
findAllDestinationsStatment=null;
statementProvider.setUseExternalMessageReferences(useExternalMessageReferences);
}
public boolean isUseExternalMessageReferences() {
return statementProvider.isUseExternalMessageReferences();
}
public String getFullMessageTableName() {
return statementProvider.getFullMessageTableName();
}
public String getFindAllDurableSubsStatment() {
if ( findAllDurableSubsStatment==null ) {
findAllDurableSubsStatment = statementProvider.getFindAllDurableSubsStatment();
}
return findAllDurableSubsStatment;
}
}

View File

@ -31,7 +31,7 @@ import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.store.jdbc.JDBCAdapter;
import org.apache.activemq.store.jdbc.JDBCMessageRecoveryListener;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.jdbc.StatementProvider;
import org.apache.activemq.store.jdbc.Statements;
import org.apache.activemq.store.jdbc.TransactionContext;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -52,13 +52,15 @@ import org.apache.commons.logging.LogFactory;
* <li></li>
* </ul>
*
* @org.apache.xbean.XBean element="defaultJDBCAdapter"
*
* @version $Revision: 1.10 $
*/
public class DefaultJDBCAdapter implements JDBCAdapter {
private static final Log log = LogFactory.getLog(DefaultJDBCAdapter.class);
final protected StatementProvider statementProvider;
protected Statements statements;
protected boolean batchStatments=true;
protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException {
@ -69,17 +71,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
return rs.getBytes(index);
}
/**
* @param provider
*/
public DefaultJDBCAdapter(StatementProvider provider) {
this.statementProvider = new CachingStatementProvider(provider);
}
public DefaultJDBCAdapter() {
this(new DefaultStatementProvider());
}
public void doCreateTables(TransactionContext c) throws SQLException, IOException {
Statement s = null;
try {
@ -89,7 +80,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
boolean alreadyExists = false;
ResultSet rs=null;
try {
rs= c.getConnection().getMetaData().getTables(null,null, statementProvider.getFullMessageTableName(), new String[] {"TABLE"});
rs= c.getConnection().getMetaData().getTables(null,null, statements.getFullMessageTableName(), new String[] {"TABLE"});
alreadyExists = rs.next();
} catch (Throwable ignore) {
} finally {
@ -97,7 +88,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
}
s = c.getConnection().createStatement();
String[] createStatments = statementProvider.getCreateSchemaStatments();
String[] createStatments = statements.getCreateSchemaStatements();
for (int i = 0; i < createStatments.length; i++) {
// This will fail usually since the tables will be
// created already.
@ -133,7 +124,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
Statement s = null;
try {
s = c.getConnection().createStatement();
String[] dropStatments = statementProvider.getDropSchemaStatments();
String[] dropStatments = statements.getDropSchemaStatements();
for (int i = 0; i < dropStatments.length; i++) {
// This will fail usually since the tables will be
// created already.
@ -161,7 +152,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
PreparedStatement s = null;
ResultSet rs = null;
try {
s = c.getConnection().prepareStatement(statementProvider.getFindLastSequenceIdInMsgs());
s = c.getConnection().prepareStatement(statements.getFindLastSequenceIdInMsgsStatement());
rs = s.executeQuery();
long seq1 = 0;
if (rs.next()) {
@ -169,7 +160,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
}
rs.close();
s.close();
s = c.getConnection().prepareStatement(statementProvider.getFindLastSequenceIdInAcks());
s = c.getConnection().prepareStatement(statements.getFindLastSequenceIdInAcksStatement());
rs = s.executeQuery();
long seq2 = 0;
if (rs.next()) {
@ -188,7 +179,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
PreparedStatement s = c.getAddMessageStatement();
try {
if( s == null ) {
s = c.getConnection().prepareStatement(statementProvider.getAddMessageStatment());
s = c.getConnection().prepareStatement(statements.getAddMessageStatement());
if( batchStatments ) {
c.setAddMessageStatement(s);
}
@ -215,7 +206,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
PreparedStatement s = c.getAddMessageStatement();
try {
if( s == null ) {
s = c.getConnection().prepareStatement(statementProvider.getAddMessageStatment());
s = c.getConnection().prepareStatement(statements.getAddMessageStatement());
if( batchStatments ) {
c.setAddMessageStatement(s);
}
@ -243,7 +234,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
ResultSet rs = null;
try {
s = c.getConnection().prepareStatement(statementProvider.getFindMessageSequenceIdStatment());
s = c.getConnection().prepareStatement(statements.getFindMessageSequenceIdStatement());
s.setString(1, messageID.getProducerId().toString());
s.setLong(2, messageID.getProducerSequenceId());
rs = s.executeQuery();
@ -265,7 +256,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
ResultSet rs = null;
try {
s = c.getConnection().prepareStatement(statementProvider.getFindMessageStatment());
s = c.getConnection().prepareStatement(statements.getFindMessageStatement());
s.setLong(1, seq);
rs = s.executeQuery();
@ -286,7 +277,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
ResultSet rs = null;
try {
s = c.getConnection().prepareStatement(statementProvider.getFindMessageStatment());
s = c.getConnection().prepareStatement(statements.getFindMessageStatement());
s.setLong(1, seq);
rs = s.executeQuery();
@ -307,7 +298,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
PreparedStatement s = c.getAddMessageStatement();
try {
if( s == null ) {
s = c.getConnection().prepareStatement(statementProvider.getRemoveMessageStatment());
s = c.getConnection().prepareStatement(statements.getRemoveMessageStatment());
if( batchStatments ) {
c.setRemovedMessageStatement(s);
}
@ -333,11 +324,11 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
ResultSet rs = null;
try {
s = c.getConnection().prepareStatement(statementProvider.getFindAllMessagesStatment());
s = c.getConnection().prepareStatement(statements.getFindAllMessagesStatement());
s.setString(1, destination.getQualifiedName());
rs = s.executeQuery();
if( statementProvider.isUseExternalMessageReferences() ) {
if( statements.isUseExternalMessageReferences() ) {
while (rs.next()) {
listener.recoverMessageReference(rs.getString(2));
}
@ -360,7 +351,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
PreparedStatement s = c.getAddMessageStatement();
try {
if( s == null ) {
s = c.getConnection().prepareStatement(statementProvider.getUpdateLastAckOfDurableSub());
s = c.getConnection().prepareStatement(statements.getUpdateLastAckOfDurableSubStatement());
if( batchStatments ) {
c.setUpdateLastAckStatement(s);
}
@ -391,13 +382,13 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
ResultSet rs = null;
try {
s = c.getConnection().prepareStatement(statementProvider.getFindAllDurableSubMessagesStatment());
s = c.getConnection().prepareStatement(statements.getFindAllDurableSubMessagesStatement());
s.setString(1, destination.getQualifiedName());
s.setString(2, clientId);
s.setString(3, subscriptionName);
rs = s.executeQuery();
if( statementProvider.isUseExternalMessageReferences() ) {
if( statements.isUseExternalMessageReferences() ) {
while (rs.next()) {
listener.recoverMessageReference(rs.getString(2));
}
@ -427,7 +418,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
long lastMessageId = -1;
if(!retroactive) {
s = c.getConnection().prepareStatement(statementProvider.getFindLastSequenceIdInMsgs());
s = c.getConnection().prepareStatement(statements.getFindLastSequenceIdInMsgsStatement());
ResultSet rs=null;
try {
rs = s.executeQuery();
@ -440,7 +431,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
}
}
s = c.getConnection().prepareStatement(statementProvider.getCreateDurableSubStatment());
s = c.getConnection().prepareStatement(statements.getCreateDurableSubStatement());
s.setString(1, destination.getQualifiedName());
s.setString(2, clientId);
s.setString(3, subscriptionName);
@ -462,7 +453,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
ResultSet rs = null;
try {
s = c.getConnection().prepareStatement(statementProvider.getFindDurableSubStatment());
s = c.getConnection().prepareStatement(statements.getFindDurableSubStatement());
s.setString(1, destination.getQualifiedName());
s.setString(2, clientId);
s.setString(3, subscriptionName);
@ -491,7 +482,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
ResultSet rs = null;
try {
s = c.getConnection().prepareStatement(statementProvider.getFindAllDurableSubsStatment());
s = c.getConnection().prepareStatement(statements.getFindAllDurableSubsStatement());
s.setString(1, destination.getQualifiedName());
rs = s.executeQuery();
@ -516,12 +507,12 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
public void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName) throws SQLException, IOException {
PreparedStatement s = null;
try {
s = c.getConnection().prepareStatement(statementProvider.getRemoveAllMessagesStatment());
s = c.getConnection().prepareStatement(statements.getRemoveAllMessagesStatement());
s.setString(1, destinationName.getQualifiedName());
s.executeUpdate();
s.close();
s = c.getConnection().prepareStatement(statementProvider.getRemoveAllSubscriptionsStatment());
s = c.getConnection().prepareStatement(statements.getRemoveAllSubscriptionsStatement());
s.setString(1, destinationName.getQualifiedName());
s.executeUpdate();
@ -534,7 +525,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
public void doDeleteSubscription(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName) throws SQLException, IOException {
PreparedStatement s = null;
try {
s = c.getConnection().prepareStatement(statementProvider.getDeleteSubscriptionStatment());
s = c.getConnection().prepareStatement(statements.getDeleteSubscriptionStatement());
s.setString(1, destination.getQualifiedName());
s.setString(2, clientId);
s.setString(3, subscriptionName);
@ -548,7 +539,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
public void doDeleteOldMessages(TransactionContext c) throws SQLException, IOException {
PreparedStatement s = null;
try {
s = c.getConnection().prepareStatement(statementProvider.getDeleteOldMessagesStatment());
s = c.getConnection().prepareStatement(statements.getDeleteOldMessagesStatement());
s.setLong(1, System.currentTimeMillis());
int i = s.executeUpdate();
log.debug("Deleted "+i+" old message(s).");
@ -579,7 +570,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
PreparedStatement s = null;
ResultSet rs = null;
try {
s = c.getConnection().prepareStatement(statementProvider.getFindAllDestinationsStatment());
s = c.getConnection().prepareStatement(statements.getFindAllDestinationsStatement());
rs = s.executeQuery();
while (rs.next()) {
@ -602,7 +593,15 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
}
public void setUseExternalMessageReferences(boolean useExternalMessageReferences) {
statementProvider.setUseExternalMessageReferences(useExternalMessageReferences);
statements.setUseExternalMessageReferences(useExternalMessageReferences);
}
public Statements getStatements() {
return statements;
}
public void setStatements(Statements statements) {
this.statements = statements;
}
/*

View File

@ -1,283 +0,0 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.jdbc.adapter;
import org.apache.activemq.store.jdbc.StatementProvider;
/**
* @version $Revision: 1.4 $
*/
public class DefaultStatementProvider implements StatementProvider {
private String tablePrefix = "";
protected String messageTableName = "ACTIVEMQ_MSGS";
protected String durableSubAcksTableName = "ACTIVEMQ_ACKS";
protected String binaryDataType = "BLOB";
protected String containerNameDataType = "VARCHAR(250)";
protected String xidDataType = "VARCHAR(250)";
protected String msgIdDataType = "VARCHAR(250)";
protected String sequenceDataType = "INTEGER";
protected String longDataType = "BIGINT";
protected String stringIdDataType = "VARCHAR(250)";
protected boolean useExternalMessageReferences=false;
public String [] getCreateSchemaStatments() {
return new String[]{
"CREATE TABLE "+getFullMessageTableName()+"("
+"ID "+sequenceDataType+" NOT NULL"
+", CONTAINER "+containerNameDataType
+", MSGID_PROD "+msgIdDataType
+", MSGID_SEQ "+sequenceDataType
+", EXPIRATION "+longDataType
+", MSG "+(useExternalMessageReferences ? stringIdDataType : binaryDataType)
+", PRIMARY KEY ( ID ) )",
"CREATE INDEX "+getFullMessageTableName()+"_MIDX ON "+getFullMessageTableName()+" (MSGID_PROD,MSGID_SEQ)",
"CREATE INDEX "+getFullMessageTableName()+"_CIDX ON "+getFullMessageTableName()+" (CONTAINER)",
"CREATE INDEX "+getFullMessageTableName()+"_EIDX ON "+getFullMessageTableName()+" (EXPIRATION)",
"CREATE TABLE "+getTablePrefix()+durableSubAcksTableName+"("
+"CONTAINER "+containerNameDataType+" NOT NULL"
+", CLIENT_ID "+stringIdDataType+" NOT NULL"
+", SUB_NAME "+stringIdDataType+" NOT NULL"
+", SELECTOR "+stringIdDataType
+", LAST_ACKED_ID "+sequenceDataType
+", PRIMARY KEY ( CONTAINER, CLIENT_ID, SUB_NAME))",
};
}
public String getFullMessageTableName() {
return getTablePrefix()+messageTableName;
}
public String [] getDropSchemaStatments() {
return new String[]{
"DROP TABLE "+getTablePrefix()+durableSubAcksTableName+"",
"DROP TABLE "+getFullMessageTableName()+"",
};
}
public String getAddMessageStatment() {
return "INSERT INTO "+getFullMessageTableName()+"(ID, MSGID_PROD, MSGID_SEQ, CONTAINER, EXPIRATION, MSG) VALUES (?, ?, ?, ?, ?, ?)";
}
public String getUpdateMessageStatment() {
return "UPDATE "+getFullMessageTableName()+" SET MSG=? WHERE ID=?";
}
public String getRemoveMessageStatment() {
return "DELETE FROM "+getFullMessageTableName()+" WHERE ID=?";
}
public String getFindMessageSequenceIdStatment() {
return "SELECT ID FROM "+getFullMessageTableName()+" WHERE MSGID_PROD=? AND MSGID_SEQ=?";
}
public String getFindMessageStatment() {
return "SELECT MSG FROM "+getFullMessageTableName()+" WHERE ID=?";
}
public String getFindAllMessagesStatment() {
return "SELECT ID, MSG FROM "+getFullMessageTableName()+" WHERE CONTAINER=? ORDER BY ID";
}
public String getFindLastSequenceIdInMsgs() {
return "SELECT MAX(ID) FROM "+getFullMessageTableName();
}
public String getFindLastSequenceIdInAcks() {
return "SELECT MAX(LAST_ACKED_ID) FROM "+getTablePrefix()+durableSubAcksTableName;
}
public String getCreateDurableSubStatment() {
return "INSERT INTO "+getTablePrefix()+durableSubAcksTableName
+"(CONTAINER, CLIENT_ID, SUB_NAME, SELECTOR, LAST_ACKED_ID) "
+"VALUES (?, ?, ?, ?, ?)";
}
public String getFindDurableSubStatment() {
return "SELECT SELECTOR, SUB_NAME " +
"FROM "+getTablePrefix()+durableSubAcksTableName+
" WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?";
}
public String getFindAllDurableSubsStatment() {
return "SELECT SELECTOR, SUB_NAME, CLIENT_ID" +
" FROM "+getTablePrefix()+durableSubAcksTableName+
" WHERE CONTAINER=?";
}
public String getUpdateLastAckOfDurableSub() {
return "UPDATE "+getTablePrefix()+durableSubAcksTableName+
" SET LAST_ACKED_ID=?" +
" WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?";
}
public String getDeleteSubscriptionStatment() {
return "DELETE FROM "+getTablePrefix()+durableSubAcksTableName+
" WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?";
}
public String getFindAllDurableSubMessagesStatment() {
return "SELECT M.ID, M.MSG FROM "
+getFullMessageTableName()+" M, "
+getTablePrefix()+durableSubAcksTableName +" D "
+" WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+" AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID"
+" ORDER BY M.ID";
}
public String getFindAllDestinationsStatment() {
return "SELECT DISTINCT CONTAINER FROM "+getFullMessageTableName();
}
public String getRemoveAllMessagesStatment() {
return "DELETE FROM "+getFullMessageTableName()+" WHERE CONTAINER=?";
}
public String getRemoveAllSubscriptionsStatment() {
return "DELETE FROM "+getTablePrefix()+durableSubAcksTableName+" WHERE CONTAINER=?";
}
public String getDeleteOldMessagesStatment() {
return "DELETE FROM "+getFullMessageTableName()+
" WHERE ( EXPIRATION<>0 AND EXPIRATION<?) OR ID <= " +
"( SELECT min("+getTablePrefix()+durableSubAcksTableName+".LAST_ACKED_ID) " +
"FROM "+getTablePrefix()+durableSubAcksTableName+" WHERE " +
getTablePrefix()+durableSubAcksTableName+".CONTAINER="+getFullMessageTableName()+
".CONTAINER)";
}
/**
* @return Returns the containerNameDataType.
*/
public String getContainerNameDataType() {
return containerNameDataType;
}
/**
* @param containerNameDataType The containerNameDataType to set.
*/
public void setContainerNameDataType(String containerNameDataType) {
this.containerNameDataType = containerNameDataType;
}
/**
* @return Returns the messageDataType.
*/
public String getBinaryDataType() {
return binaryDataType;
}
/**
* @param messageDataType The messageDataType to set.
*/
public void setBinaryDataType(String messageDataType) {
this.binaryDataType = messageDataType;
}
/**
* @return Returns the messageTableName.
*/
public String getMessageTableName() {
return messageTableName;
}
/**
* @param messageTableName The messageTableName to set.
*/
public void setMessageTableName(String messageTableName) {
this.messageTableName = messageTableName;
}
/**
* @return Returns the msgIdDataType.
*/
public String getMsgIdDataType() {
return msgIdDataType;
}
/**
* @param msgIdDataType The msgIdDataType to set.
*/
public void setMsgIdDataType(String msgIdDataType) {
this.msgIdDataType = msgIdDataType;
}
/**
* @return Returns the sequenceDataType.
*/
public String getSequenceDataType() {
return sequenceDataType;
}
/**
* @param sequenceDataType The sequenceDataType to set.
*/
public void setSequenceDataType(String sequenceDataType) {
this.sequenceDataType = sequenceDataType;
}
/**
* @return Returns the tablePrefix.
*/
public String getTablePrefix() {
return tablePrefix;
}
/**
* @param tablePrefix The tablePrefix to set.
*/
public void setTablePrefix(String tablePrefix) {
this.tablePrefix = tablePrefix;
}
/**
* @return Returns the xidDataType.
*/
public String getXidDataType() {
return xidDataType;
}
/**
* @param xidDataType The xidDataType to set.
*/
public void setXidDataType(String xidDataType) {
this.xidDataType = xidDataType;
}
/**
* @return Returns the durableSubAcksTableName.
*/
public String getDurableSubAcksTableName() {
return durableSubAcksTableName;
}
/**
* @param durableSubAcksTableName The durableSubAcksTableName to set.
*/
public void setDurableSubAcksTableName(String durableSubAcksTableName) {
this.durableSubAcksTableName = durableSubAcksTableName;
}
public String getLongDataType() {
return longDataType;
}
public void setLongDataType(String longDataType) {
this.longDataType = longDataType;
}
public String getStringIdDataType() {
return stringIdDataType;
}
public void setStringIdDataType(String stringIdDataType) {
this.stringIdDataType = stringIdDataType;
}
public void setUseExternalMessageReferences(boolean useExternalMessageReferences) {
this.useExternalMessageReferences=useExternalMessageReferences;
}
public boolean isUseExternalMessageReferences() {
return useExternalMessageReferences;
}
}

View File

@ -16,27 +16,17 @@
*/
package org.apache.activemq.store.jdbc.adapter;
import org.apache.activemq.store.jdbc.StatementProvider;
import org.apache.activemq.store.jdbc.Statements;
/**
*
* @version $Revision: 1.2 $
*/
public class HsqldbJDBCAdapter extends BytesJDBCAdapter {
public static class HSQLStatementProvider extends DefaultStatementProvider {
public HSQLStatementProvider() {
setBinaryDataType("OTHER");
}
}
public HsqldbJDBCAdapter() {
super(new HSQLStatementProvider());
public void setStatements(Statements statements) {
statements.setBinaryDataType("OTHER");
super.setStatements(statements);
}
public HsqldbJDBCAdapter(StatementProvider provider) {
super(provider);
}
}

View File

@ -16,7 +16,7 @@
*/
package org.apache.activemq.store.jdbc.adapter;
import org.apache.activemq.store.jdbc.StatementProvider;
import org.apache.activemq.store.jdbc.Statements;
/**
* Provides JDBCAdapter since that uses
@ -28,22 +28,13 @@ import org.apache.activemq.store.jdbc.StatementProvider;
* <li>MS SQL</li>
* </ul>
*
* @org.apache.xbean.XBean element="imageBasedJDBCAdaptor"
*/
public class ImageBasedJDBCAdaptor extends DefaultJDBCAdapter {
public static StatementProvider createStatementProvider() {
DefaultStatementProvider answer = new DefaultStatementProvider();
answer.setBinaryDataType("IMAGE");
return answer;
}
public ImageBasedJDBCAdaptor() {
super(createStatementProvider());
}
public ImageBasedJDBCAdaptor(StatementProvider provider) {
super(provider);
}
public void setStatements(Statements statements) {
statements.setBinaryDataType("IMAGE");
super.setStatements(statements);
}
}

View File

@ -1,6 +1,6 @@
/**
*
* Copyright 2005 Pawel Tucholski
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -17,30 +17,24 @@
**/
package org.apache.activemq.store.jdbc.adapter;
import org.apache.activemq.store.jdbc.StatementProvider;
import org.apache.activemq.store.jdbc.Statements;
/**
* JDBC Adapter for Informix database.
* Because Informix database restricts length of composite primary keys, length of
* <i>container name</i> field and <i>subscription id</i> field must be reducted to 150 characters.
* Therefore be sure not to use longer names for container name and subscription id than 150 characters.
* <i>container name</i> field and <i>subscription id</i> field must be reduced to 150 characters.
* Therefore be sure not to use longer names for container name and subscription id than 150 characters.
*
* @org.apache.xbean.XBean element="informixJDBCAdapter"
*/
public class InformixJDBCAdapter extends BlobJDBCAdapter {
public static StatementProvider createStatementProvider() {
DefaultStatementProvider answer = new DefaultStatementProvider();
answer.setContainerNameDataType("VARCHAR(150)");
answer.setStringIdDataType("VARCHAR(150)");
answer.setLongDataType("INT8");
answer.setBinaryDataType("BYTE");
return answer;
}
public InformixJDBCAdapter() {
this(createStatementProvider());
}
public InformixJDBCAdapter(StatementProvider provider) {
super(provider);
}
public void setStatements(Statements statements) {
statements.setContainerNameDataType("VARCHAR(150)");
statements.setStringIdDataType("VARCHAR(150)");
statements.setLongDataType("INT8");
statements.setBinaryDataType("BYTE");
super.setStatements(statements);
}
}

View File

@ -20,7 +20,7 @@ import java.sql.Blob;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.activemq.store.jdbc.StatementProvider;
import org.apache.activemq.store.jdbc.Statements;
/**
* Implements all the default JDBC operations that are used
@ -38,26 +38,18 @@ import org.apache.activemq.store.jdbc.StatementProvider;
* <li></li>
* </ul>
*
* @org.apache.xbean.XBean element="oracleJDBCAdapter"
*
* @version $Revision: 1.2 $
*/
public class OracleJDBCAdapter extends DefaultJDBCAdapter {
public static StatementProvider createStatementProvider() {
DefaultStatementProvider answer = new DefaultStatementProvider();
answer.setLongDataType("NUMBER");
return answer;
}
public OracleJDBCAdapter() {
this(createStatementProvider());
}
public OracleJDBCAdapter(StatementProvider provider) {
super(provider);
public void setStatements(Statements statements) {
statements.setLongDataType("NUMBER");
super.setStatements(statements);
}
protected byte[] getBinaryData(ResultSet rs, int index) throws SQLException {
// Get as a BLOB
Blob aBlob = rs.getBlob(1);
return aBlob.getBytes(1, (int) aBlob.length());

View File

@ -16,7 +16,7 @@
*/
package org.apache.activemq.store.jdbc.adapter;
import org.apache.activemq.store.jdbc.StatementProvider;
import org.apache.activemq.store.jdbc.Statements;
/**
* Implements all the default JDBC operations that are used
@ -38,17 +38,9 @@ import org.apache.activemq.store.jdbc.StatementProvider;
*/
public class PostgresqlJDBCAdapter extends BytesJDBCAdapter {
public static StatementProvider createStatementProvider() {
DefaultStatementProvider answer = new DefaultStatementProvider();
answer.setBinaryDataType("BYTEA");
return answer;
}
public PostgresqlJDBCAdapter() {
this(createStatementProvider());
}
public PostgresqlJDBCAdapter(StatementProvider provider) {
super(provider);
public void setStatements(Statements statements) {
statements.setBinaryDataType("BYTEA");
super.setStatements(statements);
}
}

View File

@ -24,7 +24,6 @@ import java.sql.SQLException;
import org.activeio.ByteArrayInputStream;
import org.activeio.ByteArrayOutputStream;
import org.apache.activemq.store.jdbc.StatementProvider;
/**
* This JDBCAdapter inserts and extracts BLOB data using the
@ -35,18 +34,12 @@ import org.apache.activemq.store.jdbc.StatementProvider;
* <li>Axion</li>
* </ul>
*
* @org.apache.xbean.XBean element="streamJDBCAdapter"
*
* @version $Revision: 1.2 $
*/
public class StreamJDBCAdapter extends DefaultJDBCAdapter {
public StreamJDBCAdapter() {
super();
}
public StreamJDBCAdapter(StatementProvider provider) {
super(provider);
}
/**
* @see org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter#getBinaryData(java.sql.ResultSet, int)
*/

View File

@ -0,0 +1,63 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.xbean;
import java.net.URI;
import junit.framework.TestCase;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
/**
*
* @version $Revision: 1.1 $
*/
public class JDBCPersistenceXBeanConfigTest extends TestCase {
protected BrokerService brokerService;
public void testManagmentContextConfiguredCorrectly() throws Throwable {
PersistenceAdapter persistenceAdapter = brokerService.getPersistenceAdapter();
assertNotNull(persistenceAdapter);
assertTrue(persistenceAdapter instanceof JDBCPersistenceAdapter);
JDBCPersistenceAdapter jpa = (JDBCPersistenceAdapter) persistenceAdapter;
assertEquals("BROKER1.", jpa.getStatements().getTablePrefix());
}
protected void setUp() throws Exception {
brokerService = createBroker();
brokerService.start();
}
protected void tearDown() throws Exception {
if (brokerService != null) {
brokerService.stop();
}
}
protected BrokerService createBroker() throws Exception {
String uri = "org/apache/activemq/xbean/jdbc-persistence-test.xml";
return BrokerFactory.createBroker(new URI("xbean:"+uri));
}
}

View File

@ -0,0 +1,35 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright 2005-2006 The Apache Software Foundation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!-- this file can only be parsed using the xbean-spring library -->
<!-- START SNIPPET: xbean -->
<beans xmlns="http://activemq.org/config/1.0">
<broker useJmx="false">
<persistenceAdapter>
<journaledJDBC useJournal="false">
<statements>
<statements tablePrefix="BROKER1."/>
</statements>
</journaledJDBC>
</persistenceAdapter>
</broker>
</beans>
<!-- END SNIPPET: xbean -->