OPENJPA-464 committing patch provided by Teresa Kan

git-svn-id: https://svn.apache.org/repos/asf/openjpa/trunk@614731 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Dick 2008-01-23 23:25:56 +00:00
parent 72aefb149b
commit 8d18daabd7
10 changed files with 567 additions and 4 deletions

View File

@ -24,6 +24,7 @@ import javax.sql.DataSource;
import org.apache.commons.lang.StringUtils;
import org.apache.openjpa.conf.OpenJPAConfigurationImpl;
import org.apache.openjpa.jdbc.kernel.BatchingConstraintUpdateManager;
import org.apache.openjpa.jdbc.kernel.EagerFetchModes;
import org.apache.openjpa.jdbc.kernel.JDBCBrokerFactory;
import org.apache.openjpa.jdbc.kernel.LRSSizes;
@ -214,11 +215,13 @@ public class JDBCConfigurationImpl
updateManagerPlugin = addPlugin("jdbc.UpdateManager", true);
aliases = new String[]{
"default",
"org.apache.openjpa.jdbc.kernel.ConstraintUpdateManager",
BatchingConstraintUpdateManager.class.getName(),
"operation-order",
"org.apache.openjpa.jdbc.kernel.OperationOrderUpdateManager",
"constraint",
"org.apache.openjpa.jdbc.kernel.ConstraintUpdateManager",
"batching-constraint",
BatchingConstraintUpdateManager.class.getName(),
};
updateManagerPlugin.setAliases(aliases);
updateManagerPlugin.setDefault(aliases[0]);

View File

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.openjpa.jdbc.kernel;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import org.apache.openjpa.jdbc.schema.ForeignKey;
import org.apache.openjpa.jdbc.sql.PrimaryRow;
import org.apache.openjpa.jdbc.sql.Row;
import org.apache.openjpa.jdbc.sql.RowImpl;
import org.apache.openjpa.jdbc.sql.RowManager;
import org.apache.openjpa.jdbc.sql.RowManagerImpl;
import org.apache.openjpa.jdbc.sql.SQLExceptions;
import org.apache.openjpa.kernel.OpenJPAStateManager;
/**
* <P>Batch update manager that writes the SQL in object-level operation order.
* This update manager initiates a BatchPreparedStatementManagerImpl which
* will utilize the JDBC addBatch() and executeBatch() APIs to batch the
* statements for performance improvement.</P>
* <P>This is the default plug-in class for UpdateManager to support statement
* batching. You can plug-in your own statement batch implementation through
* the following property:
* <PRE>
* < property name="openjpa.jdbc.UpdateManager"
* value="org.apache.openjpa.jdbc.kernel.YourOperationOrderUpdateManager" />
* </PRE></P>
* @author Teresa Kan
*/
public class BatchingConstraintUpdateManager extends ConstraintUpdateManager {
protected PreparedStatementManager newPreparedStatementManager(
JDBCStore store, Connection conn) {
int batchLimit = dict.getBatchLimit();
return new BatchingPreparedStatementManagerImpl(store, conn, batchLimit);
}
}

View File

@ -0,0 +1,302 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.openjpa.jdbc.kernel;
import java.sql.BatchUpdateException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import org.apache.openjpa.jdbc.conf.JDBCConfiguration;
import org.apache.openjpa.jdbc.meta.ClassMapping;
import org.apache.openjpa.jdbc.schema.Column;
import org.apache.openjpa.jdbc.sql.Row;
import org.apache.openjpa.jdbc.sql.RowImpl;
import org.apache.openjpa.jdbc.sql.SQLExceptions;
import org.apache.openjpa.kernel.OpenJPAStateManager;
import org.apache.openjpa.lib.log.Log;
import org.apache.openjpa.lib.util.Localizer;
import org.apache.openjpa.util.ApplicationIds;
import org.apache.openjpa.util.OptimisticException;
/**
* Batch prepared statement manager implementation. This prepared statement
* manager will utilize the JDBC addBatch() and exceuteBatch() to batch the SQL
* statements together to improve the execution performance.
*
* @author Teresa Kan
*/
public class BatchingPreparedStatementManagerImpl extends
PreparedStatementManagerImpl {
private final static Localizer _loc = Localizer
.forPackage(BatchingPreparedStatementManagerImpl.class);
private Map _cacheSql = null;
private int _batchLimit;
private boolean _disableBatch = false;
private transient Log _log = null;
/**
* Constructor. Supply connection.
*/
public BatchingPreparedStatementManagerImpl(JDBCStore store,
Connection conn, int batchLimit) {
super(store, conn);
_batchLimit = batchLimit;
_log = store.getConfiguration().getLog(JDBCConfiguration.LOG_JDBC);
if (_log.isTraceEnabled())
_log.trace(_loc.get("batch_limit", String.valueOf(_batchLimit)));
}
/**
* Flush the given row. This method will cache the statement in a cache. The
* statement will be executed in the flush() method.
*/
protected void flushInternal(RowImpl row) throws SQLException {
if (_batchLimit == 0 || _disableBatch) {
super.flushInternal(row);
return;
}
Column[] autoAssign = null;
if (row.getAction() == Row.ACTION_INSERT)
autoAssign = row.getTable().getAutoAssignedColumns();
// prepare statement
String sql = row.getSQL(_dict);
OpenJPAStateManager sm = row.getPrimaryKey();
ClassMapping cmd = null;
if (sm != null)
cmd = (ClassMapping) sm.getMetaData();
// validate batch capability
_disableBatch = _dict.validateBatchProcess(row, autoAssign, sm, cmd);
// process the sql statement, either execute it immediately or
// cache them.
processSql(sql, row);
// set auto assign values
if (autoAssign != null && autoAssign.length > 0 && sm != null) {
Object val;
for (int i = 0; i < autoAssign.length; i++) {
val = _dict.getGeneratedKey(autoAssign[i], _conn);
cmd.assertJoinable(autoAssign[i]).setAutoAssignedValue(sm,
_store, autoAssign[i], val);
}
sm.setObjectId(ApplicationIds.create(sm.getPersistenceCapable(),
cmd));
}
}
private void processSql(String sql, RowImpl row) throws SQLException {
ArrayList temprow;
if (_cacheSql == null)
_cacheSql = Collections.synchronizedMap(new LinkedHashMap());
if (_disableBatch) {
// if there were some statements batched before, then
// we need to flush them out first before processing the
// current non batch process.
if (!_cacheSql.isEmpty())
flush();
execute(sql, row);
} else {
// else start batch support. If the sql string is in the cache,
// just adds the row to the cache
if (_cacheSql.containsKey(sql)) {
temprow = (ArrayList) _cacheSql.get(sql);
temprow.add(row);
_cacheSql.put(sql, temprow);
} else {
// no sql exists in the cache, cache the sql string and its rows
ArrayList inputrow = new ArrayList();
inputrow.add(row);
_cacheSql.put(sql, inputrow);
}
} // end of batch support
}
private void execute(String sql, RowImpl row) throws SQLException {
PreparedStatement stmnt = null;
try {
ResultSet rs = null;
stmnt = _conn.prepareStatement(sql);
row.flush(stmnt, _dict, _store);
int count = stmnt.executeUpdate();
if (count != 1) {
Object failed = row.getFailedObject();
if (failed != null)
_exceptions.add(new OptimisticException(failed));
else if (row.getAction() == Row.ACTION_INSERT)
throw new SQLException(_loc.get(
"update-failed-no-failed-obj",
String.valueOf(count), sql).getMessage());
}
} catch (SQLException se) {
throw SQLExceptions.getStore(se, row.getFailedObject(), _dict);
} finally {
try {
if (stmnt != null)
stmnt.close();
} catch (SQLException se) {
// ignore the exception for this case.
}
}
}
public void flush() {
PreparedStatement ps = null;
ArrayList list;
RowImpl onerow = null;
// go thru the cache to process all the sql stmt.
if (_cacheSql == null || _cacheSql.isEmpty()) {
super.flush();
return;
}
Set e = _cacheSql.keySet();
for (Iterator itr = e.iterator(); itr.hasNext();) {
String key = (String) itr.next();
try {
ps = _conn.prepareStatement(key);
} catch (SQLException se) {
throw SQLExceptions.getStore(se, ps, _dict);
}
list = (ArrayList) _cacheSql.get(key);
if (list == null) {
return;
}
// if only 1 row for this statement, then execute it right away
int rowsize = list.size();
try {
if (rowsize == 1) {
onerow = (RowImpl) list.get(0);
onerow.flush(ps, _dict, _store);
int count = ps.executeUpdate();
if (count != 1) {
Object failed = onerow.getFailedObject();
if (failed != null)
_exceptions.add(new OptimisticException(failed));
else if (onerow.getAction() == Row.ACTION_INSERT)
throw new SQLException(_loc.get(
"update-failed-no-failed-obj",
String.valueOf(count), key).getMessage());
}
} else {
// has more than one rows for this statement, use addBatch
int count = 0;
for (int i = 0; i < list.size(); i++) {
onerow = (RowImpl) list.get(i);
if (count < _batchLimit || _batchLimit == -1) {
onerow.flush(ps, _dict, _store);
ps.addBatch();
count++;
} else {
// reach the batchLimit , execute it
try {
int[] rtn = ps.executeBatch();
checkUpdateCount(rtn, onerow, key);
} catch (BatchUpdateException bex) {
SQLException sqex = bex.getNextException();
if (sqex == null)
sqex = bex;
throw SQLExceptions.getStore(sqex, ps, _dict);
}
onerow.flush(ps, _dict, _store);
ps.addBatch();
count = 1; // reset the count to 1 for new batch
}
}
// end of the loop, execute the batch
try {
int[] rtn = ps.executeBatch();
checkUpdateCount(rtn, onerow, key);
} catch (BatchUpdateException bex) {
SQLException sqex = bex.getNextException();
if (sqex == null)
sqex = bex;
throw SQLExceptions.getStore(sqex, ps, _dict);
}
}
} catch (SQLException se) {
SQLException sqex = se.getNextException();
if (sqex == null)
sqex = se;
throw SQLExceptions.getStore(sqex, ps, _dict);
}
try {
ps.close();
} catch (SQLException sqex) {
throw SQLExceptions.getStore(sqex, ps, _dict);
}
}
// instead of calling _cacheSql.clear, null it out to improve the
// performance.
_cacheSql = null;
}
private void checkUpdateCount(int[] count, RowImpl row, String sql)
throws SQLException {
int cnt = 0;
Object failed = null;
for (int i = 0; i < count.length; i++) {
cnt = count[i];
switch (cnt) {
case Statement.EXECUTE_FAILED: // -3
failed = row.getFailedObject();
if (failed != null || row.getAction() == Row.ACTION_UPDATE)
_exceptions.add(new OptimisticException(failed));
else if (row.getAction() == Row.ACTION_INSERT)
throw new SQLException(_loc.get(
"update-failed-no-failed-obj",
String.valueOf(count[i]), sql).getMessage());
break;
case Statement.SUCCESS_NO_INFO: // -2
if (_log.isTraceEnabled())
_log.trace(_loc.get("batch_update_info",
String.valueOf(cnt), sql).getMessage());
break;
case 0: // no row is inserted, treats it as failed
// case
failed = row.getFailedObject();
if ((failed != null || row.getAction() == Row.ACTION_INSERT))
throw new SQLException(_loc.get(
"update-failed-no-failed-obj",
String.valueOf(count[i]), sql).getMessage());
}
}
}
}

View File

@ -73,6 +73,8 @@ public class DB2Dictionary
protected int maj = 0;
protected int min = 0;
private int defaultBatchLimit = 100;
public DB2Dictionary() {
platform = "DB2";
validationSQL = "SELECT DISTINCT(CURRENT TIMESTAMP) FROM "
@ -144,6 +146,8 @@ public class DB2Dictionary
"TYPE", "UNDO", "UNTIL", "VALIDPROC", "VARIABLE", "VARIANT", "VCAT",
"VOLUMES", "WHILE", "WLM", "YEARS",
}));
super.setBatchLimit(defaultBatchLimit);
}
public boolean supportsRandomAccessResultSet(Select sel,
@ -690,6 +694,20 @@ public class DB2Dictionary
return fstring;
}
/**
* Return the batch limit. If the batchLimit is -1, change it to 100 for
* best performance
*/
public int getBatchLimit() {
int limit = super.getBatchLimit();
if (limit == UNLIMITED) {
limit = defaultBatchLimit;
if (log.isTraceEnabled())
log.trace(_loc.get("batch_unlimit", String.valueOf(limit)));
}
return limit;
}
/**
* Return the correct CAST function syntax
*

View File

@ -85,6 +85,7 @@ import org.apache.openjpa.jdbc.schema.Sequence;
import org.apache.openjpa.jdbc.schema.Table;
import org.apache.openjpa.jdbc.schema.Unique;
import org.apache.openjpa.kernel.Filters;
import org.apache.openjpa.kernel.OpenJPAStateManager;
import org.apache.openjpa.kernel.exps.Path;
import org.apache.openjpa.kernel.exps.Literal;
import org.apache.openjpa.lib.conf.Configurable;
@ -94,7 +95,9 @@ import org.apache.openjpa.lib.jdbc.LoggingConnectionDecorator;
import org.apache.openjpa.lib.log.Log;
import org.apache.openjpa.lib.util.Localizer;
import org.apache.openjpa.lib.util.Localizer.Message;
import org.apache.openjpa.meta.FieldMetaData;
import org.apache.openjpa.meta.JavaTypes;
import org.apache.openjpa.meta.ValueStrategies;
import org.apache.openjpa.util.GeneralException;
import org.apache.openjpa.util.InternalException;
import org.apache.openjpa.util.InvalidStateException;
@ -144,6 +147,9 @@ public class DBDictionary
protected static final int NAME_TABLE = 1;
protected static final int NAME_SEQUENCE = 2;
protected static final int UNLIMITED = -1;
protected static final int NO_BATCH = 0;
private static final String ZERO_DATE_STR =
"'" + new java.sql.Date(0) + "'";
private static final String ZERO_TIME_STR = "'" + new Time(0) + "'";
@ -334,6 +340,12 @@ public class DBDictionary
private Method _setString = null;
private Method _setCharStream = null;
// batchLimit value:
// -1 = unlimited
// 0 = no batch
// any positive number = batch limit
public int batchLimit = NO_BATCH;
public DBDictionary() {
fixedSizeTypeNameSet.addAll(Arrays.asList(new String[]{
"BIGINT", "BIT", "BLOB", "CLOB", "DATE", "DECIMAL", "DISTINCT",
@ -4216,4 +4228,53 @@ public class DBDictionary
public void createIndexIfNecessary(Schema schema, String table,
Column pkColumn) {
}
/**
* Return the batchLimit
*/
public int getBatchLimit(){
return batchLimit;
}
/**
* Set the batchLimit value
*/
public void setBatchLimit(int limit){
batchLimit = limit;
}
/**
* Validate the batch process. In some cases, we can't batch the statements
* due to some restrictions. For example, if the GeneratedType=IDENTITY,
* we have to disable the batch process because we need to get the ID value
* right away for the in-memory entity to use.
*/
public boolean validateBatchProcess(RowImpl row, Column[] autoAssign,
OpenJPAStateManager sm, ClassMapping cmd ) {
boolean disableBatch = false;
if (getBatchLimit()== 0) return false;
if (autoAssign != null && sm != null) {
FieldMetaData[] fmd = cmd.getPrimaryKeyFields();
int i = 0;
while (!disableBatch && i < fmd.length) {
if (fmd[i].getValueStrategy() == ValueStrategies.AUTOASSIGN)
disableBatch = true;
i++;
}
}
// go to each Dictionary to validate the batch capability
if (!disableBatch)
disableBatch = validateDBSpecificBatchProcess(disableBatch, row,
autoAssign, sm, cmd);
return disableBatch;
}
/**
* Allow each Dictionary to validate its own batch process.
*/
public boolean validateDBSpecificBatchProcess (boolean disableBatch,
RowImpl row, Column[] autoAssign,
OpenJPAStateManager sm, ClassMapping cmd ) {
return disableBatch;
}
}

View File

@ -109,6 +109,9 @@ public class OracleDictionary
private Method _putString = null;
private Method _putChars = null;
// batch limit
private int defaultBatchLimit = 100;
public OracleDictionary() {
platform = "Oracle";
validationSQL = "SELECT SYSDATE FROM DUAL";
@ -159,6 +162,7 @@ public class OracleDictionary
}));
substringFunctionName = "SUBSTR";
super.setBatchLimit(defaultBatchLimit);
}
public void endConfiguration() {

View File

@ -110,3 +110,7 @@ no-nullable-fk: No nullable foreign key found to resolve circular flush\n\
is nullable (optional).
graph-not-cycle-free: A circular flush dependency has been found after all \
circular dependencies should have been resolved.
batch_limit: The batch limit is set to {0}.
batch_update_info: ExecuteBatch command returns update count {0} for \
statement {1}.

View File

@ -169,3 +169,4 @@ millis-query-timeout: JDBC locking does not support millisecond-granularity \
timeouts. Use timeouts that are multiples of 1000 for even second values.
db-not-supported: The database product "{0}", version "{1}" is not officially supported.
stream-exception: Unexpected error recovering the row to stream the LOB.
batch_unlimit: The batch limit was changed from unlimit (-1) to {0}.

View File

@ -3713,7 +3713,12 @@ openjpa.jdbc.UpdateManager</literal>
UpdateManager</literal>
</para>
<para>
<emphasis role="bold">Default: </emphasis><literal>default</literal>
<emphasis role="bold">Default: </emphasis><literal>constraint-batching</literal>
</para>
<para>
<emphasis role="bold">Possible values: </emphasis><literal>default</literal>,
<literal>operation-order</literal>, <literal>constraint</literal>, <literal>
batching-constraint</literal>, <literal>batching-operation-order</literal>
</para>
<para>
<emphasis role="bold">Description:</emphasis> The full class name of the
@ -3721,9 +3726,10 @@ UpdateManager</literal>
<classname>org.apache.openjpa.jdbc.kernel.UpdateManager</classname></ulink> to
use to flush persistent object changes to the datastore. The provided default
implementation is
<ulink url="../javadoc/org/apache/openjpa/jdbc/kernel/OperationOrderUpdateManager">
<classname>org.apache.openjpa.jdbc.kernel.OperationOrderUpdateManager</classname>
<ulink url="../javadoc/org/apache/openjpa/jdbc/kernel/BatchingConstraintUpdateManager">
<classname>org.apache.openjpa.jdbc.kernel.BatchingConstraintUpdateManager</classname>
</ulink>.
</para>
</section>
</section>

View File

@ -3057,6 +3057,112 @@ import org.apache.openjpa.persistence.*;
Map props = new HashMap();
props.put("openjpa.ConnectionRetainMode", "always");
EntityManager em = emf.createEntityManager(props);
</programlisting>
</example>
</section>
<section id="ref_guide_dbsetup_stmtbatch">
<title>
Statement Batching
</title>
<indexterm zone="ref_guide_dbsetup_stmtbatch">
<primary>
Statement Batching
</primary>
</indexterm>
<indexterm>
<primary>
JDBC
</primary>
<secondary>
statement batching
</secondary>
<see>
statement batching
</see>
</indexterm>
<para>
In addition to connection pooling and prepared statement caching, OpenJPA
employs statement batching to speed up JDBC updates. Statement batching is
enabled by default for any JDBC driver that supports it. When batching is on,
OpenJPA automatically orders its SQL statements to maximize the size of each
batch. This can result in large performance gains for transactions that modify
a lot of data.
</para>
<para>
You configure statement batching through the system DBDictionary, which is
controlled by the openjpa.jdbc.DBDictionary configuration property. You can
enable the statement batching by setting the batchLimit in the value. The batch
limit is the maximum number of statements OpenJPA will ever batch
together. A value has the following meaning:
<itemizedlist>
<listitem>
<para>
<literal>-1</literal>: Unlimited number of statements for a batch.
</para>
</listitem>
<listitem>
<para>
<literal>0</literal>: Disable batch support. This is the default for most
dictionaries.
</para>
</listitem>
<listitem>
<para>
<literal>any positive number</literal>: Maximum number of statements for a batch.
</para>
</listitem>
</itemizedlist>
<note>
<para>
By default, the batch support is based on each Dictionary to define the default
batch limit. Currently only DB2 and Oracle dictionaries are set the default
batch limit to 100. The default batch limit for the rest of the dictionaries is set
to zero (disabled).
</para>
</note>
</para>
<para>
The example below shows how to enable and disable statement batching via
your configuration properties.
</para>
<example id="ref_guide_dbsetup_stmtbatch_exmple1">
<title>
Enable SQL statement batching
</title>
<programlisting>
&lt;property name="openjpa.jdbc.DBDictionary" value="db2(batchLimit=25)"/&gt;
&lt;property name="openjpa.jdbc.DBDictionary" value="oracle(batchLimit=-1)"/&gt;
Or
&lt;property name="openjpa.jdbc.DBDictionary" value="batchLimit=25"/&gt;
&lt;property name="openjpa.jdbc.DBDictionary" value="batchLimit=-1"/&gt;
</programlisting>
</example>
<example id="ref_guide_dbsetup_stmtbatch_exmple2">
<title>
Disable SQL statement batching
</title>
<programlisting>
&lt;property name="openjpa.jdbc.DBDictionary" value="db2(batchLimit=0)"/&gt;
Or
&lt;property name="openjpa.jdbc.DBDictionary" value="batchLimit=0"/&gt;
</programlisting>
</example>
<par>
By default, org.apache.openjpa.jdbc.kernel.BatchingConstraintUpdateManager
is the default statement batching implementation. You can plug-in your own
statement batching implementation by providing the implementation that extends
from AbstractUpdateManager or ConstraitUpdateManager. Add this implementation
class as a property in the persistence.xml file. For example, a custom
statement batching implementation mycomp.MyUpdateManager extends
ConstraitUpdateManager. You specify this implementation in the persistence.xml
file as the following example:
</par>
<example id="ref_guide_dbsetup_stmtbatch_exmple3">
<title>
Plug-in custom statement batching implementation
</title>
<programlisting>
&lt;property name="openjpa.jdbc.UpdateManager" value="mycomp.MyUpdateManager"/&gt;
</programlisting>
</example>
</section>