diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/Statements.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/Statements.java index 6f575c2bde..d23adac5fc 100755 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/Statements.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/Statements.java @@ -112,11 +112,13 @@ public class Statements { "INSERT INTO " + getFullLockTableName() + "(ID) VALUES (1)", "ALTER TABLE " + getFullMessageTableName() + " ADD PRIORITY " + sequenceDataType, "CREATE INDEX " + getFullMessageTableName() + "_PIDX ON " + getFullMessageTableName() + " (PRIORITY)", - "ALTER TABLE " + getFullMessageTableName() + " ADD XID " + binaryDataType, + "ALTER TABLE " + getFullMessageTableName() + " ADD XID " + stringIdDataType, "ALTER TABLE " + getFullAckTableName() + " ADD PRIORITY " + sequenceDataType + " DEFAULT 5 NOT NULL", - "ALTER TABLE " + getFullAckTableName() + " ADD XID " + binaryDataType, + "ALTER TABLE " + getFullAckTableName() + " ADD XID " + stringIdDataType, "ALTER TABLE " + getFullAckTableName() + " " + getDropAckPKAlterStatementEnd(), "ALTER TABLE " + getFullAckTableName() + " ADD PRIMARY KEY (CONTAINER, CLIENT_ID, SUB_NAME, PRIORITY)", + "CREATE INDEX " + getFullMessageTableName() + "_XIDX ON " + getFullMessageTableName() + " (XID)", + "CREATE INDEX " + getFullAckTableName() + "_XIDX ON " + getFullAckTableName() + " (XID)" }; } return createSchemaStatements; diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java index af230a86fa..cabe99cb6b 100755 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java @@ -29,7 +29,6 @@ import java.util.Set; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import org.apache.activemq.broker.region.BaseDestination; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.MessageId; import org.apache.activemq.command.ProducerId; @@ -46,6 +45,9 @@ import org.apache.activemq.util.DataByteArrayOutputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static javax.xml.bind.DatatypeConverter.parseBase64Binary; +import static javax.xml.bind.DatatypeConverter.printBase64Binary; + /** * Implements all the default JDBC operations that are used by the JDBCPersistenceAdapter.
sub-classing is * encouraged to override the default implementation of methods to account for differences in JDBC Driver @@ -228,9 +230,10 @@ public class DefaultJDBCAdapter implements JDBCAdapter { if (xid != null) { byte[] xidVal = xid.getEncodedXidBytes(); xidVal[0] = '+'; - setBinaryData(s, 8, xidVal); + String xidString = printBase64Binary(xidVal); + s.setString(8, xidString); } else { - setBinaryData(s, 8, null); + s.setString(8, null); } if (this.batchStatments) { s.addBatch(); @@ -247,6 +250,8 @@ public class DefaultJDBCAdapter implements JDBCAdapter { } } + + public void doAddMessageReference(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, long expirationTime, String messageRef) throws SQLException, IOException { PreparedStatement s = c.getAddMessageStatement(); @@ -356,7 +361,8 @@ public class DefaultJDBCAdapter implements JDBCAdapter { } else { byte[] xidVal = xid.getEncodedXidBytes(); xidVal[0] = '-'; - setBinaryData(s, 1, xidVal); + String xidString = printBase64Binary(xidVal); + s.setString(1, xidString); s.setLong(2, seq); } if (this.batchStatments) { @@ -443,7 +449,8 @@ public class DefaultJDBCAdapter implements JDBCAdapter { } if (xid != null) { byte[] xidVal = encodeXid(xid, seq, priority); - setBinaryData(s, 1, xidVal); + String xidString = printBase64Binary(xidVal); + s.setString(1, xidString); } else { s.setLong(1, seq); } @@ -480,7 +487,8 @@ public class DefaultJDBCAdapter implements JDBCAdapter { } if (xid != null) { byte[] xidVal = encodeXid(xid, seq, priority); - setBinaryData(s, 1, xidVal); + String xidString = printBase64Binary(xidVal); + s.setString(1, xidString); } else { s.setLong(1, seq); } @@ -957,7 +965,8 @@ public class DefaultJDBCAdapter implements JDBCAdapter { rs = s.executeQuery(); while (rs.next()) { long id = rs.getLong(1); - byte[] encodedXid = getBinaryData(rs, 2); + String encodedString = rs.getString(2); + byte[] encodedXid = parseBase64Binary(encodedString); if (encodedXid[0] == '+') { jdbcMemoryTransactionStore.recoverAdd(id, getBinaryData(rs, 3)); } else { @@ -971,7 +980,8 @@ public class DefaultJDBCAdapter implements JDBCAdapter { s = c.getConnection().prepareStatement(this.statements.getFindAcksPendingOutcomeStatement()); rs = s.executeQuery(); while (rs.next()) { - byte[] encodedXid = getBinaryData(rs, 1); + String encodedString = rs.getString(1); + byte[] encodedXid = parseBase64Binary(encodedString); String destination = rs.getString(2); String subName = rs.getString(3); String subId = rs.getString(4); diff --git a/assembly/src/release/example/build.xml b/assembly/src/release/example/build.xml index d17c2d863f..d16586d86a 100755 --- a/assembly/src/release/example/build.xml +++ b/assembly/src/release/example/build.xml @@ -120,6 +120,7 @@ verbose - Used to print out more info; the default is true messageSize - The size of the message in 1-byte characters parallelThreads - The number of parallel threads + batch - Batch size for transactions and client acknowledgment (default 10) user - Connection username (if connecting to secured broker) password - Connection password (if connecting to secured broker) @@ -225,9 +226,9 @@