diff --git a/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/BatchingPreparedStatementManagerImpl.java b/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/BatchingPreparedStatementManagerImpl.java index 56935a11e..53fb46c58 100644 --- a/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/BatchingPreparedStatementManagerImpl.java +++ b/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/BatchingPreparedStatementManagerImpl.java @@ -83,38 +83,41 @@ public class BatchingPreparedStatementManagerImpl extends } else { // process the SQL statement, either execute it immediately or // batch it for later execution. - String sql = row.getSQL(_dict); - if (_batchedSql == null) { - // brand new SQL - _batchedSql = sql; - } else if (!sql.equals(_batchedSql)) { - // SQL statements changed. - switch (_batchedRows.size()) { - case 0: - break; - case 1: - // single entry in cache, direct SQL execution. - super.flushAndUpdate((RowImpl) _batchedRows.get(0)); - _batchedRows.clear(); - break; - default: - // flush all entries in cache in batch. - flushBatch(); - } - _batchedSql = sql; - } - _batchedRows.add(row); + batchOrExecuteRow(row); } } + protected void batchOrExecuteRow(RowImpl row) throws SQLException { + String sql = row.getSQL(_dict); + if (_batchedSql == null) { + // brand new SQL + _batchedSql = sql; + } else if (!sql.equals(_batchedSql)) { + // SQL statements changed. + switch (_batchedRows.size()) { + case 0: + break; + case 1: + // single entry in cache, direct SQL execution. + super.flushAndUpdate((RowImpl) _batchedRows.get(0)); + _batchedRows.clear(); + break; + default: + // flush all entries in cache in batch. + flushBatch(); + } + _batchedSql = sql; + } + _batchedRows.add(row); + } + /* * Compute if batching is disabled, based on values of batch limit * and database characteristics. */ private boolean isBatchDisabled(RowImpl row) { boolean rtnVal = true; - if (_batchLimit != 0 && !_disableBatch) { - String sql = row.getSQL(_dict); + if (getBatchLimit() != 0 && !isBatchDisabled()) { OpenJPAStateManager sm = row.getPrimaryKey(); ClassMapping cmd = null; if (sm != null) @@ -123,9 +126,9 @@ public class BatchingPreparedStatementManagerImpl extends if (row.getAction() == Row.ACTION_INSERT) autoAssign = row.getTable().getAutoAssignedColumns(); // validate batch capability - _disableBatch = _dict + rtnVal = _dict .validateBatchProcess(row, autoAssign, sm, cmd); - rtnVal = _disableBatch; + setBatchDisabled(rtnVal); } return rtnVal; } @@ -135,45 +138,53 @@ public class BatchingPreparedStatementManagerImpl extends * prepared statements. */ protected void flushBatch() { - if (_batchedSql != null && _batchedRows.size() > 0) { + List batchedRows = getBatchedRows(); + String batchedSql = getBatchedSql(); + if (batchedRows == null) + return; + + int batchSize = batchedRows.size(); + if (batchedSql != null && batchSize > 0) { PreparedStatement ps = null; try { RowImpl onerow = null; - ps = _conn.prepareStatement(_batchedSql); - if (_batchedRows.size() == 1) { + ps = prepareStatement(batchedSql); + if (batchSize == 1) { // execute a single row. - onerow = (RowImpl) _batchedRows.get(0); + onerow = (RowImpl) batchedRows.get(0); flushSingleRow(onerow, ps); } else { // cache has more than one rows, execute as batch. int count = 0; int batchedRowsBaseIndex = 0; - Iterator itr = _batchedRows.iterator(); + Iterator itr = batchedRows.iterator(); while (itr.hasNext()) { onerow = (RowImpl) itr.next(); if (_batchLimit == 1) { flushSingleRow(onerow, ps); } else { if (count < _batchLimit || _batchLimit == -1) { - onerow.flush(ps, _dict, _store); - ps.addBatch(); + if (ps != null) + onerow.flush(ps, _dict, _store); + addBatch(ps, onerow, count); count++; } else { // reach the batchLimit, execute the batch - int[] rtn = ps.executeBatch(); + int[] rtn = executeBatch(ps); checkUpdateCount(rtn, batchedRowsBaseIndex); batchedRowsBaseIndex += _batchLimit; - onerow.flush(ps, _dict, _store); - ps.addBatch(); + if (ps != null) + onerow.flush(ps, _dict, _store); + addBatch(ps, onerow, count); // reset the count to 1 for new batch count = 1; } } } // end of the loop, execute the batch - int[] rtn = ps.executeBatch(); + int[] rtn = executeBatch(ps); checkUpdateCount(rtn, batchedRowsBaseIndex); } } catch (SQLException se) { @@ -183,7 +194,7 @@ public class BatchingPreparedStatementManagerImpl extends throw SQLExceptions.getStore(sqex, ps, _dict); } finally { _batchedSql = null; - _batchedRows.clear(); + batchedRows.clear(); if (ps != null) { try { ps.close(); @@ -200,8 +211,9 @@ public class BatchingPreparedStatementManagerImpl extends */ private void flushSingleRow(RowImpl row, PreparedStatement ps) throws SQLException { - row.flush(ps, _dict, _store); - int count = ps.executeUpdate(); + if (ps != null) + row.flush(ps, _dict, _store); + int count = executeUpdate(ps, row.getSQL(_dict), row); if (count != 1) { Object failed = row.getFailedObject(); if (failed != null) @@ -219,9 +231,10 @@ public class BatchingPreparedStatementManagerImpl extends throws SQLException { int cnt = 0; Object failed = null; + List batchedRows = getBatchedRows(); for (int i = 0; i < count.length; i++) { cnt = count[i]; - RowImpl row = (RowImpl) _batchedRows.get(batchedRowsBaseIndex + i); + RowImpl row = (RowImpl) batchedRows.get(batchedRowsBaseIndex + i); failed = row.getFailedObject(); switch (cnt) { case Statement.EXECUTE_FAILED: // -3 @@ -230,14 +243,16 @@ public class BatchingPreparedStatementManagerImpl extends else if (row.getAction() == Row.ACTION_INSERT) throw new SQLException(_loc.get( "update-failed-no-failed-obj", - String.valueOf(count[i]), _batchedSql).getMessage()); + String.valueOf(count[i]), + row.getSQL(_dict)).getMessage()); break; case Statement.SUCCESS_NO_INFO: // -2 if (failed != null || row.getAction() == Row.ACTION_UPDATE) _exceptions.add(new OptimisticException(failed)); else if (_log.isTraceEnabled()) _log.trace(_loc.get("batch_update_info", - String.valueOf(cnt), _batchedSql).getMessage()); + String.valueOf(cnt), + row.getSQL(_dict)).getMessage()); break; case 0: // no row is inserted, treats it as failed // case @@ -246,8 +261,43 @@ public class BatchingPreparedStatementManagerImpl extends else if (row.getAction() == Row.ACTION_INSERT) throw new SQLException(_loc.get( "update-failed-no-failed-obj", - String.valueOf(count[i]), _batchedSql).getMessage()); + String.valueOf(count[i]), + row.getSQL(_dict)).getMessage()); } } } + + public boolean isBatchDisabled() { + return _disableBatch; + } + + public void setBatchDisabled(boolean disableBatch) { + _disableBatch = disableBatch; + } + + public int getBatchLimit() { + return _batchLimit; + } + + public void setBatchLimit(int batchLimit) { + _batchLimit = batchLimit; + } + + public List getBatchedRows() { + return _batchedRows; + } + + public String getBatchedSql() { + return _batchedSql; + } + + protected void addBatch(PreparedStatement ps, RowImpl row, + int count) throws SQLException { + ps.addBatch(); + } + + protected int[] executeBatch(PreparedStatement ps) + throws SQLException { + return ps.executeBatch(); + } }