From 480a42df4a036e53f0a06813d7b3412213cc610c Mon Sep 17 00:00:00 2001 From: "A. Abram White" Date: Wed, 20 Sep 2006 18:40:28 +0000 Subject: [PATCH] Perform JDBC sequence ops outside of synchronization blocks in case of JDBC hangs. Also should improve concurrency. git-svn-id: https://svn.apache.org/repos/asf/incubator/openjpa/trunk@448298 13f79535-47bb-0310-9956-ffa450edef68 --- .../openjpa/jdbc/kernel/AbstractJDBCSeq.java | 55 ++----- .../jdbc/kernel/ClassTableJDBCSeq.java | 6 +- .../openjpa/jdbc/kernel/NativeJDBCSeq.java | 8 +- .../openjpa/jdbc/kernel/TableJDBCSeq.java | 146 ++++++++++-------- 4 files changed, 104 insertions(+), 111 deletions(-) diff --git a/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/AbstractJDBCSeq.java b/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/AbstractJDBCSeq.java index 6ddf43014..c4f99c5b9 100644 --- a/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/AbstractJDBCSeq.java +++ b/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/AbstractJDBCSeq.java @@ -41,10 +41,6 @@ public abstract class AbstractJDBCSeq protected int type = TYPE_DEFAULT; protected Object current = null; - // used to track current conn so that we can close it - private Connection _conn = null; - private boolean _commit = false; - /** * Records the sequence type. */ @@ -52,7 +48,7 @@ public abstract class AbstractJDBCSeq this.type = type; } - public synchronized Object next(StoreContext ctx, ClassMetaData meta) { + public Object next(StoreContext ctx, ClassMetaData meta) { JDBCStore store = getStore(ctx); try { current = nextInternal(store, (ClassMapping) meta); @@ -63,12 +59,10 @@ public abstract class AbstractJDBCSeq throw SQLExceptions.getStore(se, store.getDBDictionary()); } catch (Exception e) { throw new StoreException(e); - } finally { - closeConnection(); } } - public synchronized Object current(StoreContext ctx, ClassMetaData meta) { + public Object current(StoreContext ctx, ClassMetaData meta) { JDBCStore store = getStore(ctx); try { return currentInternal(store, (ClassMapping) meta); @@ -78,13 +72,10 @@ public abstract class AbstractJDBCSeq throw SQLExceptions.getStore(se, store.getDBDictionary()); } catch (Exception e) { throw new StoreException(e); - } finally { - closeConnection(); } } - public synchronized void allocate(int additional, StoreContext ctx, - ClassMetaData meta) { + public void allocate(int additional, StoreContext ctx, ClassMetaData meta) { JDBCStore store = getStore(ctx); try { allocateInternal(additional, store, (ClassMapping) meta); @@ -94,8 +85,6 @@ public abstract class AbstractJDBCSeq throw SQLExceptions.getStore(se, store.getDBDictionary()); } catch (Exception e) { throw new StoreException(e); - } finally { - closeConnection(); } } @@ -121,7 +110,7 @@ public abstract class AbstractJDBCSeq /** * Return the current sequence object. By default returns the last * sequence value used, or null if no sequence values have been requested - * yet. + * yet. Default implementation is not threadsafe. */ protected Object currentInternal(JDBCStore store, ClassMapping mapping) throws Exception { @@ -149,41 +138,31 @@ public abstract class AbstractJDBCSeq */ protected Connection getConnection(JDBCStore store) throws SQLException { - // close previous connection if user is asking for another connection - closeConnection(); - if (type == TYPE_TRANSACTIONAL || type == TYPE_CONTIGUOUS) - _conn = store.getConnection(); - else { - JDBCConfiguration conf = store.getConfiguration(); - DataSource ds = conf.getDataSource2(store.getContext()); - _conn = ds.getConnection(); - if (_conn.getAutoCommit()) - _conn.setAutoCommit(false); - _commit = true; - } - return _conn; + return store.getConnection(); + + JDBCConfiguration conf = store.getConfiguration(); + DataSource ds = conf.getDataSource2(store.getContext()); + Connection conn = ds.getConnection(); + if (conn.getAutoCommit()) + conn.setAutoCommit(false); + return conn; } /** * Close the current connection. */ - protected void closeConnection() { - if (_conn == null) + protected void closeConnection(Connection conn) { + if (conn == null) return; try { - if (_commit) - _conn.commit(); + if (type == TYPE_TRANSACTIONAL || type == TYPE_CONTIGUOUS) + conn.commit(); } catch (SQLException se) { throw SQLExceptions.getStore(se); } finally { - try { - _conn.close(); - } catch (SQLException se) { - } - _conn = null; - _commit = false; + try { conn.close(); } catch (SQLException se) {} } } } diff --git a/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/ClassTableJDBCSeq.java b/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/ClassTableJDBCSeq.java index aed273dbf..e5ca574de 100644 --- a/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/ClassTableJDBCSeq.java +++ b/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/ClassTableJDBCSeq.java @@ -44,8 +44,8 @@ import org.apache.openjpa.meta.JavaTypes; public class ClassTableJDBCSeq extends TableJDBCSeq { - private static final Localizer _loc = Localizer - .forPackage(ClassTableJDBCSeq.class); + private static final Localizer _loc = Localizer.forPackage + (ClassTableJDBCSeq.class); private final Map _stats = new HashMap(); private boolean _ignore = false; @@ -95,7 +95,7 @@ public class ClassTableJDBCSeq _aliases = aliases; } - protected Status getStatus(ClassMapping mapping) { + protected synchronized Status getStatus(ClassMapping mapping) { if (mapping == null) return null; String key = getKey(mapping, false); diff --git a/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/NativeJDBCSeq.java b/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/NativeJDBCSeq.java index bab8efb41..33a542c70 100644 --- a/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/NativeJDBCSeq.java +++ b/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/NativeJDBCSeq.java @@ -199,8 +199,12 @@ public class NativeJDBCSeq protected Object nextInternal(JDBCStore store, ClassMapping mapping) throws SQLException { - long next = getSequence(getConnection(store)); - return Numbers.valueOf(next); + Connection conn = getConnection(store); + try { + return Numbers.valueOf(getSequence(conn)); + } finally { + closeConnection(conn); + } } /** diff --git a/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/TableJDBCSeq.java b/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/TableJDBCSeq.java index cbb226382..4613eb777 100644 --- a/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/TableJDBCSeq.java +++ b/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/TableJDBCSeq.java @@ -213,20 +213,29 @@ public class TableJDBCSeq throw new InvalidStateException(_loc.get("bad-seq-type", getClass(), mapping)); - // make sure seq is at least 1, since autoassigned ids of 0 can - // conflict with uninitialized values - stat.seq = Math.max(stat.seq, 1); - if (stat.seq >= stat.max) + while (true) { + synchronized (stat) { + // make sure seq is at least 1, since autoassigned ids of 0 can + // conflict with uninitialized values + stat.seq = Math.max(stat.seq, 1); + if (stat.seq < stat.max) + return Numbers.valueOf(stat.seq++); + } allocateSequence(store, mapping, stat, _alloc, true); - return Numbers.valueOf(stat.seq++); + } } protected Object currentInternal(JDBCStore store, ClassMapping mapping) throws Exception { if (current == null) { - long cur = getSequence(mapping, getConnection(store)); - if (cur != -1) - current = Numbers.valueOf(cur); + Connection conn = getConnection(store); + try { + long cur = getSequence(mapping, getConnection(store)); + if (cur != -1) + current = Numbers.valueOf(cur); + } finally { + closeConnection(conn); + } } return super.currentInternal(store, mapping); } @@ -235,9 +244,18 @@ public class TableJDBCSeq ClassMapping mapping) throws SQLException { Status stat = getStatus(mapping); - if (stat != null && stat.max - stat.seq < count) - allocateSequence(store, mapping, stat, - count - (int) (stat.max - stat.seq), false); + if (stat == null) + return; + + while (true) { + int available; + synchronized (stat) { + available = (int) (stat.max - stat.seq); + if (available >= count) + return; + } + allocateSequence(store, mapping, stat, count - available, false); + } } /** @@ -295,40 +313,45 @@ public class TableJDBCSeq * Updates the max available sequence value. */ private void allocateSequence(JDBCStore store, ClassMapping mapping, - Status stat, int alloc, boolean updateStatSeq) { - try { - // if the update fails, probably because row doesn't exist yet - if (!setSequence(mapping, stat, alloc, updateStatSeq, - getConnection(store))) { - closeConnection(); - - // possible that we might get errors when inserting if - // another thread/process is inserting same pk at same time - SQLException err = null; - Connection conn = _conf.getDataSource2(store.getContext()). - getConnection(); - try { - insertSequence(mapping, conn); - } catch (SQLException se) { - err = se; - } finally { - try { - conn.close(); - } catch (SQLException se) { - } - } - - // now we should be able to update... - if (!setSequence(mapping, stat, alloc, updateStatSeq, - getConnection(store))) - throw(err != null) ? err : new SQLException(_loc.get - ("no-seq-row", mapping, _table).getMessage()); - } + Status stat, int alloc, boolean updateStatSeq) + throws SQLException { + Connection conn = getConnection(store); + try { + if (setSequence(mapping, stat, alloc, updateStatSeq, conn)) + return; + } catch (SQLException se) { + throw SQLExceptions.getStore(_loc.get("bad-seq-up", _table), + se, _conf.getDBDictionaryInstance()); + } finally { + closeConnection(conn); } - catch (SQLException se2) { + + try { + // possible that we might get errors when inserting if + // another thread/process is inserting same pk at same time + SQLException err = null; + conn = _conf.getDataSource2(store.getContext()).getConnection(); + try { + insertSequence(mapping, conn); + } catch (SQLException se) { + err = se; + } finally { + try { conn.close(); } catch (SQLException se) {} + } + + // now we should be able to update... + conn = getConnection(store); + try { + if (!setSequence(mapping, stat, alloc, updateStatSeq, conn)) + throw (err != null) ? err : new SQLException(_loc.get + ("no-seq-row", mapping, _table).getMessage()); + } finally { + closeConnection(conn); + } + } catch (SQLException se2) { throw SQLExceptions.getStore(_loc.get("bad-seq-up", _table), se2, _conf.getDBDictionaryInstance()); - } + } } /** @@ -362,10 +385,7 @@ public class TableJDBCSeq stmnt.executeUpdate(); } finally { if (stmnt != null) - try { - stmnt.close(); - } catch (SQLException se) { - } + try { stmnt.close(); } catch (SQLException se) {} if (!wasAuto) conn.setAutoCommit(false); } @@ -401,14 +421,8 @@ public class TableJDBCSeq return dict.getLong(rs, 1); } finally { if (rs != null) - try { - rs.close(); - } catch (SQLException se) { - } - try { - stmnt.close(); - } catch (SQLException se) { - } + try { rs.close(); } catch (SQLException se) {} + try { stmnt.close(); } catch (SQLException se) {} } } @@ -433,8 +447,7 @@ public class TableJDBCSeq SQLBuffer where = new SQLBuffer(dict).append(_pkColumn).append(" = "). appendValue(pk, _pkColumn); - // not all databases support locking, so loop until we have a - // successful atomic select/update sequence + // loop until we have a successful atomic select/update sequence long cur = 0; PreparedStatement stmnt; ResultSet rs; @@ -459,23 +472,20 @@ public class TableJDBCSeq stmnt = upd.prepareStatement(conn); updates = stmnt.executeUpdate(); } finally { - if (rs != null) - try { - rs.close(); - } catch (SQLException se) { - } + if (rs != null) + try { rs.close(); } catch (SQLException se) {} if (stmnt != null) - try { - stmnt.close(); - } catch (SQLException se) { - } + try { stmnt.close(); } catch (SQLException se) {} } } // setup new sequence range - if (updateStatSeq) - stat.seq = cur; - stat.max = cur + inc; + synchronized (stat) { + if (updateStatSeq && stat.seq < cur) + stat.seq = cur; + if (stat.max < cur + inc) + stat.max = cur + inc; + } return true; }