https://issues.apache.org/jira/browse/AMQ-3289 - ActiveMQ has problems storing >4K messages in Oracle - brought BlobAdapter up to speed and added @Override to other adapters to catch this sort of problem in the future

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1095376 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2011-04-20 11:52:50 +00:00
parent fe31092b54
commit adc45e8801
15 changed files with 50 additions and 38 deletions

View File

@ -30,6 +30,7 @@ import org.apache.activemq.store.jdbc.Statements;
*/
public class AxionJDBCAdapter extends StreamJDBCAdapter {
@Override
public void setStatements(Statements statements) {
String[] createStatements = new String[]{

View File

@ -27,6 +27,8 @@ import java.sql.SQLException;
import javax.jms.JMSException;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.jdbc.TransactionContext;
import org.apache.activemq.util.ByteArrayOutputStream;
@ -49,30 +51,34 @@ import org.apache.activemq.util.ByteArrayOutputStream;
*/
public class BlobJDBCAdapter extends DefaultJDBCAdapter {
public void doAddMessage(Connection c, long seq, String messageID, String destinationName, byte[] data)
throws SQLException, JMSException {
@Override
public void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data,
long expiration, byte priority) throws SQLException, IOException {
PreparedStatement s = null;
ResultSet rs = null;
cleanupExclusiveLock.readLock().lock();
try {
// Add the Blob record.
s = c.prepareStatement(statements.getAddMessageStatement());
s.setLong(1, seq);
s.setString(2, destinationName);
s.setString(3, messageID);
s.setString(4, " ");
s = c.getConnection().prepareStatement(statements.getAddMessageStatement());
s.setLong(1, sequence);
s.setString(2, messageID.getProducerId().toString());
s.setLong(3, messageID.getProducerSequenceId());
s.setString(4, destination.getQualifiedName());
s.setLong(5, expiration);
s.setLong(6, priority);
if (s.executeUpdate() != 1) {
throw new JMSException("Failed to broker message: " + messageID + " in container.");
throw new IOException("Failed to add broker message: " + messageID + " in container.");
}
s.close();
// Select the blob record so that we can update it.
s = c.prepareStatement(statements.getFindMessageStatement());
s.setLong(1, seq);
s = c.getConnection().prepareStatement(statements.getFindMessageByIdStatement());
s.setLong(1, sequence);
rs = s.executeQuery();
if (!rs.next()) {
throw new JMSException("Failed to broker message: " + messageID + " in container.");
throw new IOException("Failed select blob for message: " + messageID + " in container.");
}
// Update the blob
@ -83,31 +89,27 @@ public class BlobJDBCAdapter extends DefaultJDBCAdapter {
s.close();
// Update the row with the updated blob
s = c.prepareStatement(statements.getUpdateMessageStatement());
s = c.getConnection().prepareStatement(statements.getUpdateMessageStatement());
s.setBlob(1, blob);
s.setLong(2, seq);
s.setLong(2, sequence);
} catch (IOException e) {
throw (SQLException)new SQLException("BLOB could not be updated: " + e).initCause(e);
} finally {
try {
rs.close();
} catch (Throwable ignore) {
}
try {
s.close();
} catch (Throwable ignore) {
}
cleanupExclusiveLock.readLock().unlock();
close(rs);
close(s);
}
}
public byte[] doGetMessage(TransactionContext c, long seq) throws SQLException {
@Override
public byte[] doGetMessage(TransactionContext c, MessageId id) throws SQLException, IOException {
PreparedStatement s = null;
ResultSet rs = null;
cleanupExclusiveLock.readLock().lock();
try {
s = c.getConnection().prepareStatement(statements.getFindMessageStatement());
s.setLong(1, seq);
s.setString(1, id.getProducerId().toString());
s.setLong(2, id.getProducerSequenceId());
rs = s.executeQuery();
if (!rs.next()) {
@ -126,17 +128,10 @@ public class BlobJDBCAdapter extends DefaultJDBCAdapter {
return os.toByteArray();
} catch (IOException e) {
throw (SQLException)new SQLException("BLOB could not be updated: " + e).initCause(e);
} finally {
try {
rs.close();
} catch (Throwable ignore) {
}
try {
s.close();
} catch (Throwable ignore) {
}
cleanupExclusiveLock.readLock().unlock();
close(rs);
close(s);
}
}

View File

@ -34,6 +34,7 @@ public class BytesJDBCAdapter extends DefaultJDBCAdapter {
* @see org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter#getBinaryData(java.sql.ResultSet,
* int)
*/
@Override
protected byte[] getBinaryData(ResultSet rs, int index) throws SQLException {
return rs.getBytes(index);
}
@ -42,6 +43,7 @@ public class BytesJDBCAdapter extends DefaultJDBCAdapter {
* @see org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter#setBinaryData(java.sql.PreparedStatement,
* int, byte[])
*/
@Override
protected void setBinaryData(PreparedStatement s, int index, byte[] data) throws SQLException {
s.setBytes(index, data);
}

View File

@ -28,6 +28,7 @@ import org.apache.activemq.store.jdbc.Statements;
*/
public class DB2JDBCAdapter extends DefaultJDBCAdapter {
@Override
public void setStatements(Statements statements) {
String lockCreateStatement = "LOCK TABLE " + statements.getFullLockTableName() + " IN EXCLUSIVE MODE";
statements.setLockCreateStatement(lockCreateStatement);
@ -35,6 +36,7 @@ public class DB2JDBCAdapter extends DefaultJDBCAdapter {
super.setStatements(statements);
}
@Override
protected byte[] getBinaryData(ResultSet rs, int index) throws SQLException {
// Get as a BLOB
Blob aBlob = rs.getBlob(index);

View File

@ -780,14 +780,14 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
return result;
}
private static void close(PreparedStatement s) {
protected static void close(PreparedStatement s) {
try {
s.close();
} catch (Throwable e) {
}
}
private static void close(ResultSet rs) {
protected static void close(ResultSet rs) {
try {
rs.close();
} catch (Throwable e) {

View File

@ -24,6 +24,7 @@ import org.apache.activemq.store.jdbc.Statements;
*/
public class HsqldbJDBCAdapter extends BytesJDBCAdapter {
@Override
public void setStatements(Statements statements) {
statements.setBinaryDataType("OTHER");
super.setStatements(statements);

View File

@ -32,6 +32,7 @@ import org.apache.activemq.store.jdbc.Statements;
*/
public class ImageBasedJDBCAdaptor extends DefaultJDBCAdapter {
@Override
public void setStatements(Statements statements) {
statements.setBinaryDataType("IMAGE");
super.setStatements(statements);

View File

@ -28,6 +28,7 @@ import org.apache.activemq.store.jdbc.Statements;
*/
public class InformixJDBCAdapter extends BlobJDBCAdapter {
@Override
public void setStatements(Statements statements) {
statements.setContainerNameDataType("VARCHAR(150)");
statements.setStringIdDataType("VARCHAR(150)");

View File

@ -24,6 +24,7 @@ import org.apache.activemq.store.jdbc.Statements;
*/
public class MaxDBJDBCAdapter extends DefaultJDBCAdapter {
@Override
public void setStatements(Statements statements) {
statements.setBinaryDataType("LONG BYTE");
statements.setStringIdDataType("VARCHAR(250) ASCII");

View File

@ -42,6 +42,7 @@ public class MySqlJDBCAdapter extends DefaultJDBCAdapter {
String engineType = INNODB;
String typeStatement = "ENGINE";
@Override
public void setStatements(Statements statements) {
String type = engineType.toUpperCase();
if( !type.equals(INNODB) && !type.equals(NDBCLUSTER) ) {

View File

@ -44,12 +44,14 @@ import org.apache.activemq.store.jdbc.Statements;
*/
public class OracleJDBCAdapter extends BlobJDBCAdapter {
@Override
public void setStatements(Statements statements) {
statements.setLongDataType("NUMBER");
statements.setSequenceDataType("NUMBER");
super.setStatements(statements);
}
@Override
protected byte[] getBinaryData(ResultSet rs, int index) throws SQLException {
// Get as a BLOB
Blob aBlob = rs.getBlob(index);

View File

@ -40,6 +40,7 @@ import org.apache.activemq.store.jdbc.Statements;
public class PostgresqlJDBCAdapter extends BytesJDBCAdapter {
public String acksPkName = "activemq_acks_pkey";
@Override
public void setStatements(Statements statements) {
statements.setBinaryDataType("BYTEA");
statements.setDropAckPKAlterStatementEnd("DROP CONSTRAINT \"" + getAcksPkName() + "\"");

View File

@ -44,6 +44,7 @@ public class StreamJDBCAdapter extends DefaultJDBCAdapter {
* @see org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter#getBinaryData(java.sql.ResultSet,
* int)
*/
@Override
protected byte[] getBinaryData(ResultSet rs, int index) throws SQLException {
try {
@ -67,6 +68,7 @@ public class StreamJDBCAdapter extends DefaultJDBCAdapter {
* @see org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter#setBinaryData(java.sql.PreparedStatement,
* int, byte[])
*/
@Override
protected void setBinaryData(PreparedStatement s, int index, byte[] data) throws SQLException {
s.setBinaryStream(index, new ByteArrayInputStream(data), data.length);
}

View File

@ -25,6 +25,7 @@ import org.apache.activemq.store.jdbc.Statements;
*
*/
public class SybaseJDBCAdapter extends ImageBasedJDBCAdaptor {
@Override
public void setStatements(Statements statements) {
statements.setLockCreateStatement("LOCK TABLE " + statements.getFullLockTableName() + " IN EXCLUSIVE MODE");

View File

@ -26,6 +26,7 @@ import org.apache.activemq.store.jdbc.Statements;
*
*/
public class TransactJDBCAdapter extends ImageBasedJDBCAdaptor {
@Override
public void setStatements(Statements statements) {
String lockCreateStatement = "SELECT * FROM " + statements.getFullLockTableName();