git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1492214 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2013-06-12 14:18:51 +00:00
parent 7db8987ca6
commit 0168a826f7
2 changed files with 200 additions and 42 deletions

View File

@ -32,8 +32,8 @@ import javax.sql.DataSource;
import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.Locker;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
@ -42,7 +42,6 @@ import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.broker.Locker;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
@ -62,22 +61,21 @@ import org.slf4j.LoggerFactory;
/**
* A {@link PersistenceAdapter} implementation using JDBC for persistence
* storage.
*
*
* This persistence adapter will correctly remember prepared XA transactions,
* but it will not keep track of local transaction commits so that operations
* performed against the Message store are done as a single uow.
*
*
* @org.apache.xbean.XBean element="jdbcPersistenceAdapter"
*
*
*
*/
public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements PersistenceAdapter {
private static final Logger LOG = LoggerFactory.getLogger(JDBCPersistenceAdapter.class);
private static FactoryFinder adapterFactoryFinder = new FactoryFinder(
"META-INF/services/org/apache/activemq/store/jdbc/");
"META-INF/services/org/apache/activemq/store/jdbc/");
private static FactoryFinder lockFactoryFinder = new FactoryFinder(
"META-INF/services/org/apache/activemq/store/jdbc/lock/");
"META-INF/services/org/apache/activemq/store/jdbc/lock/");
public static final long DEFAULT_LOCK_KEEP_ALIVE_PERIOD = 30 * 1000;
@ -94,13 +92,13 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
private int transactionIsolation;
private File directory;
private boolean changeAutoCommitAllowed = true;
protected int maxProducersToAudit=1024;
protected int maxAuditDepth=1000;
protected boolean enableAudit=false;
protected int auditRecoveryDepth = 1024;
protected ActiveMQMessageAudit audit;
protected LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator();
protected int maxRows = DefaultJDBCAdapter.MAX_ROWS;
@ -116,6 +114,7 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
this.wireFormat = wireFormat;
}
@Override
public Set<ActiveMQDestination> getDestinations() {
TransactionContext c = null;
try {
@ -140,15 +139,16 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
private Set<ActiveMQDestination> emptyDestinationSet() {
return Collections.EMPTY_SET;
}
protected void createMessageAudit() {
if (enableAudit && audit == null) {
audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
TransactionContext c = null;
try {
c = getTransactionContext();
getAdapter().doMessageIdScan(c, auditRecoveryDepth, new JDBCMessageIdScanListener() {
@Override
public void messageId(MessageId id) {
audit.isDuplicate(id);
}
@ -163,14 +163,15 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
}
}
}
}
}
}
public void initSequenceIdGenerator() {
TransactionContext c = null;
try {
c = getTransactionContext();
getAdapter().doMessageIdScan(c, auditRecoveryDepth, new JDBCMessageIdScanListener() {
@Override
public void messageId(MessageId id) {
audit.isDuplicate(id);
}
@ -185,9 +186,9 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
}
}
}
}
@Override
public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
MessageStore rc = new JDBCMessageStore(this, getAdapter(), wireFormat, destination, audit);
if (transactionStore != null) {
@ -196,6 +197,7 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
return rc;
}
@Override
public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
TopicMessageStore rc = new JDBCTopicMessageStore(this, getAdapter(), wireFormat, destination, audit);
if (transactionStore != null) {
@ -208,6 +210,7 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
* Cleanup method to remove any state associated with the given destination
* @param destination Destination to forget
*/
@Override
public void removeQueueMessageStore(ActiveMQQueue destination) {
if (destination.isQueue() && getBrokerService().shouldRecordVirtualDestination(destination)) {
try {
@ -237,9 +240,11 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
*
* @param destination Destination to forget
*/
@Override
public void removeTopicMessageStore(ActiveMQTopic destination) {
}
@Override
public TransactionStore createTransactionStore() throws IOException {
if (transactionStore == null) {
transactionStore = new JdbcMemoryTransactionStore(this);
@ -247,6 +252,7 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
return this.transactionStore;
}
@Override
public long getLastMessageBrokerSequenceId() throws IOException {
TransactionContext c = getTransactionContext();
try {
@ -270,7 +276,8 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
c.close();
}
}
@Override
public long getLastProducerSequenceId(ProducerId id) throws IOException {
TransactionContext c = getTransactionContext();
try {
@ -303,6 +310,7 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
}
}
@Override
public void doStart() throws Exception {
if( brokerService!=null ) {
@ -312,6 +320,7 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
// Cleanup the db periodically.
if (cleanupPeriod > 0) {
cleanupTicket = getScheduledThreadPoolExecutor().scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
cleanup();
}
@ -320,6 +329,7 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
createMessageAudit();
}
@Override
public synchronized void doStop(ServiceStopper stopper) throws Exception {
if (cleanupTicket != null) {
cleanupTicket.cancel(true);
@ -353,9 +363,11 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
this.clockDaemon = clockDaemon;
}
@Override
public ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor() {
if (clockDaemon == null) {
clockDaemon = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
@Override
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, "ActiveMQ Cleanup Timer");
thread.setDaemon(true);
@ -374,7 +386,6 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
}
/**
*
* @deprecated as of 5.7.0, replaced by {@link #getLocker()}
*/
@Deprecated
@ -388,6 +399,7 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
*
* @deprecated as of 5.7.0, replaced by {@link #setLocker(org.apache.activemq.broker.Locker)}
*/
@Deprecated
public void setDatabaseLocker(Locker locker) throws IOException {
setLocker(locker);
}
@ -405,7 +417,7 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
}
return lockDataSource;
}
public void setLockDataSource(DataSource dataSource) {
this.lockDataSource = dataSource;
}
@ -418,9 +430,9 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
* @throws IOException
*/
protected JDBCAdapter createAdapter() throws IOException {
adapter = (JDBCAdapter) loadAdapter(adapterFactoryFinder, "adapter");
// Use the default JDBC adapter if the
// Database type is not recognized.
if (adapter == null) {
@ -492,16 +504,19 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
return answer;
}
@Override
public void beginTransaction(ConnectionContext context) throws IOException {
TransactionContext transactionContext = getTransactionContext(context);
transactionContext.begin();
}
@Override
public void commitTransaction(ConnectionContext context) throws IOException {
TransactionContext transactionContext = getTransactionContext(context);
transactionContext.commit();
}
@Override
public void rollbackTransaction(ConnectionContext context) throws IOException {
TransactionContext transactionContext = getTransactionContext(context);
transactionContext.rollback();
@ -533,6 +548,7 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
this.changeAutoCommitAllowed = changeAutoCommitAllowed;
}
@Override
public void deleteAllMessages() throws IOException {
TransactionContext c = getTransactionContext();
try {
@ -596,15 +612,20 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
public void setStatements(Statements statements) {
this.statements = statements;
if (adapter != null) {
this.adapter.setStatements(getStatements());
}
}
/**
* @param usageManager The UsageManager that is controlling the
* destination's memory usage.
*/
@Override
public void setUsageManager(SystemUsage usageManager) {
}
@Override
public Locker createDefaultLocker() throws IOException {
Locker locker = (Locker) loadAdapter(lockFactoryFinder, "lock");
if (locker == null) {
@ -615,17 +636,21 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
return locker;
}
@Override
public void setBrokerName(String brokerName) {
}
@Override
public String toString() {
return "JDBCPersistenceAdapter(" + super.toString() + ")";
}
@Override
public void setDirectory(File dir) {
this.directory=dir;
}
@Override
public File getDirectory(){
if (this.directory==null && brokerService != null){
this.directory=brokerService.getBrokerDataDirectory();
@ -634,6 +659,7 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
}
// interesting bit here is proof that DB is ok
@Override
public void checkpoint(boolean sync) throws IOException {
// by pass TransactionContext to avoid IO Exception handler
Connection connection = null;
@ -652,6 +678,7 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
}
}
@Override
public long size(){
return 0;
}
@ -663,10 +690,11 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
* not applied if DataBaseLocker is injected.
*
*/
@Deprecated
public void setLockAcquireSleepInterval(long lockAcquireSleepInterval) throws IOException {
getLocker().setLockAcquireSleepInterval(lockAcquireSleepInterval);
}
/**
* set the Transaction isolation level to something other that TRANSACTION_READ_UNCOMMITTED
* This allowable dirty isolation level may not be achievable in clustered DB environments
@ -678,29 +706,29 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
this.transactionIsolation = transactionIsolation;
}
public int getMaxProducersToAudit() {
return maxProducersToAudit;
}
public int getMaxProducersToAudit() {
return maxProducersToAudit;
}
public void setMaxProducersToAudit(int maxProducersToAudit) {
this.maxProducersToAudit = maxProducersToAudit;
}
public void setMaxProducersToAudit(int maxProducersToAudit) {
this.maxProducersToAudit = maxProducersToAudit;
}
public int getMaxAuditDepth() {
return maxAuditDepth;
}
public int getMaxAuditDepth() {
return maxAuditDepth;
}
public void setMaxAuditDepth(int maxAuditDepth) {
this.maxAuditDepth = maxAuditDepth;
}
public void setMaxAuditDepth(int maxAuditDepth) {
this.maxAuditDepth = maxAuditDepth;
}
public boolean isEnableAudit() {
return enableAudit;
}
public boolean isEnableAudit() {
return enableAudit;
}
public void setEnableAudit(boolean enableAudit) {
this.enableAudit = enableAudit;
}
public void setEnableAudit(boolean enableAudit) {
this.enableAudit = enableAudit;
}
public int getAuditRecoveryDepth() {
return auditRecoveryDepth;
@ -764,7 +792,6 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
}
}
public void commitLastAck(ConnectionContext context, long xidLastAck, long priority, ActiveMQDestination destination, String subName, String clientId) throws IOException {
TransactionContext c = getTransactionContext(context);
try {
@ -816,5 +843,4 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
}
return result;
}
}

View File

@ -0,0 +1,132 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.jdbc;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.List;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.Message;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.derby.jdbc.EmbeddedDataSource;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class JDBCTablePrefixAssignedTest {
private static final Logger LOG = LoggerFactory.getLogger(JDBCTablePrefixAssignedTest.class);
private BrokerService service;
@Before
public void setUp() throws Exception {
service = createBroker();
service.start();
service.waitUntilStarted();
}
@After
public void tearDown() throws Exception {
service.stop();
service.waitUntilStopped();
}
@Test
public void testTablesHave() throws Exception {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?create=false");
ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("TEST.FOO");
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < 10; ++i) {
producer.send(session.createTextMessage("test"));
}
producer.close();
connection.close();
List<Message> queuedMessages = null;
try {
queuedMessages = dumpMessages();
} catch (Exception ex) {
LOG.info("Caught ex: ", ex);
fail("Should not have thrown an exception");
}
assertNotNull(queuedMessages);
assertEquals("Should have found 10 messages", 10, queuedMessages.size());
}
protected List<Message> dumpMessages() throws Exception {
WireFormat wireFormat = new OpenWireFormat();
java.sql.Connection conn = ((JDBCPersistenceAdapter) service.getPersistenceAdapter()).getDataSource().getConnection();
PreparedStatement statement = conn.prepareStatement("SELECT ID, MSG FROM MYPREFIX_ACTIVEMQ_MSGS");
ResultSet result = statement.executeQuery();
ArrayList<Message> results = new ArrayList<Message>();
while(result.next()) {
long id = result.getLong(1);
Message message = (Message)wireFormat.unmarshal(new ByteSequence(result.getBytes(2)));
LOG.info("id: " + id + ", message SeqId: " + message.getMessageId().getBrokerSequenceId() + ", MSG: " + message);
results.add(message);
}
statement.close();
conn.close();
return results;
}
protected BrokerService createBroker() throws Exception {
BrokerService broker = new BrokerService();
JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
EmbeddedDataSource dataSource = new EmbeddedDataSource();
dataSource.setDatabaseName("derbyDb");
dataSource.setCreateDatabase("create");
DefaultJDBCAdapter adapter = new DefaultJDBCAdapter();
jdbc.setAdapter(adapter);
Statements statements = new Statements();
statements.setTablePrefix("MYPREFIX_");
jdbc.setStatements(statements);
jdbc.setUseLock(false);
jdbc.setDataSource(dataSource);
jdbc.deleteAllMessages();
broker.setPersistenceAdapter(jdbc);
return broker;
}
}