OPENJPA-530 - Change BatchingPreparedStatementManagerImpl to correctly batch dispatched statements in the same order the update managers request them.

git-svn-id: https://svn.apache.org/repos/asf/openjpa/trunk@633317 13f79535-47bb-0310-9956-ffa450edef68
Albert Lee 2008-03-03 22:59:07 +00:00
parent c3ad06e8ff
commit 64402ea7da
4 changed files with 193 additions and 213 deletions
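Note: the heart of the fix is visible in the third file below. The old code cached rows in a LinkedHashMap keyed by SQL string and executed one key at a time at flush time, which could reorder statements relative to how the update manager dispatched them. The new code keeps a single current batch (_batchedSql plus _batchedRows) and drains it whenever a different statement arrives, preserving dispatch order. A minimal standalone Java sketch of that strategy; the class and method names here are illustrative, not the OpenJPA API:

    import java.util.ArrayList;
    import java.util.List;

    // Order-preserving batcher: rows accumulate only while consecutive
    // requests share the same SQL; a different statement drains the batch
    // first, so execution order matches dispatch order.
    public class OrderPreservingBatcher {
        private String batchedSql = null;
        private final List<Object[]> batchedParams = new ArrayList<Object[]>();

        public void dispatch(String sql, Object[] params) {
            if (batchedSql != null && !sql.equals(batchedSql))
                flushBatch(); // SQL changed: execute pending rows, in order
            batchedSql = sql;
            batchedParams.add(params);
        }

        public void flushBatch() {
            if (batchedSql == null || batchedParams.isEmpty())
                return;
            // a real implementation would prepareStatement(batchedSql),
            // call addBatch() per parameter set, then executeBatch()
            System.out.println(batchedParams.size() + " x " + batchedSql);
            batchedSql = null;
            batchedParams.clear();
        }

        public static void main(String[] args) {
            OrderPreservingBatcher b = new OrderPreservingBatcher();
            b.dispatch("INSERT INTO A VALUES (?)", new Object[] { 1 });
            b.dispatch("INSERT INTO A VALUES (?)", new Object[] { 2 });
            b.dispatch("INSERT INTO B VALUES (?)", new Object[] { 3 }); // drains the A batch
            b.flushBatch(); // trailing flush, as the update managers now do
        }
    }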

org/apache/openjpa/jdbc/kernel/BatchingConstraintUpdateManager.java

@@ -19,19 +19,9 @@
 package org.apache.openjpa.jdbc.kernel;
 
 import java.sql.Connection;
-import java.sql.SQLException;
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Iterator;
 
-import org.apache.openjpa.jdbc.schema.ForeignKey;
-import org.apache.openjpa.jdbc.sql.PrimaryRow;
-import org.apache.openjpa.jdbc.sql.Row;
-import org.apache.openjpa.jdbc.sql.RowImpl;
 import org.apache.openjpa.jdbc.sql.RowManager;
-import org.apache.openjpa.jdbc.sql.RowManagerImpl;
-import org.apache.openjpa.jdbc.sql.SQLExceptions;
-import org.apache.openjpa.kernel.OpenJPAStateManager;
 
 /**
  * <P>Batch update manager that writes the SQL in object-level operation order.
@@ -51,8 +41,22 @@ import org.apache.openjpa.kernel.OpenJPAStateManager;
 public class BatchingConstraintUpdateManager extends ConstraintUpdateManager {
 
     protected PreparedStatementManager newPreparedStatementManager(
         JDBCStore store, Connection conn) {
         int batchLimit = dict.getBatchLimit();
-        return new BatchingPreparedStatementManagerImpl(store, conn, batchLimit);
+        return new BatchingPreparedStatementManagerImpl(store, conn,
+            batchLimit);
+    }
+
+    /*
+     * Override this method to flush any remaining batched rows in the
+     * PreparedStatementManager.
+     */
+    protected Collection flush(RowManager rowMgr,
+        PreparedStatementManager psMgr, Collection exceps) {
+        Collection rtnCol = super.flush(rowMgr, psMgr, exceps);
+        BatchingPreparedStatementManagerImpl bPsMgr =
+            (BatchingPreparedStatementManagerImpl) psMgr;
+        bPsMgr.flushBatch();
+        return rtnCol;
     }
 }

org/apache/openjpa/jdbc/kernel/BatchingOperationOrderUpdateManager.java

@@ -19,6 +19,9 @@
 package org.apache.openjpa.jdbc.kernel;
 
 import java.sql.Connection;
+import java.util.Collection;
+
+import org.apache.openjpa.jdbc.sql.RowManager;
 
 /**
  * <P>Batch update manager that writes the SQL in object-level operation order.
@@ -37,12 +40,25 @@ import java.sql.Connection;
  */
 public class BatchingOperationOrderUpdateManager extends
     OperationOrderUpdateManager {
 
     protected PreparedStatementManager newPreparedStatementManager(
         JDBCStore store, Connection conn) {
         int batchLimit = dict.getBatchLimit();
-        return new BatchingPreparedStatementManagerImpl(
-            store, conn, batchLimit);
+        return new BatchingPreparedStatementManagerImpl(store, conn,
+            batchLimit);
+    }
+
+    /*
+     * Override this method to flush any remaining batched rows in the
+     * PreparedStatementManager.
+     */
+    protected Collection flush(RowManager rowMgr,
+        PreparedStatementManager psMgr, Collection exceps) {
+        Collection rtnCol = super.flush(rowMgr, psMgr, exceps);
+        BatchingPreparedStatementManagerImpl bPsMgr =
+            (BatchingPreparedStatementManagerImpl) psMgr;
+        bPsMgr.flushBatch();
+        return rtnCol;
     }
 }
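Note: both update managers add the identical flush override because nothing else drains a trailing run of same-SQL rows; the SQL-changed trigger inside BatchingPreparedStatementManagerImpl never fires for the last batch. A hedged sketch of the pattern with simplified stand-in types, not the real OpenJPA signatures:

    import java.util.ArrayList;
    import java.util.Collection;

    // Template of the override: run the inherited flush, then force out
    // whatever is still sitting in the statement manager's batch cache.
    public class FlushOverrideSketch {
        private final StringBuilder batch = new StringBuilder("pending rows");

        Collection<Exception> superFlush() {   // stands in for super.flush(rowMgr, psMgr, exceps)
            return new ArrayList<Exception>();
        }

        void flushBatch() {                    // stands in for bPsMgr.flushBatch()
            System.out.println("draining: " + batch);
            batch.setLength(0);
        }

        Collection<Exception> flush() {
            Collection<Exception> exceps = superFlush(); // batches/executes every dispatched row
            flushBatch();   // the final same-SQL run has no trigger, so drain it here
            return exceps;
        }

        public static void main(String[] args) {
            new FlushOverrideSketch().flush();
        }
    }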

org/apache/openjpa/jdbc/kernel/BatchingPreparedStatementManagerImpl.java

@@ -18,20 +18,13 @@
  */
 package org.apache.openjpa.jdbc.kernel;
 
-import java.sql.BatchUpdateException;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
-import java.sql.ResultSet;
 import java.sql.Statement;
 import java.sql.SQLException;
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Set;
+import java.util.List;
 
 import org.apache.openjpa.jdbc.conf.JDBCConfiguration;
 import org.apache.openjpa.jdbc.meta.ClassMapping;
@@ -42,7 +35,6 @@ import org.apache.openjpa.jdbc.sql.SQLExceptions;
 import org.apache.openjpa.kernel.OpenJPAStateManager;
 import org.apache.openjpa.lib.log.Log;
 import org.apache.openjpa.lib.util.Localizer;
-import org.apache.openjpa.util.ApplicationIds;
 import org.apache.openjpa.util.OptimisticException;
 
 /**
@@ -59,7 +51,8 @@ public class BatchingPreparedStatementManagerImpl extends
     private final static Localizer _loc = Localizer
         .forPackage(BatchingPreparedStatementManagerImpl.class);
 
-    private Map _cacheSql = null;
+    private String _batchedSql = null;
+    private List _batchedRows = new ArrayList();
     private int _batchLimit;
     private boolean _disableBatch = false;
     private transient Log _log = null;
@@ -68,8 +61,7 @@ public class BatchingPreparedStatementManagerImpl extends
      * Constructor. Supply connection.
      */
     public BatchingPreparedStatementManagerImpl(JDBCStore store,
         Connection conn, int batchLimit) {
-
         super(store, conn);
         _batchLimit = batchLimit;
         _log = store.getConfiguration().getLog(JDBCConfiguration.LOG_JDBC);
@@ -78,202 +70,158 @@ public class BatchingPreparedStatementManagerImpl extends
     }
 
     /**
-     * Flush the given row. This method will cache the statement in a cache.
-     * The statement will be executed in the flush() method.
+     * Flush the given row immediately or defer the flush in batch.
      */
-    protected void flushInternal(RowImpl row) throws SQLException {
-        if (_batchLimit == 0 || _disableBatch) {
-            super.flushInternal(row);
-            return;
-        }
-
-        Column[] autoAssign = null;
-        if (row.getAction() == Row.ACTION_INSERT)
-            autoAssign = row.getTable().getAutoAssignedColumns();
-
-        // prepare statement
-        String sql = row.getSQL(_dict);
-        OpenJPAStateManager sm = row.getPrimaryKey();
-        ClassMapping cmd = null;
-        if (sm != null)
-            cmd = (ClassMapping) sm.getMetaData();
-        // validate batch capability
-        _disableBatch = _dict.validateBatchProcess(row, autoAssign, sm, cmd);
-        // process the sql statement, either execute it immediately or
-        // cache them.
-        processSql(sql, row);
-        // set auto assign values
-        if (autoAssign != null && autoAssign.length > 0 && sm != null) {
-            Object val;
-            for (int i = 0; i < autoAssign.length; i++) {
-                val = _dict.getGeneratedKey(autoAssign[i], _conn);
-                cmd.assertJoinable(autoAssign[i]).setAutoAssignedValue(sm,
-                    _store, autoAssign[i], val);
-            }
-            sm.setObjectId(ApplicationIds.create(sm.getPersistenceCapable(),
-                cmd));
-        }
-    }
-
-    private void processSql(String sql, RowImpl row) throws SQLException {
-        ArrayList temprow;
-
-        if (_cacheSql == null)
-            _cacheSql = Collections.synchronizedMap(new LinkedHashMap());
-        if (_disableBatch) {
+    protected void flushAndUpdate(RowImpl row) throws SQLException {
+        if (isBatchDisabled(row)) {
             // if there were some statements batched before, then
             // we need to flush them out first before processing the
             // current non batch process.
-            if (!_cacheSql.isEmpty())
-                flush();
-            execute(sql, row);
+            flushBatch();
+
+            super.flushAndUpdate(row);
         } else {
-            // else start batch support. If the sql string is in the cache,
-            // just adds the row to the cache
-            if (_cacheSql.containsKey(sql)) {
-                temprow = (ArrayList) _cacheSql.get(sql);
-                temprow.add(row);
-                _cacheSql.put(sql, temprow);
-            } else {
-                // no sql exists in the cache, cache the sql string and its rows
-                ArrayList inputrow = new ArrayList();
-                inputrow.add(row);
-                _cacheSql.put(sql, inputrow);
-            }
-        } // end of batch support
-    }
-
-    private void execute(String sql, RowImpl row) throws SQLException {
-        PreparedStatement stmnt = null;
-        try {
-            ResultSet rs = null;
-            stmnt = _conn.prepareStatement(sql);
-            row.flush(stmnt, _dict, _store);
-            int count = stmnt.executeUpdate();
-            if (count != 1) {
-                Object failed = row.getFailedObject();
-                if (failed != null)
-                    _exceptions.add(new OptimisticException(failed));
-                else if (row.getAction() == Row.ACTION_INSERT)
-                    throw new SQLException(_loc.get(
-                        "update-failed-no-failed-obj",
-                        String.valueOf(count), sql).getMessage());
-            }
-        } catch (SQLException se) {
-            throw SQLExceptions.getStore(se, row.getFailedObject(), _dict);
-        } finally {
-            try {
-                if (stmnt != null)
-                    stmnt.close();
-            } catch (SQLException se) {
-                // ignore the exception for this case.
+            // process the SQL statement, either execute it immediately or
+            // batch it for later execution.
+            String sql = row.getSQL(_dict);
+            if (_batchedSql == null) {
+                // brand new SQL
+                _batchedSql = sql;
+            } else if (!sql.equals(_batchedSql)) {
+                // SQL statement changed.
+                switch (_batchedRows.size()) {
+                case 0:
+                    break;
+                case 1:
+                    // single entry in cache, direct SQL execution.
+                    super.flushAndUpdate((RowImpl) _batchedRows.get(0));
+                    _batchedRows.clear();
+                    break;
+                default:
+                    // flush all entries in cache in batch.
+                    flushBatch();
+                }
+                _batchedSql = sql;
             }
+            _batchedRows.add(row);
         }
     }
 
-    public void flush() {
-        PreparedStatement ps = null;
-        ArrayList list;
-        RowImpl onerow = null;
-
-        // go thru the cache to process all the sql stmt.
-        if (_cacheSql == null || _cacheSql.isEmpty()) {
-            super.flush();
-            return;
+    /*
+     * Compute whether batching is disabled, based on the batch limit
+     * and database characteristics.
+     */
+    private boolean isBatchDisabled(RowImpl row) {
+        boolean rtnVal = true;
+        if (_batchLimit != 0 && !_disableBatch) {
+            String sql = row.getSQL(_dict);
+            OpenJPAStateManager sm = row.getPrimaryKey();
+            ClassMapping cmd = null;
+            if (sm != null)
+                cmd = (ClassMapping) sm.getMetaData();
+            Column[] autoAssign = null;
+            if (row.getAction() == Row.ACTION_INSERT)
+                autoAssign = row.getTable().getAutoAssignedColumns();
+            // validate batch capability
+            _disableBatch = _dict
+                .validateBatchProcess(row, autoAssign, sm, cmd);
+            rtnVal = _disableBatch;
         }
-        Set e = _cacheSql.keySet();
+        return rtnVal;
+    }
 
-        for (Iterator itr = e.iterator(); itr.hasNext();) {
-            String key = (String) itr.next();
+    /**
+     * Flush all cached statements, executing them as a single statement or
+     * as a batched prepared statement.
+     */
+    protected void flushBatch() {
+        if (_batchedSql != null && _batchedRows.size() > 0) {
+            PreparedStatement ps = null;
             try {
-                ps = _conn.prepareStatement(key);
-            } catch (SQLException se) {
-                throw SQLExceptions.getStore(se, ps, _dict);
-            }
-            list = (ArrayList) _cacheSql.get(key);
-            if (list == null) {
-                return;
-            }
-            // if only 1 row for this statement, then execute it right away
-            int rowsize = list.size();
-            try {
-                if (rowsize == 1) {
-                    onerow = (RowImpl) list.get(0);
-                    onerow.flush(ps, _dict, _store);
-                    int count = ps.executeUpdate();
-                    if (count != 1) {
-                        Object failed = onerow.getFailedObject();
-                        if (failed != null)
-                            _exceptions.add(new OptimisticException(failed));
-                        else if (onerow.getAction() == Row.ACTION_INSERT)
-                            throw new SQLException(_loc.get(
-                                "update-failed-no-failed-obj",
-                                String.valueOf(count), key).getMessage());
-                    }
+                RowImpl onerow = null;
+                ps = _conn.prepareStatement(_batchedSql);
+                if (_batchedRows.size() == 1) {
+                    // execute a single row.
+                    onerow = (RowImpl) _batchedRows.get(0);
+                    flushSingleRow(onerow, ps);
                 } else {
-                    // has more than one rows for this statement, use addBatch
+                    // cache has more than one row, execute as a batch.
                     int count = 0;
-                    for (int i = 0; i < list.size(); i++) {
-                        onerow = (RowImpl) list.get(i);
-                        if (count < _batchLimit || _batchLimit == -1) {
-                            onerow.flush(ps, _dict, _store);
-                            ps.addBatch();
-                            count++;
+                    int batchedRowsBaseIndex = 0;
+                    Iterator itr = _batchedRows.iterator();
+                    while (itr.hasNext()) {
+                        onerow = (RowImpl) itr.next();
+                        if (_batchLimit == 1) {
+                            flushSingleRow(onerow, ps);
                         } else {
-                            // reach the batchLimit , execute it
-                            try {
+                            if (count < _batchLimit || _batchLimit == -1) {
+                                onerow.flush(ps, _dict, _store);
+                                ps.addBatch();
+                                count++;
+                            } else {
+                                // reached the batchLimit, execute the batch
                                 int[] rtn = ps.executeBatch();
-                                checkUpdateCount(rtn, onerow, key);
-                            } catch (BatchUpdateException bex) {
-                                SQLException sqex = bex.getNextException();
-                                if (sqex == null)
-                                    sqex = bex;
-                                throw SQLExceptions.getStore(sqex, ps, _dict);
+                                checkUpdateCount(rtn, batchedRowsBaseIndex);
+
+                                batchedRowsBaseIndex += _batchLimit;
+
+                                onerow.flush(ps, _dict, _store);
+                                ps.addBatch();
+                                // reset the count to 1 for the new batch
+                                count = 1;
                             }
-                            onerow.flush(ps, _dict, _store);
-                            ps.addBatch();
-                            count = 1; // reset the count to 1 for new batch
                         }
                     }
                     // end of the loop, execute the batch
-                    try {
-                        int[] rtn = ps.executeBatch();
-                        checkUpdateCount(rtn, onerow, key);
-                    } catch (BatchUpdateException bex) {
-                        SQLException sqex = bex.getNextException();
-                        if (sqex == null)
-                            sqex = bex;
-                        throw SQLExceptions.getStore(sqex, ps, _dict);
-                    }
+                    int[] rtn = ps.executeBatch();
+                    checkUpdateCount(rtn, batchedRowsBaseIndex);
                 }
             } catch (SQLException se) {
                 SQLException sqex = se.getNextException();
                 if (sqex == null)
                     sqex = se;
                 throw SQLExceptions.getStore(sqex, ps, _dict);
-            }
-            try {
-                ps.close();
-            } catch (SQLException sqex) {
-                throw SQLExceptions.getStore(sqex, ps, _dict);
+            } finally {
+                _batchedSql = null;
+                _batchedRows.clear();
+                if (ps != null) {
+                    try {
+                        ps.close();
+                    } catch (SQLException sqex) {
+                        throw SQLExceptions.getStore(sqex, ps, _dict);
+                    }
+                }
             }
         }
-        // instead of calling _cacheSql.clear, null it out to improve the
-        // performance.
-        _cacheSql = null;
     }
 
-    private void checkUpdateCount(int[] count, RowImpl row, String sql)
+    /*
+     * Execute an update of a single row.
+     */
+    private void flushSingleRow(RowImpl row, PreparedStatement ps)
+        throws SQLException {
+        row.flush(ps, _dict, _store);
+        int count = ps.executeUpdate();
+        if (count != 1) {
+            Object failed = row.getFailedObject();
+            if (failed != null)
+                _exceptions.add(new OptimisticException(failed));
+            else if (row.getAction() == Row.ACTION_INSERT)
+                throw new SQLException(_loc.get("update-failed-no-failed-obj",
+                    String.valueOf(count), row.getSQL(_dict)).getMessage());
+        }
+    }
+
+    /*
+     * Process the array of update counts returned by executeBatch.
+     */
+    private void checkUpdateCount(int[] count, int batchedRowsBaseIndex)
         throws SQLException {
         int cnt = 0;
         Object failed = null;
         for (int i = 0; i < count.length; i++) {
             cnt = count[i];
+            RowImpl row = (RowImpl) _batchedRows.get(batchedRowsBaseIndex + i);
             switch (cnt) {
             case Statement.EXECUTE_FAILED: // -3
                 failed = row.getFailedObject();
@@ -281,21 +229,21 @@ public class BatchingPreparedStatementManagerImpl extends
                     _exceptions.add(new OptimisticException(failed));
                 else if (row.getAction() == Row.ACTION_INSERT)
                     throw new SQLException(_loc.get(
                         "update-failed-no-failed-obj",
-                        String.valueOf(count[i]), sql).getMessage());
+                        String.valueOf(count[i]), _batchedSql).getMessage());
                 break;
             case Statement.SUCCESS_NO_INFO: // -2
                 if (_log.isTraceEnabled())
                     _log.trace(_loc.get("batch_update_info",
-                        String.valueOf(cnt), sql).getMessage());
+                        String.valueOf(cnt), _batchedSql).getMessage());
                 break;
             case 0: // no row was inserted, treat it as a failure
                 failed = row.getFailedObject();
                 if ((failed != null || row.getAction() == Row.ACTION_INSERT))
                     throw new SQLException(_loc.get(
                         "update-failed-no-failed-obj",
-                        String.valueOf(count[i]), sql).getMessage());
+                        String.valueOf(count[i]), _batchedSql).getMessage());
             }
         }
     }
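Note: checkUpdateCount now locates the row behind each update count by offsetting into _batchedRows with batchedRowsBaseIndex, instead of receiving a single row. The codes it switches on are standard java.sql.Statement constants. A small self-contained illustration with hard-coded counts; the real code gets them from PreparedStatement.executeBatch():

    import java.sql.Statement;

    public class UpdateCountDemo {
        public static void main(String[] args) {
            int[] counts = { 1, Statement.SUCCESS_NO_INFO, Statement.EXECUTE_FAILED, 0 };
            for (int i = 0; i < counts.length; i++) {
                switch (counts[i]) {
                case Statement.EXECUTE_FAILED:  // -3: driver reports the statement failed
                    System.out.println("row " + i + ": failed, raise optimistic/SQL error");
                    break;
                case Statement.SUCCESS_NO_INFO: // -2: executed, row count unknown (common in batches)
                    System.out.println("row " + i + ": succeeded, count unavailable");
                    break;
                case 0:                         // no row touched: treated as a failure for inserts
                    System.out.println("row " + i + ": zero rows affected");
                    break;
                default:                        // one or more rows affected
                    System.out.println("row " + i + ": updated " + counts[i] + " row(s)");
                }
            }
        }
    }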

org/apache/openjpa/jdbc/kernel/PreparedStatementManagerImpl.java

@@ -86,10 +86,33 @@ public class PreparedStatementManagerImpl
         if (row.getAction() == Row.ACTION_INSERT)
             autoAssign = row.getTable().getAutoAssignedColumns();
 
+        flushAndUpdate(row);
+
+        // set auto assign values
+        if (autoAssign != null && autoAssign.length > 0
+            && row.getPrimaryKey() != null) {
+            OpenJPAStateManager sm = row.getPrimaryKey();
+            ClassMapping mapping = (ClassMapping) sm.getMetaData();
+            Object val;
+            for (int i = 0; i < autoAssign.length; i++) {
+                val = _dict.getGeneratedKey(autoAssign[i], _conn);
+                mapping.assertJoinable(autoAssign[i]).setAutoAssignedValue(sm,
+                    _store, autoAssign[i], val);
+            }
+            sm.setObjectId(
+                ApplicationIds.create(sm.getPersistenceCapable(), mapping));
+        }
+    }
+
+    /**
+     * Flush the given row immediately.
+     */
+    protected void flushAndUpdate(RowImpl row)
+        throws SQLException {
         // prepare statement
         String sql = row.getSQL(_dict);
         PreparedStatement stmnt = prepareStatement(sql);
 
         // setup parameters and execute statement
         if (stmnt != null)
             row.flush(stmnt, _dict, _store);
@@ -107,23 +130,12 @@ public class PreparedStatementManagerImpl
         } catch (SQLException se) {
             throw SQLExceptions.getStore(se, row.getFailedObject(), _dict);
         } finally {
-            if (stmnt != null)
-                try { stmnt.close(); } catch (SQLException se) {}
-        }
-
-        // set auto assign values
-        if (autoAssign != null && autoAssign.length > 0
-            && row.getPrimaryKey() != null) {
-            OpenJPAStateManager sm = row.getPrimaryKey();
-            ClassMapping mapping = (ClassMapping) sm.getMetaData();
-            Object val;
-            for (int i = 0; i < autoAssign.length; i++) {
-                val = _dict.getGeneratedKey(autoAssign[i], _conn);
-                mapping.assertJoinable(autoAssign[i]).setAutoAssignedValue(sm,
-                    _store, autoAssign[i], val);
-            }
-            sm.setObjectId(
-                ApplicationIds.create(sm.getPersistenceCapable(), mapping));
+            if (stmnt != null) {
+                try {
+                    stmnt.close();
+                } catch (SQLException se) {
+                }
+            }
         }
     }
@@ -146,5 +158,5 @@ public class PreparedStatementManagerImpl
     protected PreparedStatement prepareStatement(String sql)
         throws SQLException {
-        return _conn.prepareStatement(sql);
+        return _conn.prepareStatement(sql);
     }
 }
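Note: the change to PreparedStatementManagerImpl is a template-method split: flushInternal keeps the generated-key (auto-assign) bookkeeping and delegates execution to the new overridable flushAndUpdate, which BatchingPreparedStatementManagerImpl overrides above. Rows that need generated keys cannot be deferred into a batch, which is why isBatchDisabled feeds the auto-assigned columns into DBDictionary.validateBatchProcess. A hedged sketch of the split with simplified stand-in types, not the real OpenJPA classes:

    import java.sql.SQLException;

    // Template-method split: the skeleton stays in the base class; the
    // execution step is the only thing subclasses override.
    public class StatementManagerSketch {
        // fixed skeleton: execute the row, then harvest database-generated keys
        final void flushInternal(String row) throws SQLException {
            flushAndUpdate(row);       // overridable: immediate or batched execution
            assignGeneratedKeys(row);  // stays in the base class
        }

        protected void flushAndUpdate(String row) throws SQLException {
            System.out.println("execute now: " + row);
        }

        private void assignGeneratedKeys(String row) {
            // in OpenJPA this walks the auto-assigned columns and calls
            // _dict.getGeneratedKey(column, connection) for each one
        }

        public static void main(String[] args) throws SQLException {
            new StatementManagerSketch().flushInternal("INSERT INTO A VALUES (1)");
        }
    }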