https://issues.apache.org/jira/browse/AMQ-4628 - first attempt at jdbc performance improvement; make xid column string and indexed

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1502206 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2013-07-11 12:33:33 +00:00
parent 25d774b6dd
commit 74ba0529d2
5 changed files with 116 additions and 16 deletions

View File

@ -112,11 +112,13 @@ public class Statements {
"INSERT INTO " + getFullLockTableName() + "(ID) VALUES (1)",
"ALTER TABLE " + getFullMessageTableName() + " ADD PRIORITY " + sequenceDataType,
"CREATE INDEX " + getFullMessageTableName() + "_PIDX ON " + getFullMessageTableName() + " (PRIORITY)",
"ALTER TABLE " + getFullMessageTableName() + " ADD XID " + binaryDataType,
"ALTER TABLE " + getFullMessageTableName() + " ADD XID " + stringIdDataType,
"ALTER TABLE " + getFullAckTableName() + " ADD PRIORITY " + sequenceDataType + " DEFAULT 5 NOT NULL",
"ALTER TABLE " + getFullAckTableName() + " ADD XID " + binaryDataType,
"ALTER TABLE " + getFullAckTableName() + " ADD XID " + stringIdDataType,
"ALTER TABLE " + getFullAckTableName() + " " + getDropAckPKAlterStatementEnd(),
"ALTER TABLE " + getFullAckTableName() + " ADD PRIMARY KEY (CONTAINER, CLIENT_ID, SUB_NAME, PRIORITY)",
"CREATE INDEX " + getFullMessageTableName() + "_XIDX ON " + getFullMessageTableName() + " (XID)",
"CREATE INDEX " + getFullAckTableName() + "_XIDX ON " + getFullAckTableName() + " (XID)"
};
}
return createSchemaStatements;

View File

@ -29,7 +29,6 @@ import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.broker.region.BaseDestination;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
@ -46,6 +45,9 @@ import org.apache.activemq.util.DataByteArrayOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static javax.xml.bind.DatatypeConverter.parseBase64Binary;
import static javax.xml.bind.DatatypeConverter.printBase64Binary;
/**
* Implements all the default JDBC operations that are used by the JDBCPersistenceAdapter. <p/> sub-classing is
* encouraged to override the default implementation of methods to account for differences in JDBC Driver
@ -228,9 +230,10 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
if (xid != null) {
byte[] xidVal = xid.getEncodedXidBytes();
xidVal[0] = '+';
setBinaryData(s, 8, xidVal);
String xidString = printBase64Binary(xidVal);
s.setString(8, xidString);
} else {
setBinaryData(s, 8, null);
s.setString(8, null);
}
if (this.batchStatments) {
s.addBatch();
@ -247,6 +250,8 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
}
}
public void doAddMessageReference(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination,
long expirationTime, String messageRef) throws SQLException, IOException {
PreparedStatement s = c.getAddMessageStatement();
@ -356,7 +361,8 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
} else {
byte[] xidVal = xid.getEncodedXidBytes();
xidVal[0] = '-';
setBinaryData(s, 1, xidVal);
String xidString = printBase64Binary(xidVal);
s.setString(1, xidString);
s.setLong(2, seq);
}
if (this.batchStatments) {
@ -443,7 +449,8 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
}
if (xid != null) {
byte[] xidVal = encodeXid(xid, seq, priority);
setBinaryData(s, 1, xidVal);
String xidString = printBase64Binary(xidVal);
s.setString(1, xidString);
} else {
s.setLong(1, seq);
}
@ -480,7 +487,8 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
}
if (xid != null) {
byte[] xidVal = encodeXid(xid, seq, priority);
setBinaryData(s, 1, xidVal);
String xidString = printBase64Binary(xidVal);
s.setString(1, xidString);
} else {
s.setLong(1, seq);
}
@ -957,7 +965,8 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
rs = s.executeQuery();
while (rs.next()) {
long id = rs.getLong(1);
byte[] encodedXid = getBinaryData(rs, 2);
String encodedString = rs.getString(2);
byte[] encodedXid = parseBase64Binary(encodedString);
if (encodedXid[0] == '+') {
jdbcMemoryTransactionStore.recoverAdd(id, getBinaryData(rs, 3));
} else {
@ -971,7 +980,8 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
s = c.getConnection().prepareStatement(this.statements.getFindAcksPendingOutcomeStatement());
rs = s.executeQuery();
while (rs.next()) {
byte[] encodedXid = getBinaryData(rs, 1);
String encodedString = rs.getString(1);
byte[] encodedXid = parseBase64Binary(encodedString);
String destination = rs.getString(2);
String subName = rs.getString(3);
String subId = rs.getString(4);

View File

@ -120,6 +120,7 @@
verbose - Used to print out more info; the default is true
messageSize - The size of the message in 1-byte characters
parallelThreads - The number of parallel threads
batch - Batch size for transactions and client acknowledgment (default 10)
user - Connection username (if connecting to secured broker)
password - Connection password (if connecting to secured broker)
@ -249,6 +250,7 @@
<arg value="--sleep-time=${sleepTime}" />
<arg value="--transacted=${transacted}" />
<arg value="--verbose=${verbose}"/>
<arg value="--batch=${batch}"/>
<arg value="--user=${user}"/>
<arg value="--password=${password}"/>
</java>

View File

@ -51,6 +51,7 @@ public class ProducerTool extends Thread {
private boolean topic;
private boolean transacted;
private boolean persistent;
private long batch = 10;
private static Object lockResults = new Object();
public static void main(String[] args) {
@ -166,7 +167,7 @@ public class ProducerTool extends Thread {
producer.send(message);
if (transacted) {
if (transacted && (i % batch == 0)) {
System.out.println("[" + this.getName() + "] Committing " + messageCount + " messages");
session.commit();
}
@ -244,4 +245,8 @@ public class ProducerTool extends Thread {
public void setVerbose(boolean verbose) {
this.verbose = verbose;
}
public void setBatch(long batch) {
this.batch = batch;
}
}

View File

@ -0,0 +1,81 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!--
Use JDBC for message persistence
For more information, see:
http://activemq.apache.org/persistence.html
You need to add Derby database to your classpath in order to make this example work.
Download it from http://db.apache.org/derby/ and put it in the ${ACTIVEMQ_HOME}/lib/optional/ folder
Optionally you can configure any other RDBM as shown below
To run ActiveMQ with this configuration add xbean:conf/activemq-jdbc.xml to your command
e.g. $ bin/activemq console xbean:conf/activemq-jdbc.xml
-->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<!-- Allows us to use system properties as variables in this configuration file -->
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<value>file:${activemq.conf}/credentials.properties</value>
</property>
</bean>
<broker useJmx="true" brokerName="jdbcBroker" xmlns="http://activemq.apache.org/schema/core">
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" expireMessagesPeriod="0" prioritizedMessages="false">
</policyEntry>
<policyEntry queue=">" expireMessagesPeriod="0" prioritizedMessages="false">
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<!--
See more database locker options at http://activemq.apache.org/pluggable-storage-lockers.html
-->
<persistenceAdapter>
<jdbcPersistenceAdapter dataDirectory="${activemq.data}" dataSource="#mysql-ds" cleanupPeriod="0" />
</persistenceAdapter>
<transportConnectors>
<transportConnector name="default" uri="tcp://0.0.0.0:61616"/>
</transportConnectors>
</broker>
<!-- MySql DataSource Sample Setup -->
<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
<property name="username" value="root"/>
<property name="password" value="root"/>
<property name="maxActive" value="200"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
</beans>