git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@729803 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-12-28 23:13:32 +00:00
parent dfdd8963ad
commit 3b374365c1
2 changed files with 160 additions and 143 deletions

View File

@ -21,8 +21,6 @@ import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.security.KeyManagementException;
import java.security.SecureRandom;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
@ -32,13 +30,9 @@ import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import javax.management.MBeanServer; import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException; import javax.management.MalformedObjectNameException;
import javax.management.ObjectName; import javax.management.ObjectName;
import javax.net.ssl.KeyManager;
import javax.net.ssl.TrustManager;
import org.apache.activemq.ActiveMQConnectionMetaData; import org.apache.activemq.ActiveMQConnectionMetaData;
import org.apache.activemq.Service; import org.apache.activemq.Service;
import org.apache.activemq.advisory.AdvisoryBroker; import org.apache.activemq.advisory.AdvisoryBroker;
@ -83,7 +77,6 @@ import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.TransportFactory; import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportServer; import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.tcp.SslTransportFactory;
import org.apache.activemq.transport.vm.VMTransportFactory; import org.apache.activemq.transport.vm.VMTransportFactory;
import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.IOExceptionSupport;

View File

@ -23,7 +23,10 @@ import java.sql.SQLException;
import java.sql.Statement; import java.sql.Statement;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.TreeSet;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.command.SubscriptionInfo;
@ -36,12 +39,10 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
/** /**
* Implements all the default JDBC operations that are used by the * Implements all the default JDBC operations that are used by the JDBCPersistenceAdapter. <p/> sub-classing is
* JDBCPersistenceAdapter. <p/> sub-classing is encouraged to override the * encouraged to override the default implementation of methods to account for differences in JDBC Driver
* default implementation of methods to account for differences in JDBC Driver * implementations. <p/> The JDBCAdapter inserts and extracts BLOB data using the getBytes()/setBytes() operations. <p/>
* implementations. <p/> The JDBCAdapter inserts and extracts BLOB data using * The databases/JDBC drivers that use this adapter are:
* the getBytes()/setBytes() operations. <p/> The databases/JDBC drivers that
* use this adapter are:
* <ul> * <ul>
* <li></li> * <li></li>
* </ul> * </ul>
@ -51,10 +52,10 @@ import org.apache.commons.logging.LogFactory;
* @version $Revision: 1.10 $ * @version $Revision: 1.10 $
*/ */
public class DefaultJDBCAdapter implements JDBCAdapter { public class DefaultJDBCAdapter implements JDBCAdapter {
private static final Log LOG = LogFactory.getLog(DefaultJDBCAdapter.class); private static final Log LOG = LogFactory.getLog(DefaultJDBCAdapter.class);
protected Statements statements; protected Statements statements;
protected boolean batchStatments = true; protected boolean batchStatments = true;
private Set<Long> lastRecoveredMessagesIds = new TreeSet<Long>();
protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException { protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException {
s.setBytes(index, data); s.setBytes(index, data);
@ -75,16 +76,15 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
boolean alreadyExists = false; boolean alreadyExists = false;
ResultSet rs = null; ResultSet rs = null;
try { try {
rs = c.getConnection().getMetaData().getTables(null, null, rs = c.getConnection().getMetaData().getTables(null, null, this.statements.getFullMessageTableName(),
statements.getFullMessageTableName(), new String[] { "TABLE" });
new String[] {"TABLE"});
alreadyExists = rs.next(); alreadyExists = rs.next();
} catch (Throwable ignore) { } catch (Throwable ignore) {
} finally { } finally {
close(rs); close(rs);
} }
s = c.getConnection().createStatement(); s = c.getConnection().createStatement();
String[] createStatments = statements.getCreateSchemaStatements(); String[] createStatments = this.statements.getCreateSchemaStatements();
for (int i = 0; i < createStatments.length; i++) { for (int i = 0; i < createStatments.length; i++) {
// This will fail usually since the tables will be // This will fail usually since the tables will be
// created already. // created already.
@ -93,13 +93,13 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
s.execute(createStatments[i]); s.execute(createStatments[i]);
} catch (SQLException e) { } catch (SQLException e) {
if (alreadyExists) { if (alreadyExists) {
LOG.debug("Could not create JDBC tables; The message table already existed." LOG.debug("Could not create JDBC tables; The message table already existed." + " Failure was: "
+ " Failure was: " + createStatments[i] + " Message: " + e.getMessage() + createStatments[i] + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState()
+ " SQLState: " + e.getSQLState() + " Vendor code: " + e.getErrorCode()); + " Vendor code: " + e.getErrorCode());
} else { } else {
LOG.warn("Could not create JDBC tables; they could already exist." + " Failure was: " LOG.warn("Could not create JDBC tables; they could already exist." + " Failure was: "
+ createStatments[i] + " Message: " + e.getMessage() + " SQLState: " + createStatments[i] + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState()
+ e.getSQLState() + " Vendor code: " + e.getErrorCode()); + " Vendor code: " + e.getErrorCode());
JDBCPersistenceAdapter.log("Failure details: ", e); JDBCPersistenceAdapter.log("Failure details: ", e);
} }
} }
@ -117,7 +117,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
Statement s = null; Statement s = null;
try { try {
s = c.getConnection().createStatement(); s = c.getConnection().createStatement();
String[] dropStatments = statements.getDropSchemaStatements(); String[] dropStatments = this.statements.getDropSchemaStatements();
for (int i = 0; i < dropStatments.length; i++) { for (int i = 0; i < dropStatments.length; i++) {
// This will fail usually since the tables will be // This will fail usually since the tables will be
// created already. // created already.
@ -125,9 +125,9 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
LOG.debug("Executing SQL: " + dropStatments[i]); LOG.debug("Executing SQL: " + dropStatments[i]);
s.execute(dropStatments[i]); s.execute(dropStatments[i]);
} catch (SQLException e) { } catch (SQLException e) {
LOG.warn("Could not drop JDBC tables; they may not exist." + " Failure was: " LOG.warn("Could not drop JDBC tables; they may not exist." + " Failure was: " + dropStatments[i]
+ dropStatments[i] + " Message: " + e.getMessage() + " SQLState: " + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState() + " Vendor code: "
+ e.getSQLState() + " Vendor code: " + e.getErrorCode()); + e.getErrorCode());
JDBCPersistenceAdapter.log("Failure details: ", e); JDBCPersistenceAdapter.log("Failure details: ", e);
} }
} }
@ -144,7 +144,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
PreparedStatement s = null; PreparedStatement s = null;
ResultSet rs = null; ResultSet rs = null;
try { try {
s = c.getConnection().prepareStatement(statements.getFindLastSequenceIdInMsgsStatement()); s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInMsgsStatement());
rs = s.executeQuery(); rs = s.executeQuery();
long seq1 = 0; long seq1 = 0;
if (rs.next()) { if (rs.next()) {
@ -152,7 +152,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
} }
rs.close(); rs.close();
s.close(); s.close();
s = c.getConnection().prepareStatement(statements.getFindLastSequenceIdInAcksStatement()); s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInAcksStatement());
rs = s.executeQuery(); rs = s.executeQuery();
long seq2 = 0; long seq2 = 0;
if (rs.next()) { if (rs.next()) {
@ -165,13 +165,13 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
} }
} }
public void doAddMessage(TransactionContext c, MessageId messageID, ActiveMQDestination destination, public void doAddMessage(TransactionContext c, MessageId messageID, ActiveMQDestination destination, byte[] data,
byte[] data, long expiration) throws SQLException, IOException { long expiration) throws SQLException, IOException {
PreparedStatement s = c.getAddMessageStatement(); PreparedStatement s = c.getAddMessageStatement();
try { try {
if (s == null) { if (s == null) {
s = c.getConnection().prepareStatement(statements.getAddMessageStatement()); s = c.getConnection().prepareStatement(this.statements.getAddMessageStatement());
if (batchStatments) { if (this.batchStatments) {
c.setAddMessageStatement(s); c.setAddMessageStatement(s);
} }
} }
@ -181,28 +181,27 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
s.setString(4, destination.getQualifiedName()); s.setString(4, destination.getQualifiedName());
s.setLong(5, expiration); s.setLong(5, expiration);
setBinaryData(s, 6, data); setBinaryData(s, 6, data);
if (batchStatments) { if (this.batchStatments) {
s.addBatch(); s.addBatch();
} else if (s.executeUpdate() != 1) { } else if (s.executeUpdate() != 1) {
throw new SQLException("Failed add a message"); throw new SQLException("Failed add a message");
} }
} finally { } finally {
if (!batchStatments) { if (!this.batchStatments) {
if (s!=null) { if (s != null) {
s.close(); s.close();
} }
} }
} }
} }
public void doAddMessageReference(TransactionContext c, MessageId messageID, public void doAddMessageReference(TransactionContext c, MessageId messageID, ActiveMQDestination destination,
ActiveMQDestination destination, long expirationTime, String messageRef) long expirationTime, String messageRef) throws SQLException, IOException {
throws SQLException, IOException {
PreparedStatement s = c.getAddMessageStatement(); PreparedStatement s = c.getAddMessageStatement();
try { try {
if (s == null) { if (s == null) {
s = c.getConnection().prepareStatement(statements.getAddMessageStatement()); s = c.getConnection().prepareStatement(this.statements.getAddMessageStatement());
if (batchStatments) { if (this.batchStatments) {
c.setAddMessageStatement(s); c.setAddMessageStatement(s);
} }
} }
@ -212,24 +211,23 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
s.setString(4, destination.getQualifiedName()); s.setString(4, destination.getQualifiedName());
s.setLong(5, expirationTime); s.setLong(5, expirationTime);
s.setString(6, messageRef); s.setString(6, messageRef);
if (batchStatments) { if (this.batchStatments) {
s.addBatch(); s.addBatch();
} else if (s.executeUpdate() != 1) { } else if (s.executeUpdate() != 1) {
throw new SQLException("Failed add a message"); throw new SQLException("Failed add a message");
} }
} finally { } finally {
if (!batchStatments) { if (!this.batchStatments) {
s.close(); s.close();
} }
} }
} }
public long getBrokerSequenceId(TransactionContext c, MessageId messageID) throws SQLException, public long getBrokerSequenceId(TransactionContext c, MessageId messageID) throws SQLException, IOException {
IOException {
PreparedStatement s = null; PreparedStatement s = null;
ResultSet rs = null; ResultSet rs = null;
try { try {
s = c.getConnection().prepareStatement(statements.getFindMessageSequenceIdStatement()); s = c.getConnection().prepareStatement(this.statements.getFindMessageSequenceIdStatement());
s.setString(1, messageID.getProducerId().toString()); s.setString(1, messageID.getProducerId().toString());
s.setLong(2, messageID.getProducerSequenceId()); s.setLong(2, messageID.getProducerSequenceId());
rs = s.executeQuery(); rs = s.executeQuery();
@ -247,7 +245,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
PreparedStatement s = null; PreparedStatement s = null;
ResultSet rs = null; ResultSet rs = null;
try { try {
s = c.getConnection().prepareStatement(statements.getFindMessageStatement()); s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement());
s.setLong(1, seq); s.setLong(1, seq);
rs = s.executeQuery(); rs = s.executeQuery();
if (!rs.next()) { if (!rs.next()) {
@ -264,7 +262,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
PreparedStatement s = null; PreparedStatement s = null;
ResultSet rs = null; ResultSet rs = null;
try { try {
s = c.getConnection().prepareStatement(statements.getFindMessageStatement()); s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement());
s.setLong(1, seq); s.setLong(1, seq);
rs = s.executeQuery(); rs = s.executeQuery();
if (!rs.next()) { if (!rs.next()) {
@ -281,33 +279,33 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
PreparedStatement s = c.getRemovedMessageStatement(); PreparedStatement s = c.getRemovedMessageStatement();
try { try {
if (s == null) { if (s == null) {
s = c.getConnection().prepareStatement(statements.getRemoveMessageStatment()); s = c.getConnection().prepareStatement(this.statements.getRemoveMessageStatment());
if (batchStatments) { if (this.batchStatments) {
c.setRemovedMessageStatement(s); c.setRemovedMessageStatement(s);
} }
} }
s.setLong(1, seq); s.setLong(1, seq);
if (batchStatments) { if (this.batchStatments) {
s.addBatch(); s.addBatch();
} else if (s.executeUpdate() != 1) { } else if (s.executeUpdate() != 1) {
throw new SQLException("Failed to remove message"); throw new SQLException("Failed to remove message");
} }
} finally { } finally {
if (!batchStatments) { if (!this.batchStatments) {
s.close(); s.close();
} }
} }
} }
public void doRecover(TransactionContext c, ActiveMQDestination destination, public void doRecover(TransactionContext c, ActiveMQDestination destination, JDBCMessageRecoveryListener listener)
JDBCMessageRecoveryListener listener) throws Exception { throws Exception {
PreparedStatement s = null; PreparedStatement s = null;
ResultSet rs = null; ResultSet rs = null;
try { try {
s = c.getConnection().prepareStatement(statements.getFindAllMessagesStatement()); s = c.getConnection().prepareStatement(this.statements.getFindAllMessagesStatement());
s.setString(1, destination.getQualifiedName()); s.setString(1, destination.getQualifiedName());
rs = s.executeQuery(); rs = s.executeQuery();
if (statements.isUseExternalMessageReferences()) { if (this.statements.isUseExternalMessageReferences()) {
while (rs.next()) { while (rs.next()) {
if (!listener.recoverMessageReference(rs.getString(2))) { if (!listener.recoverMessageReference(rs.getString(2))) {
break; break;
@ -327,12 +325,12 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
} }
public void doSetLastAck(TransactionContext c, ActiveMQDestination destination, String clientId, public void doSetLastAck(TransactionContext c, ActiveMQDestination destination, String clientId,
String subscriptionName, long seq) throws SQLException, IOException { String subscriptionName, long seq) throws SQLException, IOException {
PreparedStatement s = c.getUpdateLastAckStatement(); PreparedStatement s = c.getUpdateLastAckStatement();
try { try {
if (s == null) { if (s == null) {
s = c.getConnection().prepareStatement(statements.getUpdateLastAckOfDurableSubStatement()); s = c.getConnection().prepareStatement(this.statements.getUpdateLastAckOfDurableSubStatement());
if (batchStatments) { if (this.batchStatments) {
c.setUpdateLastAckStatement(s); c.setUpdateLastAckStatement(s);
} }
} }
@ -340,32 +338,31 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
s.setString(2, destination.getQualifiedName()); s.setString(2, destination.getQualifiedName());
s.setString(3, clientId); s.setString(3, clientId);
s.setString(4, subscriptionName); s.setString(4, subscriptionName);
if (batchStatments) { if (this.batchStatments) {
s.addBatch(); s.addBatch();
} else if (s.executeUpdate() != 1) { } else if (s.executeUpdate() != 1) {
throw new SQLException("Failed add a message"); throw new SQLException("Failed add a message");
} }
} finally { } finally {
if (!batchStatments) { if (!this.batchStatments) {
s.close(); s.close();
} }
} }
} }
public void doRecoverSubscription(TransactionContext c, ActiveMQDestination destination, String clientId, public void doRecoverSubscription(TransactionContext c, ActiveMQDestination destination, String clientId,
String subscriptionName, JDBCMessageRecoveryListener listener) String subscriptionName, JDBCMessageRecoveryListener listener) throws Exception {
throws Exception {
// dumpTables(c, // dumpTables(c,
// destination.getQualifiedName(),clientId,subscriptionName); // destination.getQualifiedName(),clientId,subscriptionName);
PreparedStatement s = null; PreparedStatement s = null;
ResultSet rs = null; ResultSet rs = null;
try { try {
s = c.getConnection().prepareStatement(statements.getFindAllDurableSubMessagesStatement()); s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubMessagesStatement());
s.setString(1, destination.getQualifiedName()); s.setString(1, destination.getQualifiedName());
s.setString(2, clientId); s.setString(2, clientId);
s.setString(3, subscriptionName); s.setString(3, subscriptionName);
rs = s.executeQuery(); rs = s.executeQuery();
if (statements.isUseExternalMessageReferences()) { if (this.statements.isUseExternalMessageReferences()) {
while (rs.next()) { while (rs.next()) {
if (!listener.recoverMessageReference(rs.getString(2))) { if (!listener.recoverMessageReference(rs.getString(2))) {
break; break;
@ -385,12 +382,11 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
} }
public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, String clientId, public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, String clientId,
String subscriptionName, long seq, int maxReturned, String subscriptionName, long seq, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {
JDBCMessageRecoveryListener listener) throws Exception {
PreparedStatement s = null; PreparedStatement s = null;
ResultSet rs = null; ResultSet rs = null;
try { try {
s = c.getConnection().prepareStatement(statements.getFindDurableSubMessagesStatement()); s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesStatement());
s.setMaxRows(maxReturned); s.setMaxRows(maxReturned);
s.setString(1, destination.getQualifiedName()); s.setString(1, destination.getQualifiedName());
s.setString(2, clientId); s.setString(2, clientId);
@ -398,7 +394,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
s.setLong(4, seq); s.setLong(4, seq);
rs = s.executeQuery(); rs = s.executeQuery();
int count = 0; int count = 0;
if (statements.isUseExternalMessageReferences()) { if (this.statements.isUseExternalMessageReferences()) {
while (rs.next() && count < maxReturned) { while (rs.next() && count < maxReturned) {
if (listener.recoverMessageReference(rs.getString(1))) { if (listener.recoverMessageReference(rs.getString(1))) {
count++; count++;
@ -422,13 +418,12 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
} }
public int doGetDurableSubscriberMessageCount(TransactionContext c, ActiveMQDestination destination, public int doGetDurableSubscriberMessageCount(TransactionContext c, ActiveMQDestination destination,
String clientId, String subscriptionName) String clientId, String subscriptionName) throws SQLException, IOException {
throws SQLException, IOException {
PreparedStatement s = null; PreparedStatement s = null;
ResultSet rs = null; ResultSet rs = null;
int result = 0; int result = 0;
try { try {
s = c.getConnection().prepareStatement(statements.getDurableSubscriberMessageCountStatement()); s = c.getConnection().prepareStatement(this.statements.getDurableSubscriberMessageCountStatement());
s.setString(1, destination.getQualifiedName()); s.setString(1, destination.getQualifiedName());
s.setString(2, clientId); s.setString(2, clientId);
s.setString(3, subscriptionName); s.setString(3, subscriptionName);
@ -444,18 +439,23 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
} }
/** /**
* @see org.apache.activemq.store.jdbc.JDBCAdapter#doSetSubscriberEntry(java.sql.Connection, * @param c
* java.lang.Object, org.apache.activemq.service.SubscriptionInfo) * @param info
* @param retroactive
* @throws SQLException
* @throws IOException
* @see org.apache.activemq.store.jdbc.JDBCAdapter#doSetSubscriberEntry(java.sql.Connection, java.lang.Object,
* org.apache.activemq.service.SubscriptionInfo)
*/ */
public void doSetSubscriberEntry(TransactionContext c, SubscriptionInfo info, boolean retroactive) public void doSetSubscriberEntry(TransactionContext c, SubscriptionInfo info, boolean retroactive)
throws SQLException, IOException { throws SQLException, IOException {
// dumpTables(c, destination.getQualifiedName(), clientId, // dumpTables(c, destination.getQualifiedName(), clientId,
// subscriptionName); // subscriptionName);
PreparedStatement s = null; PreparedStatement s = null;
try { try {
long lastMessageId = -1; long lastMessageId = -1;
if (!retroactive) { if (!retroactive) {
s = c.getConnection().prepareStatement(statements.getFindLastSequenceIdInMsgsStatement()); s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInMsgsStatement());
ResultSet rs = null; ResultSet rs = null;
try { try {
rs = s.executeQuery(); rs = s.executeQuery();
@ -467,7 +467,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
close(s); close(s);
} }
} }
s = c.getConnection().prepareStatement(statements.getCreateDurableSubStatement()); s = c.getConnection().prepareStatement(this.statements.getCreateDurableSubStatement());
s.setString(1, info.getDestination().getQualifiedName()); s.setString(1, info.getDestination().getQualifiedName());
s.setString(2, info.getClientId()); s.setString(2, info.getClientId());
s.setString(3, info.getSubscriptionName()); s.setString(3, info.getSubscriptionName());
@ -483,12 +483,11 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
} }
public SubscriptionInfo doGetSubscriberEntry(TransactionContext c, ActiveMQDestination destination, public SubscriptionInfo doGetSubscriberEntry(TransactionContext c, ActiveMQDestination destination,
String clientId, String subscriptionName) String clientId, String subscriptionName) throws SQLException, IOException {
throws SQLException, IOException {
PreparedStatement s = null; PreparedStatement s = null;
ResultSet rs = null; ResultSet rs = null;
try { try {
s = c.getConnection().prepareStatement(statements.getFindDurableSubStatement()); s = c.getConnection().prepareStatement(this.statements.getFindDurableSubStatement());
s.setString(1, destination.getQualifiedName()); s.setString(1, destination.getQualifiedName());
s.setString(2, clientId); s.setString(2, clientId);
s.setString(3, subscriptionName); s.setString(3, subscriptionName);
@ -501,8 +500,8 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
subscription.setClientId(clientId); subscription.setClientId(clientId);
subscription.setSubscriptionName(subscriptionName); subscription.setSubscriptionName(subscriptionName);
subscription.setSelector(rs.getString(1)); subscription.setSelector(rs.getString(1));
subscription.setSubscribedDestination(ActiveMQDestination subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(2),
.createDestination(rs.getString(2), ActiveMQDestination.QUEUE_TYPE)); ActiveMQDestination.QUEUE_TYPE));
return subscription; return subscription;
} finally { } finally {
close(rs); close(rs);
@ -511,11 +510,11 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
} }
public SubscriptionInfo[] doGetAllSubscriptions(TransactionContext c, ActiveMQDestination destination) public SubscriptionInfo[] doGetAllSubscriptions(TransactionContext c, ActiveMQDestination destination)
throws SQLException, IOException { throws SQLException, IOException {
PreparedStatement s = null; PreparedStatement s = null;
ResultSet rs = null; ResultSet rs = null;
try { try {
s = c.getConnection().prepareStatement(statements.getFindAllDurableSubsStatement()); s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubsStatement());
s.setString(1, destination.getQualifiedName()); s.setString(1, destination.getQualifiedName());
rs = s.executeQuery(); rs = s.executeQuery();
ArrayList<SubscriptionInfo> rc = new ArrayList<SubscriptionInfo>(); ArrayList<SubscriptionInfo> rc = new ArrayList<SubscriptionInfo>();
@ -525,8 +524,8 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
subscription.setSelector(rs.getString(1)); subscription.setSelector(rs.getString(1));
subscription.setSubscriptionName(rs.getString(2)); subscription.setSubscriptionName(rs.getString(2));
subscription.setClientId(rs.getString(3)); subscription.setClientId(rs.getString(3));
subscription.setSubscribedDestination(ActiveMQDestination subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(4),
.createDestination(rs.getString(4), ActiveMQDestination.QUEUE_TYPE)); ActiveMQDestination.QUEUE_TYPE));
rc.add(subscription); rc.add(subscription);
} }
return rc.toArray(new SubscriptionInfo[rc.size()]); return rc.toArray(new SubscriptionInfo[rc.size()]);
@ -536,15 +535,15 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
} }
} }
public void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName) public void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName) throws SQLException,
throws SQLException, IOException { IOException {
PreparedStatement s = null; PreparedStatement s = null;
try { try {
s = c.getConnection().prepareStatement(statements.getRemoveAllMessagesStatement()); s = c.getConnection().prepareStatement(this.statements.getRemoveAllMessagesStatement());
s.setString(1, destinationName.getQualifiedName()); s.setString(1, destinationName.getQualifiedName());
s.executeUpdate(); s.executeUpdate();
s.close(); s.close();
s = c.getConnection().prepareStatement(statements.getRemoveAllSubscriptionsStatement()); s = c.getConnection().prepareStatement(this.statements.getRemoveAllSubscriptionsStatement());
s.setString(1, destinationName.getQualifiedName()); s.setString(1, destinationName.getQualifiedName());
s.executeUpdate(); s.executeUpdate();
} finally { } finally {
@ -553,10 +552,10 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
} }
public void doDeleteSubscription(TransactionContext c, ActiveMQDestination destination, String clientId, public void doDeleteSubscription(TransactionContext c, ActiveMQDestination destination, String clientId,
String subscriptionName) throws SQLException, IOException { String subscriptionName) throws SQLException, IOException {
PreparedStatement s = null; PreparedStatement s = null;
try { try {
s = c.getConnection().prepareStatement(statements.getDeleteSubscriptionStatement()); s = c.getConnection().prepareStatement(this.statements.getDeleteSubscriptionStatement());
s.setString(1, destination.getQualifiedName()); s.setString(1, destination.getQualifiedName());
s.setString(2, clientId); s.setString(2, clientId);
s.setString(3, subscriptionName); s.setString(3, subscriptionName);
@ -569,8 +568,8 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
public void doDeleteOldMessages(TransactionContext c) throws SQLException, IOException { public void doDeleteOldMessages(TransactionContext c) throws SQLException, IOException {
PreparedStatement s = null; PreparedStatement s = null;
try { try {
LOG.debug("Executing SQL: " + statements.getDeleteOldMessagesStatement()); LOG.debug("Executing SQL: " + this.statements.getDeleteOldMessagesStatement());
s = c.getConnection().prepareStatement(statements.getDeleteOldMessagesStatement()); s = c.getConnection().prepareStatement(this.statements.getDeleteOldMessagesStatement());
s.setLong(1, System.currentTimeMillis()); s.setLong(1, System.currentTimeMillis());
int i = s.executeUpdate(); int i = s.executeUpdate();
LOG.debug("Deleted " + i + " old message(s)."); LOG.debug("Deleted " + i + " old message(s).");
@ -579,16 +578,13 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
} }
} }
public long doGetLastAckedDurableSubscriberMessageId(TransactionContext c, public long doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination,
ActiveMQDestination destination, String clientId, String clientId, String subscriberName) throws SQLException, IOException {
String subscriberName) throws SQLException,
IOException {
PreparedStatement s = null; PreparedStatement s = null;
ResultSet rs = null; ResultSet rs = null;
long result = -1; long result = -1;
try { try {
s = c.getConnection() s = c.getConnection().prepareStatement(this.statements.getLastAckedDurableSubscriberMessageStatement());
.prepareStatement(statements.getLastAckedDurableSubscriberMessageStatement());
s.setString(1, destination.getQualifiedName()); s.setString(1, destination.getQualifiedName());
s.setString(2, clientId); s.setString(2, clientId);
s.setString(3, subscriberName); s.setString(3, subscriberName);
@ -624,7 +620,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
PreparedStatement s = null; PreparedStatement s = null;
ResultSet rs = null; ResultSet rs = null;
try { try {
s = c.getConnection().prepareStatement(statements.getFindAllDestinationsStatement()); s = c.getConnection().prepareStatement(this.statements.getFindAllDestinationsStatement());
rs = s.executeQuery(); rs = s.executeQuery();
while (rs.next()) { while (rs.next()) {
rc.add(ActiveMQDestination.createDestination(rs.getString(1), ActiveMQDestination.QUEUE_TYPE)); rc.add(ActiveMQDestination.createDestination(rs.getString(1), ActiveMQDestination.QUEUE_TYPE));
@ -636,34 +632,50 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
return rc; return rc;
} }
/**
* @return true if batchStements
*/
public boolean isBatchStatments() { public boolean isBatchStatments() {
return batchStatments; return this.batchStatments;
} }
/**
* @param batchStatments
*/
public void setBatchStatments(boolean batchStatments) { public void setBatchStatments(boolean batchStatments) {
this.batchStatments = batchStatments; this.batchStatments = batchStatments;
} }
public void setUseExternalMessageReferences(boolean useExternalMessageReferences) { public void setUseExternalMessageReferences(boolean useExternalMessageReferences) {
statements.setUseExternalMessageReferences(useExternalMessageReferences); this.statements.setUseExternalMessageReferences(useExternalMessageReferences);
} }
/**
* @return the statements
*/
public Statements getStatements() { public Statements getStatements() {
return statements; return this.statements;
} }
public void setStatements(Statements statements) { public void setStatements(Statements statements) {
this.statements = statements; this.statements = statements;
} }
public byte[] doGetNextDurableSubscriberMessageStatement(TransactionContext c, /**
ActiveMQDestination destination, * @param c
String clientId, String subscriberName) * @param destination
throws SQLException, IOException { * @param clientId
* @param subscriberName
* @return
* @throws SQLException
* @throws IOException
*/
public byte[] doGetNextDurableSubscriberMessageStatement(TransactionContext c, ActiveMQDestination destination,
String clientId, String subscriberName) throws SQLException, IOException {
PreparedStatement s = null; PreparedStatement s = null;
ResultSet rs = null; ResultSet rs = null;
try { try {
s = c.getConnection().prepareStatement(statements.getNextDurableSubscriberMessageStatement()); s = c.getConnection().prepareStatement(this.statements.getNextDurableSubscriberMessageStatement());
s.setString(1, destination.getQualifiedName()); s.setString(1, destination.getQualifiedName());
s.setString(2, clientId); s.setString(2, clientId);
s.setString(3, subscriberName); s.setString(3, subscriberName);
@ -679,12 +691,12 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
} }
public int doGetMessageCount(TransactionContext c, ActiveMQDestination destination) throws SQLException, public int doGetMessageCount(TransactionContext c, ActiveMQDestination destination) throws SQLException,
IOException { IOException {
PreparedStatement s = null; PreparedStatement s = null;
ResultSet rs = null; ResultSet rs = null;
int result = 0; int result = 0;
try { try {
s = c.getConnection().prepareStatement(statements.getDestinationMessageCountStatement()); s = c.getConnection().prepareStatement(this.statements.getDestinationMessageCountStatement());
s.setString(1, destination.getQualifiedName()); s.setString(1, destination.getQualifiedName());
rs = s.executeQuery(); rs = s.executeQuery();
if (rs.next()) { if (rs.next()) {
@ -698,35 +710,56 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
} }
public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long nextSeq, public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long nextSeq,
int maxReturned, JDBCMessageRecoveryListener listener) throws Exception { int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {
PreparedStatement s = null; PreparedStatement s = null;
ResultSet rs = null; ResultSet rs = null;
long id = 0;
List<Long> cleanupIds = new ArrayList<Long>();
int index = 0;
try { try {
s = c.getConnection().prepareStatement(statements.getFindNextMessagesStatement()); s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesStatement());
s.setMaxRows(maxReturned); s.setMaxRows(maxReturned * 2);
s.setString(1, destination.getQualifiedName()); s.setString(1, destination.getQualifiedName());
s.setLong(2, nextSeq); s.setLong(2, nextSeq - maxReturned);
rs = s.executeQuery(); rs = s.executeQuery();
int count = 0; int count = 0;
if (statements.isUseExternalMessageReferences()) { if (this.statements.isUseExternalMessageReferences()) {
while (rs.next() && count < maxReturned) { while (rs.next() && count < maxReturned) {
id = rs.getLong(1);
if (this.lastRecoveredMessagesIds.contains(id)) {
// this message was already recovered
cleanupIds.add(id);
continue;
}
if (listener.recoverMessageReference(rs.getString(1))) { if (listener.recoverMessageReference(rs.getString(1))) {
count++; count++;
this.lastRecoveredMessagesIds.add(id);
} else { } else {
LOG.debug("Stopped recover next messages"); LOG.debug("Stopped recover next messages");
break;
} }
} }
} else { } else {
while (rs.next() && count < maxReturned) { while (rs.next() && count < maxReturned) {
id = rs.getLong(1);
if (this.lastRecoveredMessagesIds.contains(id)) {
// this message was already recovered
cleanupIds.add(id);
continue;
}
if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) { if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
count++; count++;
this.lastRecoveredMessagesIds.add(id);
} else { } else {
LOG.debug("Stopped recover next messages"); LOG.debug("Stopped recover next messages");
break;
} }
} }
} }
// not cleanup the list of recovered messages
index = 0;
Iterator<Long> it = cleanupIds.iterator();
while (it.hasNext() && index < count) {
this.lastRecoveredMessagesIds.remove(it.next());
}
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} finally { } finally {
@ -735,35 +768,26 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
} }
} }
/* /*
* Useful for debugging. public void dumpTables(Connection c, String * Useful for debugging. public void dumpTables(Connection c, String destinationName, String clientId, String
* destinationName, String clientId, String subscriptionName) throws * subscriptionName) throws SQLException { printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out); printQuery(c,
* SQLException { printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out); * "Select * from ACTIVEMQ_ACKS", System.out); PreparedStatement s = c.prepareStatement("SELECT M.ID,
* printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out); * D.LAST_ACKED_ID FROM " +"ACTIVEMQ_MSGS M, " +"ACTIVEMQ_ACKS D " +"WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND
* PreparedStatement s = c.prepareStatement("SELECT M.ID, D.LAST_ACKED_ID * D.SUB_NAME=?" +" AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID" +" ORDER BY M.ID");
* FROM " +"ACTIVEMQ_MSGS M, " +"ACTIVEMQ_ACKS D " +"WHERE D.CONTAINER=? AND * s.setString(1,destinationName); s.setString(2,clientId); s.setString(3,subscriptionName);
* D.CLIENT_ID=? AND D.SUB_NAME=?" +" AND M.CONTAINER=D.CONTAINER AND M.ID >
* D.LAST_ACKED_ID" +" ORDER BY M.ID"); s.setString(1,destinationName);
* s.setString(2,clientId); s.setString(3,subscriptionName);
* printQuery(s,System.out); } * printQuery(s,System.out); }
* *
* public void dumpTables(Connection c) throws SQLException { printQuery(c, * public void dumpTables(Connection c) throws SQLException { printQuery(c, "Select * from ACTIVEMQ_MSGS",
* "Select * from ACTIVEMQ_MSGS", System.out); printQuery(c, "Select * from * System.out); printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out); }
* ACTIVEMQ_ACKS", System.out); }
* *
* private void printQuery(Connection c, String query, PrintStream out) * private void printQuery(Connection c, String query, PrintStream out) throws SQLException {
* throws SQLException { printQuery(c.prepareStatement(query), out); } * printQuery(c.prepareStatement(query), out); }
* *
* private void printQuery(PreparedStatement s, PrintStream out) throws * private void printQuery(PreparedStatement s, PrintStream out) throws SQLException {
* SQLException {
* *
* ResultSet set=null; try { set = s.executeQuery(); ResultSetMetaData * ResultSet set=null; try { set = s.executeQuery(); ResultSetMetaData metaData = set.getMetaData(); for( int i=1; i<=
* metaData = set.getMetaData(); for( int i=1; i<= * metaData.getColumnCount(); i++ ) { if(i==1) out.print("||"); out.print(metaData.getColumnName(i)+"||"); }
* metaData.getColumnCount(); i++ ) { if(i==1) out.print("||"); * out.println(); while(set.next()) { for( int i=1; i<= metaData.getColumnCount(); i++ ) { if(i==1) out.print("|");
* out.print(metaData.getColumnName(i)+"||"); } out.println(); * out.print(set.getString(i)+"|"); } out.println(); } } finally { try { set.close(); } catch (Throwable ignore) {}
* while(set.next()) { for( int i=1; i<= metaData.getColumnCount(); i++ ) { * try { s.close(); } catch (Throwable ignore) {} } }
* if(i==1) out.print("|"); out.print(set.getString(i)+"|"); }
* out.println(); } } finally { try { set.close(); } catch (Throwable
* ignore) {} try { s.close(); } catch (Throwable ignore) {} } }
*/ */
} }