merging 924752 - https://issues.apache.org/activemq/browse/AMQ-2594 - proper init of broker and store seq

git-svn-id: https://svn.apache.org/repos/asf/activemq/branches/activemq-5.3@924753 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2010-03-18 13:07:20 +00:00
parent 9ec58511aa
commit 535f1f52a3
4 changed files with 42 additions and 3 deletions

View File

@ -40,6 +40,8 @@ public interface JDBCAdapter {
byte[] doGetMessage(TransactionContext c, MessageId id) throws SQLException, IOException;
byte[] doGetMessageById(TransactionContext c, long storeSequenceId) throws SQLException, IOException;
String doGetMessageReference(TransactionContext c, long id) throws SQLException, IOException;
void doRemoveMessage(TransactionContext c, long seq) throws SQLException, IOException;
@ -66,7 +68,7 @@ public interface JDBCAdapter {
void doDeleteOldMessages(TransactionContext c) throws SQLException, IOException;
long doGetLastMessageBrokerSequenceId(TransactionContext c) throws SQLException, IOException;
long doGetLastMessageStoreSequenceId(TransactionContext c) throws SQLException, IOException;
Set<ActiveMQDestination> doGetDestinations(TransactionContext c) throws SQLException, IOException;

View File

@ -35,6 +35,7 @@ import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.MessageStore;
@ -44,6 +45,7 @@ import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
import org.apache.activemq.store.memory.MemoryTransactionStore;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.FactoryFinder;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.LongSequenceGenerator;
@ -223,7 +225,14 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
// Get a connection and insert the message into the DB.
TransactionContext c = getTransactionContext();
try {
return getAdapter().doGetLastMessageBrokerSequenceId(c);
long seq = getAdapter().doGetLastMessageStoreSequenceId(c);
sequenceGenerator.setLastSequenceId(seq);
long brokerSeq = 0;
if (seq != 0) {
Message last = (Message)wireFormat.unmarshal(new ByteSequence(getAdapter().doGetMessageById(c, seq)));
brokerSeq = last.getMessageId().getBrokerSequenceId();
}
return brokerSeq;
} catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ", e);
throw IOExceptionSupport.create("Failed to get last broker message id: " + e, e);

View File

@ -41,6 +41,7 @@ public class Statements {
private String removeMessageStatment;
private String findMessageSequenceIdStatement;
private String findMessageStatement;
private String findMessageByIdStatement;
private String findAllMessagesStatement;
private String findLastSequenceIdInMsgsStatement;
private String findLastSequenceIdInAcksStatement;
@ -139,6 +140,13 @@ public class Statements {
return findMessageStatement;
}
public String getFindMessageByIdStatement() {
if (findMessageStatement == null) {
findMessageStatement = "SELECT MSG FROM " + getFullMessageTableName() + " WHERE ID=?";
}
return findMessageStatement;
}
public String getFindAllMessagesStatement() {
if (findAllMessagesStatement == null) {
findAllMessagesStatement = "SELECT ID, MSG FROM " + getFullMessageTableName()

View File

@ -138,7 +138,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
}
}
public long doGetLastMessageBrokerSequenceId(TransactionContext c) throws SQLException, IOException {
public long doGetLastMessageStoreSequenceId(TransactionContext c) throws SQLException, IOException {
PreparedStatement s = null;
ResultSet rs = null;
try {
@ -163,6 +163,25 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
}
}
public byte[] doGetMessageById(TransactionContext c, long storeSequenceId) throws SQLException, IOException {
PreparedStatement s = null;
ResultSet rs = null;
try {
s = c.getConnection().prepareStatement(
this.statements.getFindMessageByIdStatement());
s.setLong(1, storeSequenceId);
rs = s.executeQuery();
if (!rs.next()) {
return null;
}
return getBinaryData(rs, 1);
} finally {
close(rs);
close(s);
}
}
public void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data,
long expiration) throws SQLException, IOException {
PreparedStatement s = c.getAddMessageStatement();
@ -793,4 +812,5 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
* out.print(set.getString(i)+"|"); } out.println(); } } finally { try { set.close(); } catch (Throwable ignore) {}
* try { s.close(); } catch (Throwable ignore) {} } }
*/
}