OPENJPA-598 Make BatchingPreparedStatementManagerImpl more flexible and extensible, Sub-task of OPENJPA-477

Committing patch provided by Fay Wang

git-svn-id: https://svn.apache.org/repos/asf/openjpa/trunk@654942 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Catalina Wei 2008-05-09 21:34:29 +00:00
parent 01e08029e9
commit 5a16e92bd9
1 changed files with 93 additions and 43 deletions

View File

@ -83,6 +83,11 @@ public class BatchingPreparedStatementManagerImpl extends
} else { } else {
// process the SQL statement, either execute it immediately or // process the SQL statement, either execute it immediately or
// batch it for later execution. // batch it for later execution.
batchOrExecuteRow(row);
}
}
protected void batchOrExecuteRow(RowImpl row) throws SQLException {
String sql = row.getSQL(_dict); String sql = row.getSQL(_dict);
if (_batchedSql == null) { if (_batchedSql == null) {
// brand new SQL // brand new SQL
@ -105,7 +110,6 @@ public class BatchingPreparedStatementManagerImpl extends
} }
_batchedRows.add(row); _batchedRows.add(row);
} }
}
/* /*
* Compute if batching is disabled, based on values of batch limit * Compute if batching is disabled, based on values of batch limit
@ -113,8 +117,7 @@ public class BatchingPreparedStatementManagerImpl extends
*/ */
private boolean isBatchDisabled(RowImpl row) { private boolean isBatchDisabled(RowImpl row) {
boolean rtnVal = true; boolean rtnVal = true;
if (_batchLimit != 0 && !_disableBatch) { if (getBatchLimit() != 0 && !isBatchDisabled()) {
String sql = row.getSQL(_dict);
OpenJPAStateManager sm = row.getPrimaryKey(); OpenJPAStateManager sm = row.getPrimaryKey();
ClassMapping cmd = null; ClassMapping cmd = null;
if (sm != null) if (sm != null)
@ -123,9 +126,9 @@ public class BatchingPreparedStatementManagerImpl extends
if (row.getAction() == Row.ACTION_INSERT) if (row.getAction() == Row.ACTION_INSERT)
autoAssign = row.getTable().getAutoAssignedColumns(); autoAssign = row.getTable().getAutoAssignedColumns();
// validate batch capability // validate batch capability
_disableBatch = _dict rtnVal = _dict
.validateBatchProcess(row, autoAssign, sm, cmd); .validateBatchProcess(row, autoAssign, sm, cmd);
rtnVal = _disableBatch; setBatchDisabled(rtnVal);
} }
return rtnVal; return rtnVal;
} }
@ -135,45 +138,53 @@ public class BatchingPreparedStatementManagerImpl extends
* prepared statements. * prepared statements.
*/ */
protected void flushBatch() { protected void flushBatch() {
if (_batchedSql != null && _batchedRows.size() > 0) { List batchedRows = getBatchedRows();
String batchedSql = getBatchedSql();
if (batchedRows == null)
return;
int batchSize = batchedRows.size();
if (batchedSql != null && batchSize > 0) {
PreparedStatement ps = null; PreparedStatement ps = null;
try { try {
RowImpl onerow = null; RowImpl onerow = null;
ps = _conn.prepareStatement(_batchedSql); ps = prepareStatement(batchedSql);
if (_batchedRows.size() == 1) { if (batchSize == 1) {
// execute a single row. // execute a single row.
onerow = (RowImpl) _batchedRows.get(0); onerow = (RowImpl) batchedRows.get(0);
flushSingleRow(onerow, ps); flushSingleRow(onerow, ps);
} else { } else {
// cache has more than one rows, execute as batch. // cache has more than one rows, execute as batch.
int count = 0; int count = 0;
int batchedRowsBaseIndex = 0; int batchedRowsBaseIndex = 0;
Iterator itr = _batchedRows.iterator(); Iterator itr = batchedRows.iterator();
while (itr.hasNext()) { while (itr.hasNext()) {
onerow = (RowImpl) itr.next(); onerow = (RowImpl) itr.next();
if (_batchLimit == 1) { if (_batchLimit == 1) {
flushSingleRow(onerow, ps); flushSingleRow(onerow, ps);
} else { } else {
if (count < _batchLimit || _batchLimit == -1) { if (count < _batchLimit || _batchLimit == -1) {
if (ps != null)
onerow.flush(ps, _dict, _store); onerow.flush(ps, _dict, _store);
ps.addBatch(); addBatch(ps, onerow, count);
count++; count++;
} else { } else {
// reach the batchLimit, execute the batch // reach the batchLimit, execute the batch
int[] rtn = ps.executeBatch(); int[] rtn = executeBatch(ps);
checkUpdateCount(rtn, batchedRowsBaseIndex); checkUpdateCount(rtn, batchedRowsBaseIndex);
batchedRowsBaseIndex += _batchLimit; batchedRowsBaseIndex += _batchLimit;
if (ps != null)
onerow.flush(ps, _dict, _store); onerow.flush(ps, _dict, _store);
ps.addBatch(); addBatch(ps, onerow, count);
// reset the count to 1 for new batch // reset the count to 1 for new batch
count = 1; count = 1;
} }
} }
} }
// end of the loop, execute the batch // end of the loop, execute the batch
int[] rtn = ps.executeBatch(); int[] rtn = executeBatch(ps);
checkUpdateCount(rtn, batchedRowsBaseIndex); checkUpdateCount(rtn, batchedRowsBaseIndex);
} }
} catch (SQLException se) { } catch (SQLException se) {
@ -183,7 +194,7 @@ public class BatchingPreparedStatementManagerImpl extends
throw SQLExceptions.getStore(sqex, ps, _dict); throw SQLExceptions.getStore(sqex, ps, _dict);
} finally { } finally {
_batchedSql = null; _batchedSql = null;
_batchedRows.clear(); batchedRows.clear();
if (ps != null) { if (ps != null) {
try { try {
ps.close(); ps.close();
@ -200,8 +211,9 @@ public class BatchingPreparedStatementManagerImpl extends
*/ */
private void flushSingleRow(RowImpl row, PreparedStatement ps) private void flushSingleRow(RowImpl row, PreparedStatement ps)
throws SQLException { throws SQLException {
if (ps != null)
row.flush(ps, _dict, _store); row.flush(ps, _dict, _store);
int count = ps.executeUpdate(); int count = executeUpdate(ps, row.getSQL(_dict), row);
if (count != 1) { if (count != 1) {
Object failed = row.getFailedObject(); Object failed = row.getFailedObject();
if (failed != null) if (failed != null)
@ -219,9 +231,10 @@ public class BatchingPreparedStatementManagerImpl extends
throws SQLException { throws SQLException {
int cnt = 0; int cnt = 0;
Object failed = null; Object failed = null;
List batchedRows = getBatchedRows();
for (int i = 0; i < count.length; i++) { for (int i = 0; i < count.length; i++) {
cnt = count[i]; cnt = count[i];
RowImpl row = (RowImpl) _batchedRows.get(batchedRowsBaseIndex + i); RowImpl row = (RowImpl) batchedRows.get(batchedRowsBaseIndex + i);
failed = row.getFailedObject(); failed = row.getFailedObject();
switch (cnt) { switch (cnt) {
case Statement.EXECUTE_FAILED: // -3 case Statement.EXECUTE_FAILED: // -3
@ -230,14 +243,16 @@ public class BatchingPreparedStatementManagerImpl extends
else if (row.getAction() == Row.ACTION_INSERT) else if (row.getAction() == Row.ACTION_INSERT)
throw new SQLException(_loc.get( throw new SQLException(_loc.get(
"update-failed-no-failed-obj", "update-failed-no-failed-obj",
String.valueOf(count[i]), _batchedSql).getMessage()); String.valueOf(count[i]),
row.getSQL(_dict)).getMessage());
break; break;
case Statement.SUCCESS_NO_INFO: // -2 case Statement.SUCCESS_NO_INFO: // -2
if (failed != null || row.getAction() == Row.ACTION_UPDATE) if (failed != null || row.getAction() == Row.ACTION_UPDATE)
_exceptions.add(new OptimisticException(failed)); _exceptions.add(new OptimisticException(failed));
else if (_log.isTraceEnabled()) else if (_log.isTraceEnabled())
_log.trace(_loc.get("batch_update_info", _log.trace(_loc.get("batch_update_info",
String.valueOf(cnt), _batchedSql).getMessage()); String.valueOf(cnt),
row.getSQL(_dict)).getMessage());
break; break;
case 0: // no row is inserted, treats it as failed case 0: // no row is inserted, treats it as failed
// case // case
@ -246,8 +261,43 @@ public class BatchingPreparedStatementManagerImpl extends
else if (row.getAction() == Row.ACTION_INSERT) else if (row.getAction() == Row.ACTION_INSERT)
throw new SQLException(_loc.get( throw new SQLException(_loc.get(
"update-failed-no-failed-obj", "update-failed-no-failed-obj",
String.valueOf(count[i]), _batchedSql).getMessage()); String.valueOf(count[i]),
row.getSQL(_dict)).getMessage());
} }
} }
} }
public boolean isBatchDisabled() {
return _disableBatch;
}
public void setBatchDisabled(boolean disableBatch) {
_disableBatch = disableBatch;
}
public int getBatchLimit() {
return _batchLimit;
}
public void setBatchLimit(int batchLimit) {
_batchLimit = batchLimit;
}
public List getBatchedRows() {
return _batchedRows;
}
public String getBatchedSql() {
return _batchedSql;
}
protected void addBatch(PreparedStatement ps, RowImpl row,
int count) throws SQLException {
ps.addBatch();
}
protected int[] executeBatch(PreparedStatement ps)
throws SQLException {
return ps.executeBatch();
}
} }