From ad6b5e29db437b2c09309373a72d4dd605aea1f0 Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Wed, 21 Sep 2011 12:45:48 +0000 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-3289 - updates to Blob/Oracle JDBCAdapter - applied patch from William McDonald with thanks git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1173605 13f79535-47bb-0310-9956-ffa450edef68 --- .../store/jdbc/adapter/BlobJDBCAdapter.java | 19 +++++++------------ .../store/jdbc/adapter/OracleJDBCAdapter.java | 10 ++++++++++ 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/BlobJDBCAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/BlobJDBCAdapter.java index c59cca0726..882c4d879e 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/BlobJDBCAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/BlobJDBCAdapter.java @@ -26,6 +26,7 @@ import java.sql.ResultSet; import java.sql.SQLException; import javax.jms.JMSException; +import javax.sql.rowset.serial.SerialBlob; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.MessageId; @@ -58,7 +59,6 @@ public class BlobJDBCAdapter extends DefaultJDBCAdapter { ResultSet rs = null; cleanupExclusiveLock.readLock().lock(); try { - // Add the Blob record. s = c.getConnection().prepareStatement(statements.getAddMessageStatement()); s.setLong(1, sequence); @@ -67,7 +67,6 @@ public class BlobJDBCAdapter extends DefaultJDBCAdapter { s.setString(4, destination.getQualifiedName()); s.setLong(5, expiration); s.setLong(6, priority); - s.setString(7, " "); if (s.executeUpdate() != 1) { throw new IOException("Failed to add broker message: " + messageID + " in container."); @@ -75,7 +74,8 @@ public class BlobJDBCAdapter extends DefaultJDBCAdapter { s.close(); // Select the blob record so that we can update it. - s = c.getConnection().prepareStatement(statements.getFindMessageByIdStatement()); + s = c.getConnection().prepareStatement(statements.getFindMessageByIdStatement(), + ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_UPDATABLE); s.setLong(1, sequence); rs = s.executeQuery(); if (!rs.next()) { @@ -84,15 +84,10 @@ public class BlobJDBCAdapter extends DefaultJDBCAdapter { // Update the blob Blob blob = rs.getBlob(1); - OutputStream stream = blob.setBinaryStream(data.length); - stream.write(data); - stream.close(); - s.close(); - - // Update the row with the updated blob - s = c.getConnection().prepareStatement(statements.getUpdateMessageStatement()); - s.setBlob(1, blob); - s.setLong(2, sequence); + blob.truncate(0); + blob.setBytes(1, data); + rs.updateBlob(1, blob); + rs.updateRow(); // Update the row with the updated blob } finally { cleanupExclusiveLock.readLock().unlock(); diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/OracleJDBCAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/OracleJDBCAdapter.java index 6b21dc2f1f..fba3b89202 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/OracleJDBCAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/OracleJDBCAdapter.java @@ -48,6 +48,16 @@ public class OracleJDBCAdapter extends BlobJDBCAdapter { public void setStatements(Statements statements) { statements.setLongDataType("NUMBER"); statements.setSequenceDataType("NUMBER"); + + String addMessageStatement = "INSERT INTO " + + statements.getFullMessageTableName() + + "(ID, MSGID_PROD, MSGID_SEQ, CONTAINER, EXPIRATION, PRIORITY, MSG) VALUES (?, ?, ?, ?, ?, ?, empty_blob())"; + statements.setAddMessageStatement(addMessageStatement); + + String findMessageByIdStatement = "SELECT MSG FROM " + + statements.getFullMessageTableName() + " WHERE ID=? FOR UPDATE"; + statements.setFindMessageByIdStatement(findMessageByIdStatement); + super.setStatements(statements); }