diff --git a/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/BatchingConstraintUpdateManager.java b/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/BatchingConstraintUpdateManager.java index fe8e72a79..35a2a25d6 100644 --- a/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/BatchingConstraintUpdateManager.java +++ b/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/BatchingConstraintUpdateManager.java @@ -19,19 +19,9 @@ package org.apache.openjpa.jdbc.kernel; import java.sql.Connection; -import java.sql.SQLException; -import java.util.ArrayList; import java.util.Collection; -import java.util.Iterator; -import org.apache.openjpa.jdbc.schema.ForeignKey; -import org.apache.openjpa.jdbc.sql.PrimaryRow; -import org.apache.openjpa.jdbc.sql.Row; -import org.apache.openjpa.jdbc.sql.RowImpl; import org.apache.openjpa.jdbc.sql.RowManager; -import org.apache.openjpa.jdbc.sql.RowManagerImpl; -import org.apache.openjpa.jdbc.sql.SQLExceptions; -import org.apache.openjpa.kernel.OpenJPAStateManager; /** *
Batch update manager that writes the SQL in object-level operation order. @@ -51,8 +41,22 @@ import org.apache.openjpa.kernel.OpenJPAStateManager; public class BatchingConstraintUpdateManager extends ConstraintUpdateManager { protected PreparedStatementManager newPreparedStatementManager( - JDBCStore store, Connection conn) { + JDBCStore store, Connection conn) { int batchLimit = dict.getBatchLimit(); - return new BatchingPreparedStatementManagerImpl(store, conn, batchLimit); + return new BatchingPreparedStatementManagerImpl(store, conn, + batchLimit); + } + + /* + * Override this method to flush any remaining batched row in the + * PreparedStatementManager. + */ + protected Collection flush(RowManager rowMgr, + PreparedStatementManager psMgr, Collection exceps) { + Collection rtnCol = super.flush(rowMgr, psMgr, exceps); + BatchingPreparedStatementManagerImpl bPsMgr = + (BatchingPreparedStatementManagerImpl) psMgr; + bPsMgr.flushBatch(); + return rtnCol; } } diff --git a/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/BatchingOperationOrderUpdateManager.java b/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/BatchingOperationOrderUpdateManager.java index 350578413..24fcadf9b 100644 --- a/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/BatchingOperationOrderUpdateManager.java +++ b/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/BatchingOperationOrderUpdateManager.java @@ -19,6 +19,9 @@ package org.apache.openjpa.jdbc.kernel; import java.sql.Connection; +import java.util.Collection; + +import org.apache.openjpa.jdbc.sql.RowManager; /** *
Batch update manager that writes the SQL in object-level operation order. @@ -37,12 +40,25 @@ import java.sql.Connection; */ public class BatchingOperationOrderUpdateManager extends - OperationOrderUpdateManager { + OperationOrderUpdateManager { protected PreparedStatementManager newPreparedStatementManager( - JDBCStore store, Connection conn) { + JDBCStore store, Connection conn) { int batchLimit = dict.getBatchLimit(); - return new BatchingPreparedStatementManagerImpl( - store, conn, batchLimit); + return new BatchingPreparedStatementManagerImpl(store, conn, + batchLimit); + } + + /* + * Override this method to flush any remaining batched row in the + * PreparedStatementManager. + */ + protected Collection flush(RowManager rowMgr, + PreparedStatementManager psMgr, Collection exceps) { + Collection rtnCol = super.flush(rowMgr, psMgr, exceps); + BatchingPreparedStatementManagerImpl bPsMgr = + (BatchingPreparedStatementManagerImpl) psMgr; + bPsMgr.flushBatch(); + return rtnCol; } } diff --git a/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/BatchingPreparedStatementManagerImpl.java b/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/BatchingPreparedStatementManagerImpl.java index b98be57d7..f25cdff8f 100644 --- a/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/BatchingPreparedStatementManagerImpl.java +++ b/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/BatchingPreparedStatementManagerImpl.java @@ -18,20 +18,13 @@ */ package org.apache.openjpa.jdbc.kernel; -import java.sql.BatchUpdateException; import java.sql.Connection; import java.sql.PreparedStatement; -import java.sql.ResultSet; import java.sql.Statement; import java.sql.SQLException; import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.LinkedList; -import java.util.Map; -import java.util.Set; +import java.util.List; import org.apache.openjpa.jdbc.conf.JDBCConfiguration; import org.apache.openjpa.jdbc.meta.ClassMapping; @@ -42,7 +35,6 @@ import org.apache.openjpa.jdbc.sql.SQLExceptions; import org.apache.openjpa.kernel.OpenJPAStateManager; import org.apache.openjpa.lib.log.Log; import org.apache.openjpa.lib.util.Localizer; -import org.apache.openjpa.util.ApplicationIds; import org.apache.openjpa.util.OptimisticException; /** @@ -59,7 +51,8 @@ public class BatchingPreparedStatementManagerImpl extends private final static Localizer _loc = Localizer .forPackage(BatchingPreparedStatementManagerImpl.class); - private Map _cacheSql = null; + private String _batchedSql = null; + private List _batchedRows = new ArrayList(); private int _batchLimit; private boolean _disableBatch = false; private transient Log _log = null; @@ -68,8 +61,7 @@ public class BatchingPreparedStatementManagerImpl extends * Constructor. Supply connection. */ public BatchingPreparedStatementManagerImpl(JDBCStore store, - Connection conn, int batchLimit) { - + Connection conn, int batchLimit) { super(store, conn); _batchLimit = batchLimit; _log = store.getConfiguration().getLog(JDBCConfiguration.LOG_JDBC); @@ -78,202 +70,158 @@ public class BatchingPreparedStatementManagerImpl extends } /** - * Flush the given row. This method will cache the statement in a cache. The - * statement will be executed in the flush() method. + * Flush the given row immediately or deferred the flush in batch. */ - protected void flushInternal(RowImpl row) throws SQLException { - if (_batchLimit == 0 || _disableBatch) { - super.flushInternal(row); - return; - } - Column[] autoAssign = null; - if (row.getAction() == Row.ACTION_INSERT) - autoAssign = row.getTable().getAutoAssignedColumns(); - - // prepare statement - String sql = row.getSQL(_dict); - OpenJPAStateManager sm = row.getPrimaryKey(); - ClassMapping cmd = null; - if (sm != null) - cmd = (ClassMapping) sm.getMetaData(); - // validate batch capability - _disableBatch = _dict.validateBatchProcess(row, autoAssign, sm, cmd); - - // process the sql statement, either execute it immediately or - // cache them. - processSql(sql, row); - - // set auto assign values - if (autoAssign != null && autoAssign.length > 0 && sm != null) { - Object val; - for (int i = 0; i < autoAssign.length; i++) { - val = _dict.getGeneratedKey(autoAssign[i], _conn); - cmd.assertJoinable(autoAssign[i]).setAutoAssignedValue(sm, - _store, autoAssign[i], val); - } - sm.setObjectId(ApplicationIds.create(sm.getPersistenceCapable(), - cmd)); - } - } - - private void processSql(String sql, RowImpl row) throws SQLException { - ArrayList temprow; - - if (_cacheSql == null) - _cacheSql = Collections.synchronizedMap(new LinkedHashMap()); - if (_disableBatch) { + protected void flushAndUpdate(RowImpl row) throws SQLException { + if (isBatchDisabled(row)) { // if there were some statements batched before, then // we need to flush them out first before processing the // current non batch process. - if (!_cacheSql.isEmpty()) - flush(); - execute(sql, row); + flushBatch(); + super.flushAndUpdate(row); } else { - // else start batch support. If the sql string is in the cache, - // just adds the row to the cache - if (_cacheSql.containsKey(sql)) { - temprow = (ArrayList) _cacheSql.get(sql); - temprow.add(row); - _cacheSql.put(sql, temprow); - } else { - // no sql exists in the cache, cache the sql string and its rows - ArrayList inputrow = new ArrayList(); - inputrow.add(row); - _cacheSql.put(sql, inputrow); - } - } // end of batch support - } - - private void execute(String sql, RowImpl row) throws SQLException { - PreparedStatement stmnt = null; - try { - ResultSet rs = null; - stmnt = _conn.prepareStatement(sql); - row.flush(stmnt, _dict, _store); - int count = stmnt.executeUpdate(); - if (count != 1) { - Object failed = row.getFailedObject(); - if (failed != null) - _exceptions.add(new OptimisticException(failed)); - else if (row.getAction() == Row.ACTION_INSERT) - throw new SQLException(_loc.get( - "update-failed-no-failed-obj", - String.valueOf(count), sql).getMessage()); - } - } catch (SQLException se) { - throw SQLExceptions.getStore(se, row.getFailedObject(), _dict); - } finally { - try { - if (stmnt != null) - stmnt.close(); - } catch (SQLException se) { - // ignore the exception for this case. + // process the SQL statement, either execute it immediately or + // batch it for later execution. + String sql = row.getSQL(_dict); + if (_batchedSql == null) { + // brand new SQL + _batchedSql = sql; + } else if (!sql.equals(_batchedSql)) { + // SQL statements changed. + switch (_batchedRows.size()) { + case 0: + break; + case 1: + // single entry in cache, direct SQL execution. + super.flushAndUpdate((RowImpl) _batchedRows.get(0)); + _batchedRows.clear(); + break; + default: + // flush all entries in cache in batch. + flushBatch(); + } + _batchedSql = sql; } + _batchedRows.add(row); } } - public void flush() { - PreparedStatement ps = null; - ArrayList list; - RowImpl onerow = null; - - // go thru the cache to process all the sql stmt. - if (_cacheSql == null || _cacheSql.isEmpty()) { - super.flush(); - return; + /* + * Compute if batching is disabled, based on values of batch limit + * and database characteristics. + */ + private boolean isBatchDisabled(RowImpl row) { + boolean rtnVal = true; + if (_batchLimit != 0 && !_disableBatch) { + String sql = row.getSQL(_dict); + OpenJPAStateManager sm = row.getPrimaryKey(); + ClassMapping cmd = null; + if (sm != null) + cmd = (ClassMapping) sm.getMetaData(); + Column[] autoAssign = null; + if (row.getAction() == Row.ACTION_INSERT) + autoAssign = row.getTable().getAutoAssignedColumns(); + // validate batch capability + _disableBatch = _dict + .validateBatchProcess(row, autoAssign, sm, cmd); + rtnVal = _disableBatch; } - Set e = _cacheSql.keySet(); - - for (Iterator itr = e.iterator(); itr.hasNext();) { - String key = (String) itr.next(); + return rtnVal; + } + + /** + * flush all cached up statements to be executed as a single or batched + * prepared statements. + */ + protected void flushBatch() { + if (_batchedSql != null && _batchedRows.size() > 0) { + PreparedStatement ps = null; try { - ps = _conn.prepareStatement(key); - } catch (SQLException se) { - throw SQLExceptions.getStore(se, ps, _dict); - } - list = (ArrayList) _cacheSql.get(key); - if (list == null) { - return; - } - - // if only 1 row for this statement, then execute it right away - int rowsize = list.size(); - - try { - if (rowsize == 1) { - onerow = (RowImpl) list.get(0); - onerow.flush(ps, _dict, _store); - int count = ps.executeUpdate(); - if (count != 1) { - Object failed = onerow.getFailedObject(); - if (failed != null) - _exceptions.add(new OptimisticException(failed)); - else if (onerow.getAction() == Row.ACTION_INSERT) - throw new SQLException(_loc.get( - "update-failed-no-failed-obj", - String.valueOf(count), key).getMessage()); - } + RowImpl onerow = null; + ps = _conn.prepareStatement(_batchedSql); + if (_batchedRows.size() == 1) { + // execute a single row. + onerow = (RowImpl) _batchedRows.get(0); + flushSingleRow(onerow, ps); } else { - // has more than one rows for this statement, use addBatch + // cache has more than one rows, execute as batch. int count = 0; - for (int i = 0; i < list.size(); i++) { - onerow = (RowImpl) list.get(i); - if (count < _batchLimit || _batchLimit == -1) { - onerow.flush(ps, _dict, _store); - ps.addBatch(); - count++; - + int batchedRowsBaseIndex = 0; + Iterator itr = _batchedRows.iterator(); + while (itr.hasNext()) { + onerow = (RowImpl) itr.next(); + if (_batchLimit == 1) { + flushSingleRow(onerow, ps); } else { - // reach the batchLimit , execute it - try { + if (count < _batchLimit || _batchLimit == -1) { + onerow.flush(ps, _dict, _store); + ps.addBatch(); + count++; + } else { + // reach the batchLimit, execute the batch int[] rtn = ps.executeBatch(); - checkUpdateCount(rtn, onerow, key); - } catch (BatchUpdateException bex) { - SQLException sqex = bex.getNextException(); - if (sqex == null) - sqex = bex; - throw SQLExceptions.getStore(sqex, ps, _dict); + checkUpdateCount(rtn, batchedRowsBaseIndex); + + batchedRowsBaseIndex += _batchLimit; + + onerow.flush(ps, _dict, _store); + ps.addBatch(); + // reset the count to 1 for new batch + count = 1; } - onerow.flush(ps, _dict, _store); - ps.addBatch(); - count = 1; // reset the count to 1 for new batch } } // end of the loop, execute the batch - try { - int[] rtn = ps.executeBatch(); - checkUpdateCount(rtn, onerow, key); - } catch (BatchUpdateException bex) { - SQLException sqex = bex.getNextException(); - if (sqex == null) - sqex = bex; - throw SQLExceptions.getStore(sqex, ps, _dict); - } + int[] rtn = ps.executeBatch(); + checkUpdateCount(rtn, batchedRowsBaseIndex); } } catch (SQLException se) { SQLException sqex = se.getNextException(); if (sqex == null) sqex = se; throw SQLExceptions.getStore(sqex, ps, _dict); - } - try { - ps.close(); - } catch (SQLException sqex) { - throw SQLExceptions.getStore(sqex, ps, _dict); + } finally { + _batchedSql = null; + _batchedRows.clear(); + if (ps != null) { + try { + ps.close(); + } catch (SQLException sqex) { + throw SQLExceptions.getStore(sqex, ps, _dict); + } + } } } - // instead of calling _cacheSql.clear, null it out to improve the - // performance. - _cacheSql = null; } - private void checkUpdateCount(int[] count, RowImpl row, String sql) - throws SQLException { + /* + * Execute an update of a single row. + */ + private void flushSingleRow(RowImpl row, PreparedStatement ps) + throws SQLException { + row.flush(ps, _dict, _store); + int count = ps.executeUpdate(); + if (count != 1) { + Object failed = row.getFailedObject(); + if (failed != null) + _exceptions.add(new OptimisticException(failed)); + else if (row.getAction() == Row.ACTION_INSERT) + throw new SQLException(_loc.get("update-failed-no-failed-obj", + String.valueOf(count), row.getSQL(_dict)).getMessage()); + } + } + + /* + * Process executeBatch function array of return counts. + */ + private void checkUpdateCount(int[] count, int batchedRowsBaseIndex) + throws SQLException { int cnt = 0; Object failed = null; for (int i = 0; i < count.length; i++) { cnt = count[i]; + RowImpl row = (RowImpl) _batchedRows.get(batchedRowsBaseIndex + i); switch (cnt) { case Statement.EXECUTE_FAILED: // -3 failed = row.getFailedObject(); @@ -281,21 +229,21 @@ public class BatchingPreparedStatementManagerImpl extends _exceptions.add(new OptimisticException(failed)); else if (row.getAction() == Row.ACTION_INSERT) throw new SQLException(_loc.get( - "update-failed-no-failed-obj", - String.valueOf(count[i]), sql).getMessage()); + "update-failed-no-failed-obj", + String.valueOf(count[i]), _batchedSql).getMessage()); break; case Statement.SUCCESS_NO_INFO: // -2 if (_log.isTraceEnabled()) _log.trace(_loc.get("batch_update_info", - String.valueOf(cnt), sql).getMessage()); + String.valueOf(cnt), _batchedSql).getMessage()); break; case 0: // no row is inserted, treats it as failed // case failed = row.getFailedObject(); if ((failed != null || row.getAction() == Row.ACTION_INSERT)) throw new SQLException(_loc.get( - "update-failed-no-failed-obj", - String.valueOf(count[i]), sql).getMessage()); + "update-failed-no-failed-obj", + String.valueOf(count[i]), _batchedSql).getMessage()); } } } diff --git a/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/PreparedStatementManagerImpl.java b/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/PreparedStatementManagerImpl.java index 02c77e728..87908437f 100644 --- a/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/PreparedStatementManagerImpl.java +++ b/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/PreparedStatementManagerImpl.java @@ -86,10 +86,33 @@ public class PreparedStatementManagerImpl if (row.getAction() == Row.ACTION_INSERT) autoAssign = row.getTable().getAutoAssignedColumns(); + flushAndUpdate(row); + + // set auto assign values + if (autoAssign != null && autoAssign.length > 0 + && row.getPrimaryKey() != null) { + OpenJPAStateManager sm = row.getPrimaryKey(); + ClassMapping mapping = (ClassMapping) sm.getMetaData(); + Object val; + for (int i = 0; i < autoAssign.length; i++) { + val = _dict.getGeneratedKey(autoAssign[i], _conn); + mapping.assertJoinable(autoAssign[i]).setAutoAssignedValue(sm, + _store, autoAssign[i], val); + } + sm.setObjectId( + ApplicationIds.create(sm.getPersistenceCapable(), mapping)); + } + } + + /** + * Flush the given row immediately. + */ + protected void flushAndUpdate(RowImpl row) + throws SQLException { // prepare statement String sql = row.getSQL(_dict); PreparedStatement stmnt = prepareStatement(sql); - + // setup parameters and execute statement if (stmnt != null) row.flush(stmnt, _dict, _store); @@ -107,23 +130,12 @@ public class PreparedStatementManagerImpl } catch (SQLException se) { throw SQLExceptions.getStore(se, row.getFailedObject(), _dict); } finally { - if (stmnt != null) - try { stmnt.close(); } catch (SQLException se) {} - } - - // set auto assign values - if (autoAssign != null && autoAssign.length > 0 - && row.getPrimaryKey() != null) { - OpenJPAStateManager sm = row.getPrimaryKey(); - ClassMapping mapping = (ClassMapping) sm.getMetaData(); - Object val; - for (int i = 0; i < autoAssign.length; i++) { - val = _dict.getGeneratedKey(autoAssign[i], _conn); - mapping.assertJoinable(autoAssign[i]).setAutoAssignedValue(sm, - _store, autoAssign[i], val); + if (stmnt != null) { + try { + stmnt.close(); + } catch (SQLException se) { + } } - sm.setObjectId( - ApplicationIds.create(sm.getPersistenceCapable(), mapping)); } } @@ -146,5 +158,5 @@ public class PreparedStatementManagerImpl protected PreparedStatement prepareStatement(String sql) throws SQLException { return _conn.prepareStatement(sql); - } + } }