Perform JDBC sequence ops outside of synchronization blocks in case of JDBC

hangs.  Also should improve concurrency.



git-svn-id: https://svn.apache.org/repos/asf/incubator/openjpa/trunk@448298 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
A. Abram White 2006-09-20 18:40:28 +00:00
parent cdcfea47d4
commit 480a42df4a
4 changed files with 104 additions and 111 deletions

View File

@ -41,10 +41,6 @@ public abstract class AbstractJDBCSeq
protected int type = TYPE_DEFAULT; protected int type = TYPE_DEFAULT;
protected Object current = null; protected Object current = null;
// used to track current conn so that we can close it
private Connection _conn = null;
private boolean _commit = false;
/** /**
* Records the sequence type. * Records the sequence type.
*/ */
@ -52,7 +48,7 @@ public abstract class AbstractJDBCSeq
this.type = type; this.type = type;
} }
public synchronized Object next(StoreContext ctx, ClassMetaData meta) { public Object next(StoreContext ctx, ClassMetaData meta) {
JDBCStore store = getStore(ctx); JDBCStore store = getStore(ctx);
try { try {
current = nextInternal(store, (ClassMapping) meta); current = nextInternal(store, (ClassMapping) meta);
@ -63,12 +59,10 @@ public abstract class AbstractJDBCSeq
throw SQLExceptions.getStore(se, store.getDBDictionary()); throw SQLExceptions.getStore(se, store.getDBDictionary());
} catch (Exception e) { } catch (Exception e) {
throw new StoreException(e); throw new StoreException(e);
} finally {
closeConnection();
} }
} }
public synchronized Object current(StoreContext ctx, ClassMetaData meta) { public Object current(StoreContext ctx, ClassMetaData meta) {
JDBCStore store = getStore(ctx); JDBCStore store = getStore(ctx);
try { try {
return currentInternal(store, (ClassMapping) meta); return currentInternal(store, (ClassMapping) meta);
@ -78,13 +72,10 @@ public abstract class AbstractJDBCSeq
throw SQLExceptions.getStore(se, store.getDBDictionary()); throw SQLExceptions.getStore(se, store.getDBDictionary());
} catch (Exception e) { } catch (Exception e) {
throw new StoreException(e); throw new StoreException(e);
} finally {
closeConnection();
} }
} }
public synchronized void allocate(int additional, StoreContext ctx, public void allocate(int additional, StoreContext ctx, ClassMetaData meta) {
ClassMetaData meta) {
JDBCStore store = getStore(ctx); JDBCStore store = getStore(ctx);
try { try {
allocateInternal(additional, store, (ClassMapping) meta); allocateInternal(additional, store, (ClassMapping) meta);
@ -94,8 +85,6 @@ public abstract class AbstractJDBCSeq
throw SQLExceptions.getStore(se, store.getDBDictionary()); throw SQLExceptions.getStore(se, store.getDBDictionary());
} catch (Exception e) { } catch (Exception e) {
throw new StoreException(e); throw new StoreException(e);
} finally {
closeConnection();
} }
} }
@ -121,7 +110,7 @@ public abstract class AbstractJDBCSeq
/** /**
* Return the current sequence object. By default returns the last * Return the current sequence object. By default returns the last
* sequence value used, or null if no sequence values have been requested * sequence value used, or null if no sequence values have been requested
* yet. * yet. Default implementation is not threadsafe.
*/ */
protected Object currentInternal(JDBCStore store, ClassMapping mapping) protected Object currentInternal(JDBCStore store, ClassMapping mapping)
throws Exception { throws Exception {
@ -149,41 +138,31 @@ public abstract class AbstractJDBCSeq
*/ */
protected Connection getConnection(JDBCStore store) protected Connection getConnection(JDBCStore store)
throws SQLException { throws SQLException {
// close previous connection if user is asking for another connection
closeConnection();
if (type == TYPE_TRANSACTIONAL || type == TYPE_CONTIGUOUS) if (type == TYPE_TRANSACTIONAL || type == TYPE_CONTIGUOUS)
_conn = store.getConnection(); return store.getConnection();
else {
JDBCConfiguration conf = store.getConfiguration(); JDBCConfiguration conf = store.getConfiguration();
DataSource ds = conf.getDataSource2(store.getContext()); DataSource ds = conf.getDataSource2(store.getContext());
_conn = ds.getConnection(); Connection conn = ds.getConnection();
if (_conn.getAutoCommit()) if (conn.getAutoCommit())
_conn.setAutoCommit(false); conn.setAutoCommit(false);
_commit = true; return conn;
}
return _conn;
} }
/** /**
* Close the current connection. * Close the current connection.
*/ */
protected void closeConnection() { protected void closeConnection(Connection conn) {
if (_conn == null) if (conn == null)
return; return;
try { try {
if (_commit) if (type == TYPE_TRANSACTIONAL || type == TYPE_CONTIGUOUS)
_conn.commit(); conn.commit();
} catch (SQLException se) { } catch (SQLException se) {
throw SQLExceptions.getStore(se); throw SQLExceptions.getStore(se);
} finally { } finally {
try { try { conn.close(); } catch (SQLException se) {}
_conn.close();
} catch (SQLException se) {
}
_conn = null;
_commit = false;
} }
} }
} }

View File

@ -44,8 +44,8 @@ import org.apache.openjpa.meta.JavaTypes;
public class ClassTableJDBCSeq public class ClassTableJDBCSeq
extends TableJDBCSeq { extends TableJDBCSeq {
private static final Localizer _loc = Localizer private static final Localizer _loc = Localizer.forPackage
.forPackage(ClassTableJDBCSeq.class); (ClassTableJDBCSeq.class);
private final Map _stats = new HashMap(); private final Map _stats = new HashMap();
private boolean _ignore = false; private boolean _ignore = false;
@ -95,7 +95,7 @@ public class ClassTableJDBCSeq
_aliases = aliases; _aliases = aliases;
} }
protected Status getStatus(ClassMapping mapping) { protected synchronized Status getStatus(ClassMapping mapping) {
if (mapping == null) if (mapping == null)
return null; return null;
String key = getKey(mapping, false); String key = getKey(mapping, false);

View File

@ -199,8 +199,12 @@ public class NativeJDBCSeq
protected Object nextInternal(JDBCStore store, ClassMapping mapping) protected Object nextInternal(JDBCStore store, ClassMapping mapping)
throws SQLException { throws SQLException {
long next = getSequence(getConnection(store)); Connection conn = getConnection(store);
return Numbers.valueOf(next); try {
return Numbers.valueOf(getSequence(conn));
} finally {
closeConnection(conn);
}
} }
/** /**

View File

@ -213,20 +213,29 @@ public class TableJDBCSeq
throw new InvalidStateException(_loc.get("bad-seq-type", throw new InvalidStateException(_loc.get("bad-seq-type",
getClass(), mapping)); getClass(), mapping));
// make sure seq is at least 1, since autoassigned ids of 0 can while (true) {
// conflict with uninitialized values synchronized (stat) {
stat.seq = Math.max(stat.seq, 1); // make sure seq is at least 1, since autoassigned ids of 0 can
if (stat.seq >= stat.max) // conflict with uninitialized values
stat.seq = Math.max(stat.seq, 1);
if (stat.seq < stat.max)
return Numbers.valueOf(stat.seq++);
}
allocateSequence(store, mapping, stat, _alloc, true); allocateSequence(store, mapping, stat, _alloc, true);
return Numbers.valueOf(stat.seq++); }
} }
protected Object currentInternal(JDBCStore store, ClassMapping mapping) protected Object currentInternal(JDBCStore store, ClassMapping mapping)
throws Exception { throws Exception {
if (current == null) { if (current == null) {
long cur = getSequence(mapping, getConnection(store)); Connection conn = getConnection(store);
if (cur != -1) try {
current = Numbers.valueOf(cur); long cur = getSequence(mapping, getConnection(store));
if (cur != -1)
current = Numbers.valueOf(cur);
} finally {
closeConnection(conn);
}
} }
return super.currentInternal(store, mapping); return super.currentInternal(store, mapping);
} }
@ -235,9 +244,18 @@ public class TableJDBCSeq
ClassMapping mapping) ClassMapping mapping)
throws SQLException { throws SQLException {
Status stat = getStatus(mapping); Status stat = getStatus(mapping);
if (stat != null && stat.max - stat.seq < count) if (stat == null)
allocateSequence(store, mapping, stat, return;
count - (int) (stat.max - stat.seq), false);
while (true) {
int available;
synchronized (stat) {
available = (int) (stat.max - stat.seq);
if (available >= count)
return;
}
allocateSequence(store, mapping, stat, count - available, false);
}
} }
/** /**
@ -295,40 +313,45 @@ public class TableJDBCSeq
* Updates the max available sequence value. * Updates the max available sequence value.
*/ */
private void allocateSequence(JDBCStore store, ClassMapping mapping, private void allocateSequence(JDBCStore store, ClassMapping mapping,
Status stat, int alloc, boolean updateStatSeq) { Status stat, int alloc, boolean updateStatSeq)
try { throws SQLException {
// if the update fails, probably because row doesn't exist yet Connection conn = getConnection(store);
if (!setSequence(mapping, stat, alloc, updateStatSeq, try {
getConnection(store))) { if (setSequence(mapping, stat, alloc, updateStatSeq, conn))
closeConnection(); return;
} catch (SQLException se) {
// possible that we might get errors when inserting if throw SQLExceptions.getStore(_loc.get("bad-seq-up", _table),
// another thread/process is inserting same pk at same time se, _conf.getDBDictionaryInstance());
SQLException err = null; } finally {
Connection conn = _conf.getDataSource2(store.getContext()). closeConnection(conn);
getConnection();
try {
insertSequence(mapping, conn);
} catch (SQLException se) {
err = se;
} finally {
try {
conn.close();
} catch (SQLException se) {
}
}
// now we should be able to update...
if (!setSequence(mapping, stat, alloc, updateStatSeq,
getConnection(store)))
throw(err != null) ? err : new SQLException(_loc.get
("no-seq-row", mapping, _table).getMessage());
}
} }
catch (SQLException se2) {
try {
// possible that we might get errors when inserting if
// another thread/process is inserting same pk at same time
SQLException err = null;
conn = _conf.getDataSource2(store.getContext()).getConnection();
try {
insertSequence(mapping, conn);
} catch (SQLException se) {
err = se;
} finally {
try { conn.close(); } catch (SQLException se) {}
}
// now we should be able to update...
conn = getConnection(store);
try {
if (!setSequence(mapping, stat, alloc, updateStatSeq, conn))
throw (err != null) ? err : new SQLException(_loc.get
("no-seq-row", mapping, _table).getMessage());
} finally {
closeConnection(conn);
}
} catch (SQLException se2) {
throw SQLExceptions.getStore(_loc.get("bad-seq-up", _table), throw SQLExceptions.getStore(_loc.get("bad-seq-up", _table),
se2, _conf.getDBDictionaryInstance()); se2, _conf.getDBDictionaryInstance());
} }
} }
/** /**
@ -362,10 +385,7 @@ public class TableJDBCSeq
stmnt.executeUpdate(); stmnt.executeUpdate();
} finally { } finally {
if (stmnt != null) if (stmnt != null)
try { try { stmnt.close(); } catch (SQLException se) {}
stmnt.close();
} catch (SQLException se) {
}
if (!wasAuto) if (!wasAuto)
conn.setAutoCommit(false); conn.setAutoCommit(false);
} }
@ -401,14 +421,8 @@ public class TableJDBCSeq
return dict.getLong(rs, 1); return dict.getLong(rs, 1);
} finally { } finally {
if (rs != null) if (rs != null)
try { try { rs.close(); } catch (SQLException se) {}
rs.close(); try { stmnt.close(); } catch (SQLException se) {}
} catch (SQLException se) {
}
try {
stmnt.close();
} catch (SQLException se) {
}
} }
} }
@ -433,8 +447,7 @@ public class TableJDBCSeq
SQLBuffer where = new SQLBuffer(dict).append(_pkColumn).append(" = "). SQLBuffer where = new SQLBuffer(dict).append(_pkColumn).append(" = ").
appendValue(pk, _pkColumn); appendValue(pk, _pkColumn);
// not all databases support locking, so loop until we have a // loop until we have a successful atomic select/update sequence
// successful atomic select/update sequence
long cur = 0; long cur = 0;
PreparedStatement stmnt; PreparedStatement stmnt;
ResultSet rs; ResultSet rs;
@ -459,23 +472,20 @@ public class TableJDBCSeq
stmnt = upd.prepareStatement(conn); stmnt = upd.prepareStatement(conn);
updates = stmnt.executeUpdate(); updates = stmnt.executeUpdate();
} finally { } finally {
if (rs != null) if (rs != null)
try { try { rs.close(); } catch (SQLException se) {}
rs.close();
} catch (SQLException se) {
}
if (stmnt != null) if (stmnt != null)
try { try { stmnt.close(); } catch (SQLException se) {}
stmnt.close();
} catch (SQLException se) {
}
} }
} }
// setup new sequence range // setup new sequence range
if (updateStatSeq) synchronized (stat) {
stat.seq = cur; if (updateStatSeq && stat.seq < cur)
stat.max = cur + inc; stat.seq = cur;
if (stat.max < cur + inc)
stat.max = cur + inc;
}
return true; return true;
} }