mirror of https://github.com/apache/activemq.git
https://issues.apache.org/activemq/browse/AMQ-2594 - proper init of broker and store seq
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@924752 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
229043b7d9
commit
1cec28cf27
|
@ -39,6 +39,8 @@ public interface JDBCAdapter {
|
||||||
void doAddMessageReference(TransactionContext c, long sequence, MessageId messageId, ActiveMQDestination destination, long expirationTime, String messageRef) throws SQLException, IOException;
|
void doAddMessageReference(TransactionContext c, long sequence, MessageId messageId, ActiveMQDestination destination, long expirationTime, String messageRef) throws SQLException, IOException;
|
||||||
|
|
||||||
byte[] doGetMessage(TransactionContext c, MessageId id) throws SQLException, IOException;
|
byte[] doGetMessage(TransactionContext c, MessageId id) throws SQLException, IOException;
|
||||||
|
|
||||||
|
byte[] doGetMessageById(TransactionContext c, long storeSequenceId) throws SQLException, IOException;
|
||||||
|
|
||||||
String doGetMessageReference(TransactionContext c, long id) throws SQLException, IOException;
|
String doGetMessageReference(TransactionContext c, long id) throws SQLException, IOException;
|
||||||
|
|
||||||
|
@ -66,7 +68,7 @@ public interface JDBCAdapter {
|
||||||
|
|
||||||
void doDeleteOldMessages(TransactionContext c) throws SQLException, IOException;
|
void doDeleteOldMessages(TransactionContext c) throws SQLException, IOException;
|
||||||
|
|
||||||
long doGetLastMessageBrokerSequenceId(TransactionContext c) throws SQLException, IOException;
|
long doGetLastMessageStoreSequenceId(TransactionContext c) throws SQLException, IOException;
|
||||||
|
|
||||||
Set<ActiveMQDestination> doGetDestinations(TransactionContext c) throws SQLException, IOException;
|
Set<ActiveMQDestination> doGetDestinations(TransactionContext c) throws SQLException, IOException;
|
||||||
|
|
||||||
|
|
|
@ -35,6 +35,7 @@ import org.apache.activemq.broker.ConnectionContext;
|
||||||
import org.apache.activemq.command.ActiveMQDestination;
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
import org.apache.activemq.command.ActiveMQQueue;
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
import org.apache.activemq.command.ActiveMQTopic;
|
import org.apache.activemq.command.ActiveMQTopic;
|
||||||
|
import org.apache.activemq.command.Message;
|
||||||
import org.apache.activemq.command.MessageId;
|
import org.apache.activemq.command.MessageId;
|
||||||
import org.apache.activemq.openwire.OpenWireFormat;
|
import org.apache.activemq.openwire.OpenWireFormat;
|
||||||
import org.apache.activemq.store.MessageStore;
|
import org.apache.activemq.store.MessageStore;
|
||||||
|
@ -44,6 +45,7 @@ import org.apache.activemq.store.TransactionStore;
|
||||||
import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
|
import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
|
||||||
import org.apache.activemq.store.memory.MemoryTransactionStore;
|
import org.apache.activemq.store.memory.MemoryTransactionStore;
|
||||||
import org.apache.activemq.usage.SystemUsage;
|
import org.apache.activemq.usage.SystemUsage;
|
||||||
|
import org.apache.activemq.util.ByteSequence;
|
||||||
import org.apache.activemq.util.FactoryFinder;
|
import org.apache.activemq.util.FactoryFinder;
|
||||||
import org.apache.activemq.util.IOExceptionSupport;
|
import org.apache.activemq.util.IOExceptionSupport;
|
||||||
import org.apache.activemq.util.LongSequenceGenerator;
|
import org.apache.activemq.util.LongSequenceGenerator;
|
||||||
|
@ -223,7 +225,14 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
|
||||||
// Get a connection and insert the message into the DB.
|
// Get a connection and insert the message into the DB.
|
||||||
TransactionContext c = getTransactionContext();
|
TransactionContext c = getTransactionContext();
|
||||||
try {
|
try {
|
||||||
return getAdapter().doGetLastMessageBrokerSequenceId(c);
|
long seq = getAdapter().doGetLastMessageStoreSequenceId(c);
|
||||||
|
sequenceGenerator.setLastSequenceId(seq);
|
||||||
|
long brokerSeq = 0;
|
||||||
|
if (seq != 0) {
|
||||||
|
Message last = (Message)wireFormat.unmarshal(new ByteSequence(getAdapter().doGetMessageById(c, seq)));
|
||||||
|
brokerSeq = last.getMessageId().getBrokerSequenceId();
|
||||||
|
}
|
||||||
|
return brokerSeq;
|
||||||
} catch (SQLException e) {
|
} catch (SQLException e) {
|
||||||
JDBCPersistenceAdapter.log("JDBC Failure: ", e);
|
JDBCPersistenceAdapter.log("JDBC Failure: ", e);
|
||||||
throw IOExceptionSupport.create("Failed to get last broker message id: " + e, e);
|
throw IOExceptionSupport.create("Failed to get last broker message id: " + e, e);
|
||||||
|
|
|
@ -41,6 +41,7 @@ public class Statements {
|
||||||
private String removeMessageStatment;
|
private String removeMessageStatment;
|
||||||
private String findMessageSequenceIdStatement;
|
private String findMessageSequenceIdStatement;
|
||||||
private String findMessageStatement;
|
private String findMessageStatement;
|
||||||
|
private String findMessageByIdStatement;
|
||||||
private String findAllMessagesStatement;
|
private String findAllMessagesStatement;
|
||||||
private String findLastSequenceIdInMsgsStatement;
|
private String findLastSequenceIdInMsgsStatement;
|
||||||
private String findLastSequenceIdInAcksStatement;
|
private String findLastSequenceIdInAcksStatement;
|
||||||
|
@ -139,6 +140,13 @@ public class Statements {
|
||||||
return findMessageStatement;
|
return findMessageStatement;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String getFindMessageByIdStatement() {
|
||||||
|
if (findMessageStatement == null) {
|
||||||
|
findMessageStatement = "SELECT MSG FROM " + getFullMessageTableName() + " WHERE ID=?";
|
||||||
|
}
|
||||||
|
return findMessageStatement;
|
||||||
|
}
|
||||||
|
|
||||||
public String getFindAllMessagesStatement() {
|
public String getFindAllMessagesStatement() {
|
||||||
if (findAllMessagesStatement == null) {
|
if (findAllMessagesStatement == null) {
|
||||||
findAllMessagesStatement = "SELECT ID, MSG FROM " + getFullMessageTableName()
|
findAllMessagesStatement = "SELECT ID, MSG FROM " + getFullMessageTableName()
|
||||||
|
|
|
@ -139,7 +139,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public long doGetLastMessageBrokerSequenceId(TransactionContext c) throws SQLException, IOException {
|
public long doGetLastMessageStoreSequenceId(TransactionContext c) throws SQLException, IOException {
|
||||||
PreparedStatement s = null;
|
PreparedStatement s = null;
|
||||||
ResultSet rs = null;
|
ResultSet rs = null;
|
||||||
try {
|
try {
|
||||||
|
@ -163,6 +163,25 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
||||||
close(s);
|
close(s);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public byte[] doGetMessageById(TransactionContext c, long storeSequenceId) throws SQLException, IOException {
|
||||||
|
PreparedStatement s = null;
|
||||||
|
ResultSet rs = null;
|
||||||
|
try {
|
||||||
|
s = c.getConnection().prepareStatement(
|
||||||
|
this.statements.getFindMessageByIdStatement());
|
||||||
|
s.setLong(1, storeSequenceId);
|
||||||
|
rs = s.executeQuery();
|
||||||
|
if (!rs.next()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return getBinaryData(rs, 1);
|
||||||
|
} finally {
|
||||||
|
close(rs);
|
||||||
|
close(s);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
public void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data,
|
public void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data,
|
||||||
long expiration) throws SQLException, IOException {
|
long expiration) throws SQLException, IOException {
|
||||||
|
@ -794,4 +813,5 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
||||||
* out.print(set.getString(i)+"|"); } out.println(); } } finally { try { set.close(); } catch (Throwable ignore) {}
|
* out.print(set.getString(i)+"|"); } out.println(); } } finally { try { set.close(); } catch (Throwable ignore) {}
|
||||||
* try { s.close(); } catch (Throwable ignore) {} } }
|
* try { s.close(); } catch (Throwable ignore) {} } }
|
||||||
*/
|
*/
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue