diff --git a/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/conf/JDBCConfigurationImpl.java b/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/conf/JDBCConfigurationImpl.java index ad8909d60..9d3aa589c 100644 --- a/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/conf/JDBCConfigurationImpl.java +++ b/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/conf/JDBCConfigurationImpl.java @@ -24,6 +24,7 @@ import javax.sql.DataSource; import org.apache.commons.lang.StringUtils; import org.apache.openjpa.conf.OpenJPAConfigurationImpl; +import org.apache.openjpa.jdbc.kernel.BatchingConstraintUpdateManager; import org.apache.openjpa.jdbc.kernel.EagerFetchModes; import org.apache.openjpa.jdbc.kernel.JDBCBrokerFactory; import org.apache.openjpa.jdbc.kernel.LRSSizes; @@ -214,11 +215,13 @@ public class JDBCConfigurationImpl updateManagerPlugin = addPlugin("jdbc.UpdateManager", true); aliases = new String[]{ "default", - "org.apache.openjpa.jdbc.kernel.ConstraintUpdateManager", + BatchingConstraintUpdateManager.class.getName(), "operation-order", "org.apache.openjpa.jdbc.kernel.OperationOrderUpdateManager", "constraint", "org.apache.openjpa.jdbc.kernel.ConstraintUpdateManager", + "batching-constraint", + BatchingConstraintUpdateManager.class.getName(), }; updateManagerPlugin.setAliases(aliases); updateManagerPlugin.setDefault(aliases[0]); diff --git a/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/BatchingConstraintUpdateManager.java b/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/BatchingConstraintUpdateManager.java new file mode 100644 index 000000000..fe8e72a79 --- /dev/null +++ b/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/BatchingConstraintUpdateManager.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.openjpa.jdbc.kernel; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; + +import org.apache.openjpa.jdbc.schema.ForeignKey; +import org.apache.openjpa.jdbc.sql.PrimaryRow; +import org.apache.openjpa.jdbc.sql.Row; +import org.apache.openjpa.jdbc.sql.RowImpl; +import org.apache.openjpa.jdbc.sql.RowManager; +import org.apache.openjpa.jdbc.sql.RowManagerImpl; +import org.apache.openjpa.jdbc.sql.SQLExceptions; +import org.apache.openjpa.kernel.OpenJPAStateManager; + +/** + *
Batch update manager that writes the SQL in object-level operation order. + * This update manager initiates a BatchPreparedStatementManagerImpl which + * will utilize the JDBC addBatch() and executeBatch() APIs to batch the + * statements for performance improvement.
+ *This is the default plug-in class for UpdateManager to support statement + * batching. You can plug-in your own statement batch implementation through + * the following property: + *
+ * < property name="openjpa.jdbc.UpdateManager" + * value="org.apache.openjpa.jdbc.kernel.YourOperationOrderUpdateManager" /> + *+ * @author Teresa Kan + */ + +public class BatchingConstraintUpdateManager extends ConstraintUpdateManager { + + protected PreparedStatementManager newPreparedStatementManager( + JDBCStore store, Connection conn) { + int batchLimit = dict.getBatchLimit(); + return new BatchingPreparedStatementManagerImpl(store, conn, batchLimit); + } +} diff --git a/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/BatchingPreparedStatementManagerImpl.java b/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/BatchingPreparedStatementManagerImpl.java new file mode 100644 index 000000000..b98be57d7 --- /dev/null +++ b/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/BatchingPreparedStatementManagerImpl.java @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.openjpa.jdbc.kernel; + +import java.sql.BatchUpdateException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.Statement; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.Map; +import java.util.Set; + +import org.apache.openjpa.jdbc.conf.JDBCConfiguration; +import org.apache.openjpa.jdbc.meta.ClassMapping; +import org.apache.openjpa.jdbc.schema.Column; +import org.apache.openjpa.jdbc.sql.Row; +import org.apache.openjpa.jdbc.sql.RowImpl; +import org.apache.openjpa.jdbc.sql.SQLExceptions; +import org.apache.openjpa.kernel.OpenJPAStateManager; +import org.apache.openjpa.lib.log.Log; +import org.apache.openjpa.lib.util.Localizer; +import org.apache.openjpa.util.ApplicationIds; +import org.apache.openjpa.util.OptimisticException; + +/** + * Batch prepared statement manager implementation. This prepared statement + * manager will utilize the JDBC addBatch() and exceuteBatch() to batch the SQL + * statements together to improve the execution performance. + * + * @author Teresa Kan + */ + +public class BatchingPreparedStatementManagerImpl extends + PreparedStatementManagerImpl { + + private final static Localizer _loc = Localizer + .forPackage(BatchingPreparedStatementManagerImpl.class); + + private Map _cacheSql = null; + private int _batchLimit; + private boolean _disableBatch = false; + private transient Log _log = null; + + /** + * Constructor. Supply connection. + */ + public BatchingPreparedStatementManagerImpl(JDBCStore store, + Connection conn, int batchLimit) { + + super(store, conn); + _batchLimit = batchLimit; + _log = store.getConfiguration().getLog(JDBCConfiguration.LOG_JDBC); + if (_log.isTraceEnabled()) + _log.trace(_loc.get("batch_limit", String.valueOf(_batchLimit))); + } + + /** + * Flush the given row. This method will cache the statement in a cache. The + * statement will be executed in the flush() method. + */ + protected void flushInternal(RowImpl row) throws SQLException { + if (_batchLimit == 0 || _disableBatch) { + super.flushInternal(row); + return; + } + Column[] autoAssign = null; + if (row.getAction() == Row.ACTION_INSERT) + autoAssign = row.getTable().getAutoAssignedColumns(); + + // prepare statement + String sql = row.getSQL(_dict); + OpenJPAStateManager sm = row.getPrimaryKey(); + ClassMapping cmd = null; + if (sm != null) + cmd = (ClassMapping) sm.getMetaData(); + // validate batch capability + _disableBatch = _dict.validateBatchProcess(row, autoAssign, sm, cmd); + + // process the sql statement, either execute it immediately or + // cache them. + processSql(sql, row); + + // set auto assign values + if (autoAssign != null && autoAssign.length > 0 && sm != null) { + Object val; + for (int i = 0; i < autoAssign.length; i++) { + val = _dict.getGeneratedKey(autoAssign[i], _conn); + cmd.assertJoinable(autoAssign[i]).setAutoAssignedValue(sm, + _store, autoAssign[i], val); + } + sm.setObjectId(ApplicationIds.create(sm.getPersistenceCapable(), + cmd)); + } + } + + private void processSql(String sql, RowImpl row) throws SQLException { + ArrayList temprow; + + if (_cacheSql == null) + _cacheSql = Collections.synchronizedMap(new LinkedHashMap()); + if (_disableBatch) { + // if there were some statements batched before, then + // we need to flush them out first before processing the + // current non batch process. + if (!_cacheSql.isEmpty()) + flush(); + execute(sql, row); + + } else { + // else start batch support. If the sql string is in the cache, + // just adds the row to the cache + if (_cacheSql.containsKey(sql)) { + temprow = (ArrayList) _cacheSql.get(sql); + temprow.add(row); + _cacheSql.put(sql, temprow); + } else { + // no sql exists in the cache, cache the sql string and its rows + ArrayList inputrow = new ArrayList(); + inputrow.add(row); + _cacheSql.put(sql, inputrow); + } + } // end of batch support + } + + private void execute(String sql, RowImpl row) throws SQLException { + PreparedStatement stmnt = null; + try { + ResultSet rs = null; + stmnt = _conn.prepareStatement(sql); + row.flush(stmnt, _dict, _store); + int count = stmnt.executeUpdate(); + if (count != 1) { + Object failed = row.getFailedObject(); + if (failed != null) + _exceptions.add(new OptimisticException(failed)); + else if (row.getAction() == Row.ACTION_INSERT) + throw new SQLException(_loc.get( + "update-failed-no-failed-obj", + String.valueOf(count), sql).getMessage()); + } + } catch (SQLException se) { + throw SQLExceptions.getStore(se, row.getFailedObject(), _dict); + } finally { + try { + if (stmnt != null) + stmnt.close(); + } catch (SQLException se) { + // ignore the exception for this case. + } + } + } + + public void flush() { + PreparedStatement ps = null; + ArrayList list; + RowImpl onerow = null; + + // go thru the cache to process all the sql stmt. + if (_cacheSql == null || _cacheSql.isEmpty()) { + super.flush(); + return; + } + Set e = _cacheSql.keySet(); + + for (Iterator itr = e.iterator(); itr.hasNext();) { + String key = (String) itr.next(); + try { + ps = _conn.prepareStatement(key); + } catch (SQLException se) { + throw SQLExceptions.getStore(se, ps, _dict); + } + list = (ArrayList) _cacheSql.get(key); + if (list == null) { + return; + } + + // if only 1 row for this statement, then execute it right away + int rowsize = list.size(); + + try { + if (rowsize == 1) { + onerow = (RowImpl) list.get(0); + onerow.flush(ps, _dict, _store); + int count = ps.executeUpdate(); + if (count != 1) { + Object failed = onerow.getFailedObject(); + if (failed != null) + _exceptions.add(new OptimisticException(failed)); + else if (onerow.getAction() == Row.ACTION_INSERT) + throw new SQLException(_loc.get( + "update-failed-no-failed-obj", + String.valueOf(count), key).getMessage()); + } + } else { + // has more than one rows for this statement, use addBatch + int count = 0; + for (int i = 0; i < list.size(); i++) { + onerow = (RowImpl) list.get(i); + if (count < _batchLimit || _batchLimit == -1) { + onerow.flush(ps, _dict, _store); + ps.addBatch(); + count++; + + } else { + // reach the batchLimit , execute it + try { + int[] rtn = ps.executeBatch(); + checkUpdateCount(rtn, onerow, key); + } catch (BatchUpdateException bex) { + SQLException sqex = bex.getNextException(); + if (sqex == null) + sqex = bex; + throw SQLExceptions.getStore(sqex, ps, _dict); + } + onerow.flush(ps, _dict, _store); + ps.addBatch(); + count = 1; // reset the count to 1 for new batch + } + } + // end of the loop, execute the batch + try { + int[] rtn = ps.executeBatch(); + checkUpdateCount(rtn, onerow, key); + } catch (BatchUpdateException bex) { + SQLException sqex = bex.getNextException(); + if (sqex == null) + sqex = bex; + throw SQLExceptions.getStore(sqex, ps, _dict); + } + } + } catch (SQLException se) { + SQLException sqex = se.getNextException(); + if (sqex == null) + sqex = se; + throw SQLExceptions.getStore(sqex, ps, _dict); + } + try { + ps.close(); + } catch (SQLException sqex) { + throw SQLExceptions.getStore(sqex, ps, _dict); + } + } + // instead of calling _cacheSql.clear, null it out to improve the + // performance. + _cacheSql = null; + } + + private void checkUpdateCount(int[] count, RowImpl row, String sql) + throws SQLException { + int cnt = 0; + Object failed = null; + for (int i = 0; i < count.length; i++) { + cnt = count[i]; + switch (cnt) { + case Statement.EXECUTE_FAILED: // -3 + failed = row.getFailedObject(); + if (failed != null || row.getAction() == Row.ACTION_UPDATE) + _exceptions.add(new OptimisticException(failed)); + else if (row.getAction() == Row.ACTION_INSERT) + throw new SQLException(_loc.get( + "update-failed-no-failed-obj", + String.valueOf(count[i]), sql).getMessage()); + break; + case Statement.SUCCESS_NO_INFO: // -2 + if (_log.isTraceEnabled()) + _log.trace(_loc.get("batch_update_info", + String.valueOf(cnt), sql).getMessage()); + break; + case 0: // no row is inserted, treats it as failed + // case + failed = row.getFailedObject(); + if ((failed != null || row.getAction() == Row.ACTION_INSERT)) + throw new SQLException(_loc.get( + "update-failed-no-failed-obj", + String.valueOf(count[i]), sql).getMessage()); + } + } + } +} diff --git a/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/sql/DB2Dictionary.java b/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/sql/DB2Dictionary.java index 9998667bd..1d6a40a3f 100644 --- a/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/sql/DB2Dictionary.java +++ b/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/sql/DB2Dictionary.java @@ -72,6 +72,8 @@ public class DB2Dictionary protected String databaseProductVersion = null; protected int maj = 0; protected int min = 0; + + private int defaultBatchLimit = 100; public DB2Dictionary() { platform = "DB2"; @@ -144,6 +146,8 @@ public class DB2Dictionary "TYPE", "UNDO", "UNTIL", "VALIDPROC", "VARIABLE", "VARIANT", "VCAT", "VOLUMES", "WHILE", "WLM", "YEARS", })); + + super.setBatchLimit(defaultBatchLimit); } public boolean supportsRandomAccessResultSet(Select sel, @@ -690,6 +694,20 @@ public class DB2Dictionary return fstring; } + /** + * Return the batch limit. If the batchLimit is -1, change it to 100 for + * best performance + */ + public int getBatchLimit() { + int limit = super.getBatchLimit(); + if (limit == UNLIMITED) { + limit = defaultBatchLimit; + if (log.isTraceEnabled()) + log.trace(_loc.get("batch_unlimit", String.valueOf(limit))); + } + return limit; + } + /** * Return the correct CAST function syntax * diff --git a/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/sql/DBDictionary.java b/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/sql/DBDictionary.java index 04ba143bb..884ea93ac 100644 --- a/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/sql/DBDictionary.java +++ b/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/sql/DBDictionary.java @@ -85,6 +85,7 @@ import org.apache.openjpa.jdbc.schema.Sequence; import org.apache.openjpa.jdbc.schema.Table; import org.apache.openjpa.jdbc.schema.Unique; import org.apache.openjpa.kernel.Filters; +import org.apache.openjpa.kernel.OpenJPAStateManager; import org.apache.openjpa.kernel.exps.Path; import org.apache.openjpa.kernel.exps.Literal; import org.apache.openjpa.lib.conf.Configurable; @@ -94,7 +95,9 @@ import org.apache.openjpa.lib.jdbc.LoggingConnectionDecorator; import org.apache.openjpa.lib.log.Log; import org.apache.openjpa.lib.util.Localizer; import org.apache.openjpa.lib.util.Localizer.Message; +import org.apache.openjpa.meta.FieldMetaData; import org.apache.openjpa.meta.JavaTypes; +import org.apache.openjpa.meta.ValueStrategies; import org.apache.openjpa.util.GeneralException; import org.apache.openjpa.util.InternalException; import org.apache.openjpa.util.InvalidStateException; @@ -143,6 +146,9 @@ public class DBDictionary protected static final int NAME_ANY = 0; protected static final int NAME_TABLE = 1; protected static final int NAME_SEQUENCE = 2; + + protected static final int UNLIMITED = -1; + protected static final int NO_BATCH = 0; private static final String ZERO_DATE_STR = "'" + new java.sql.Date(0) + "'"; @@ -334,6 +340,12 @@ public class DBDictionary private Method _setString = null; private Method _setCharStream = null; + // batchLimit value: + // -1 = unlimited + // 0 = no batch + // any positive number = batch limit + public int batchLimit = NO_BATCH; + public DBDictionary() { fixedSizeTypeNameSet.addAll(Arrays.asList(new String[]{ "BIGINT", "BIT", "BLOB", "CLOB", "DATE", "DECIMAL", "DISTINCT", @@ -4216,4 +4228,53 @@ public class DBDictionary public void createIndexIfNecessary(Schema schema, String table, Column pkColumn) { } + + /** + * Return the batchLimit + */ + public int getBatchLimit(){ + return batchLimit; + } + + /** + * Set the batchLimit value + */ + public void setBatchLimit(int limit){ + batchLimit = limit; + } + + /** + * Validate the batch process. In some cases, we can't batch the statements + * due to some restrictions. For example, if the GeneratedType=IDENTITY, + * we have to disable the batch process because we need to get the ID value + * right away for the in-memory entity to use. + */ + public boolean validateBatchProcess(RowImpl row, Column[] autoAssign, + OpenJPAStateManager sm, ClassMapping cmd ) { + boolean disableBatch = false; + if (getBatchLimit()== 0) return false; + if (autoAssign != null && sm != null) { + FieldMetaData[] fmd = cmd.getPrimaryKeyFields(); + int i = 0; + while (!disableBatch && i < fmd.length) { + if (fmd[i].getValueStrategy() == ValueStrategies.AUTOASSIGN) + disableBatch = true; + i++; + } + } + // go to each Dictionary to validate the batch capability + if (!disableBatch) + disableBatch = validateDBSpecificBatchProcess(disableBatch, row, + autoAssign, sm, cmd); + return disableBatch; + } + + /** + * Allow each Dictionary to validate its own batch process. + */ + public boolean validateDBSpecificBatchProcess (boolean disableBatch, + RowImpl row, Column[] autoAssign, + OpenJPAStateManager sm, ClassMapping cmd ) { + return disableBatch; + } } diff --git a/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/sql/OracleDictionary.java b/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/sql/OracleDictionary.java index d99a374b3..e9819c194 100644 --- a/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/sql/OracleDictionary.java +++ b/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/sql/OracleDictionary.java @@ -108,6 +108,9 @@ public class OracleDictionary private Method _putBytes = null; private Method _putString = null; private Method _putChars = null; + + // batch limit + private int defaultBatchLimit = 100; public OracleDictionary() { platform = "Oracle"; @@ -159,6 +162,7 @@ public class OracleDictionary })); substringFunctionName = "SUBSTR"; + super.setBatchLimit(defaultBatchLimit); } public void endConfiguration() { diff --git a/openjpa-jdbc/src/main/resources/org/apache/openjpa/jdbc/kernel/localizer.properties b/openjpa-jdbc/src/main/resources/org/apache/openjpa/jdbc/kernel/localizer.properties index dd0f87b75..b259060bd 100644 --- a/openjpa-jdbc/src/main/resources/org/apache/openjpa/jdbc/kernel/localizer.properties +++ b/openjpa-jdbc/src/main/resources/org/apache/openjpa/jdbc/kernel/localizer.properties @@ -110,3 +110,7 @@ no-nullable-fk: No nullable foreign key found to resolve circular flush\n\ is nullable (optional). graph-not-cycle-free: A circular flush dependency has been found after all \ circular dependencies should have been resolved. +batch_limit: The batch limit is set to {0}. +batch_update_info: ExecuteBatch command returns update count {0} for \ + statement {1}. + diff --git a/openjpa-jdbc/src/main/resources/org/apache/openjpa/jdbc/sql/localizer.properties b/openjpa-jdbc/src/main/resources/org/apache/openjpa/jdbc/sql/localizer.properties index 75341677d..b3146a489 100644 --- a/openjpa-jdbc/src/main/resources/org/apache/openjpa/jdbc/sql/localizer.properties +++ b/openjpa-jdbc/src/main/resources/org/apache/openjpa/jdbc/sql/localizer.properties @@ -169,3 +169,4 @@ millis-query-timeout: JDBC locking does not support millisecond-granularity \ timeouts. Use timeouts that are multiples of 1000 for even second values. db-not-supported: The database product "{0}", version "{1}" is not officially supported. stream-exception: Unexpected error recovering the row to stream the LOB. +batch_unlimit: The batch limit was changed from unlimit (-1) to {0}. \ No newline at end of file diff --git a/openjpa-project/src/doc/manual/ref_guide_conf.xml b/openjpa-project/src/doc/manual/ref_guide_conf.xml index 753cb0449..b8ef9e12a 100644 --- a/openjpa-project/src/doc/manual/ref_guide_conf.xml +++ b/openjpa-project/src/doc/manual/ref_guide_conf.xml @@ -3713,7 +3713,12 @@ openjpa.jdbc.UpdateManager UpdateManager