diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java index cfbea6cbd4..e883b08f85 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java @@ -39,6 +39,8 @@ public interface JDBCAdapter { void doAddMessageReference(TransactionContext c, long sequence, MessageId messageId, ActiveMQDestination destination, long expirationTime, String messageRef) throws SQLException, IOException; byte[] doGetMessage(TransactionContext c, MessageId id) throws SQLException, IOException; + + byte[] doGetMessageById(TransactionContext c, long storeSequenceId) throws SQLException, IOException; String doGetMessageReference(TransactionContext c, long id) throws SQLException, IOException; @@ -66,7 +68,7 @@ public interface JDBCAdapter { void doDeleteOldMessages(TransactionContext c) throws SQLException, IOException; - long doGetLastMessageBrokerSequenceId(TransactionContext c) throws SQLException, IOException; + long doGetLastMessageStoreSequenceId(TransactionContext c) throws SQLException, IOException; Set doGetDestinations(TransactionContext c) throws SQLException, IOException; diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java index 54ef632228..25ea7522b4 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java @@ -35,6 +35,7 @@ import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageId; import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.store.MessageStore; @@ -44,6 +45,7 @@ import org.apache.activemq.store.TransactionStore; import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter; import org.apache.activemq.store.memory.MemoryTransactionStore; import org.apache.activemq.usage.SystemUsage; +import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.FactoryFinder; import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.LongSequenceGenerator; @@ -223,7 +225,14 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist // Get a connection and insert the message into the DB. TransactionContext c = getTransactionContext(); try { - return getAdapter().doGetLastMessageBrokerSequenceId(c); + long seq = getAdapter().doGetLastMessageStoreSequenceId(c); + sequenceGenerator.setLastSequenceId(seq); + long brokerSeq = 0; + if (seq != 0) { + Message last = (Message)wireFormat.unmarshal(new ByteSequence(getAdapter().doGetMessageById(c, seq))); + brokerSeq = last.getMessageId().getBrokerSequenceId(); + } + return brokerSeq; } catch (SQLException e) { JDBCPersistenceAdapter.log("JDBC Failure: ", e); throw IOExceptionSupport.create("Failed to get last broker message id: " + e, e); diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java index 6cba7f5a7f..7581b0a14c 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java @@ -41,6 +41,7 @@ public class Statements { private String removeMessageStatment; private String findMessageSequenceIdStatement; private String findMessageStatement; + private String findMessageByIdStatement; private String findAllMessagesStatement; private String findLastSequenceIdInMsgsStatement; private String findLastSequenceIdInAcksStatement; @@ -139,6 +140,13 @@ public class Statements { return findMessageStatement; } + public String getFindMessageByIdStatement() { + if (findMessageStatement == null) { + findMessageStatement = "SELECT MSG FROM " + getFullMessageTableName() + " WHERE ID=?"; + } + return findMessageStatement; + } + public String getFindAllMessagesStatement() { if (findAllMessagesStatement == null) { findAllMessagesStatement = "SELECT ID, MSG FROM " + getFullMessageTableName() diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java index b7e5085e6c..7f878c8997 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java @@ -139,7 +139,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { } } - public long doGetLastMessageBrokerSequenceId(TransactionContext c) throws SQLException, IOException { + public long doGetLastMessageStoreSequenceId(TransactionContext c) throws SQLException, IOException { PreparedStatement s = null; ResultSet rs = null; try { @@ -163,6 +163,25 @@ public class DefaultJDBCAdapter implements JDBCAdapter { close(s); } } + + public byte[] doGetMessageById(TransactionContext c, long storeSequenceId) throws SQLException, IOException { + PreparedStatement s = null; + ResultSet rs = null; + try { + s = c.getConnection().prepareStatement( + this.statements.getFindMessageByIdStatement()); + s.setLong(1, storeSequenceId); + rs = s.executeQuery(); + if (!rs.next()) { + return null; + } + return getBinaryData(rs, 1); + } finally { + close(rs); + close(s); + } + } + public void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data, long expiration) throws SQLException, IOException { @@ -794,4 +813,5 @@ public class DefaultJDBCAdapter implements JDBCAdapter { * out.print(set.getString(i)+"|"); } out.println(); } } finally { try { set.close(); } catch (Throwable ignore) {} * try { s.close(); } catch (Throwable ignore) {} } } */ + }