https://issues.apache.org/jira/browse/AMQ-3289 - updates to Blob/Oracle JDBCAdapter - applied patch from William McDonald with thanks

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1173605 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2011-09-21 12:45:48 +00:00
parent e6a9ae2f31
commit ad6b5e29db
2 changed files with 17 additions and 12 deletions

View File

@ -26,6 +26,7 @@ import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.sql.rowset.serial.SerialBlob;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessageId;
@ -58,7 +59,6 @@ public class BlobJDBCAdapter extends DefaultJDBCAdapter {
ResultSet rs = null; ResultSet rs = null;
cleanupExclusiveLock.readLock().lock(); cleanupExclusiveLock.readLock().lock();
try { try {
// Add the Blob record. // Add the Blob record.
s = c.getConnection().prepareStatement(statements.getAddMessageStatement()); s = c.getConnection().prepareStatement(statements.getAddMessageStatement());
s.setLong(1, sequence); s.setLong(1, sequence);
@ -67,7 +67,6 @@ public class BlobJDBCAdapter extends DefaultJDBCAdapter {
s.setString(4, destination.getQualifiedName()); s.setString(4, destination.getQualifiedName());
s.setLong(5, expiration); s.setLong(5, expiration);
s.setLong(6, priority); s.setLong(6, priority);
s.setString(7, " ");
if (s.executeUpdate() != 1) { if (s.executeUpdate() != 1) {
throw new IOException("Failed to add broker message: " + messageID + " in container."); throw new IOException("Failed to add broker message: " + messageID + " in container.");
@ -75,7 +74,8 @@ public class BlobJDBCAdapter extends DefaultJDBCAdapter {
s.close(); s.close();
// Select the blob record so that we can update it. // Select the blob record so that we can update it.
s = c.getConnection().prepareStatement(statements.getFindMessageByIdStatement()); s = c.getConnection().prepareStatement(statements.getFindMessageByIdStatement(),
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_UPDATABLE);
s.setLong(1, sequence); s.setLong(1, sequence);
rs = s.executeQuery(); rs = s.executeQuery();
if (!rs.next()) { if (!rs.next()) {
@ -84,15 +84,10 @@ public class BlobJDBCAdapter extends DefaultJDBCAdapter {
// Update the blob // Update the blob
Blob blob = rs.getBlob(1); Blob blob = rs.getBlob(1);
OutputStream stream = blob.setBinaryStream(data.length); blob.truncate(0);
stream.write(data); blob.setBytes(1, data);
stream.close(); rs.updateBlob(1, blob);
s.close(); rs.updateRow(); // Update the row with the updated blob
// Update the row with the updated blob
s = c.getConnection().prepareStatement(statements.getUpdateMessageStatement());
s.setBlob(1, blob);
s.setLong(2, sequence);
} finally { } finally {
cleanupExclusiveLock.readLock().unlock(); cleanupExclusiveLock.readLock().unlock();

View File

@ -48,6 +48,16 @@ public class OracleJDBCAdapter extends BlobJDBCAdapter {
public void setStatements(Statements statements) { public void setStatements(Statements statements) {
statements.setLongDataType("NUMBER"); statements.setLongDataType("NUMBER");
statements.setSequenceDataType("NUMBER"); statements.setSequenceDataType("NUMBER");
String addMessageStatement = "INSERT INTO "
+ statements.getFullMessageTableName()
+ "(ID, MSGID_PROD, MSGID_SEQ, CONTAINER, EXPIRATION, PRIORITY, MSG) VALUES (?, ?, ?, ?, ?, ?, empty_blob())";
statements.setAddMessageStatement(addMessageStatement);
String findMessageByIdStatement = "SELECT MSG FROM " +
statements.getFullMessageTableName() + " WHERE ID=? FOR UPDATE";
statements.setFindMessageByIdStatement(findMessageByIdStatement);
super.setStatements(statements); super.setStatements(statements);
} }