OPENJPA-130. Committing on behalf of Ignacio Andreu.

git-svn-id: https://svn.apache.org/repos/asf/openjpa/branches/1.1.x@655581 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Patrick Linskey 2008-05-12 17:59:27 +00:00
parent d90a7c7a6e
commit 813d37d743
12 changed files with 330 additions and 28 deletions

View File

@ -48,6 +48,11 @@
<artifactId>hsqldb</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>8.1-407.jdbc3</version>
</dependency>
</dependencies>
<build>
<plugins>

View File

@ -419,6 +419,13 @@ class MappedQueryResultObjectProvider
return _res.getSQLObject(obj, map);
}
protected Object getStreamInternal(JDBCStore store, Object obj,
int metaTypeCode, Object arg, Joins joins) throws SQLException {
if (obj instanceof Column)
return _res.getObject((Column) obj, arg, joins);
return _res.getObject(obj, metaTypeCode, arg);
}
protected Ref getRefInternal(Object obj, Map map, Joins joins)
throws SQLException {
if (obj instanceof Column)

View File

@ -28,11 +28,13 @@ import org.apache.openjpa.jdbc.kernel.JDBCStore;
import org.apache.openjpa.jdbc.meta.FieldMapping;
import org.apache.openjpa.jdbc.meta.ValueMappingInfo;
import org.apache.openjpa.jdbc.schema.Column;
import org.apache.openjpa.jdbc.sql.PostgresDictionary;
import org.apache.openjpa.jdbc.sql.Result;
import org.apache.openjpa.jdbc.sql.Row;
import org.apache.openjpa.jdbc.sql.RowManager;
import org.apache.openjpa.jdbc.sql.Select;
import org.apache.openjpa.kernel.OpenJPAStateManager;
import org.apache.openjpa.meta.JavaTypes;
/**
* Direct mapping from a stream value to a column.
@ -43,6 +45,7 @@ import org.apache.openjpa.kernel.OpenJPAStateManager;
public class LobFieldStrategy extends AbstractFieldStrategy {
private int fieldType;
private boolean isBlob;
public void map(boolean adapt) {
assertNotMappedBy();
@ -57,8 +60,9 @@ public class LobFieldStrategy extends AbstractFieldStrategy {
vinfo.assertNoForeignKey(field, !adapt);
Column tmpCol = new Column();
tmpCol.setName(field.getName());
tmpCol.setJavaType(field.getTypeCode());
tmpCol.setType(fieldType);
tmpCol.setJavaType(field.getTypeCode());
tmpCol.setSize(-1);
Column[] cols = vinfo.getColumns(field, field.getName(),
@ -74,15 +78,22 @@ public class LobFieldStrategy extends AbstractFieldStrategy {
return null;
}
public void delete(OpenJPAStateManager sm, JDBCStore store, RowManager rm)
throws SQLException {
Select sel = createSelect(sm, store);
store.getDBDictionary().deleteStream(store, sel);
}
public void insert(OpenJPAStateManager sm, JDBCStore store, RowManager rm)
throws SQLException {
Object ob = toDataStoreValue(sm.fetchObjectField
(field.getIndex()), store);
Row row = field.getRow(sm, store, rm, Row.ACTION_INSERT);
if (field.getColumnIO().isInsertable(0, ob == null)) {
if (isBlob()) {
Select sel = createSelect(sm, store);
if (isBlob) {
store.getDBDictionary().insertBlobForStreamingLoad
(row, field.getColumns()[0], ob);
(row, field.getColumns()[0], store, ob, sel);
} else {
store.getDBDictionary().insertClobForStreamingLoad
(row, field.getColumns()[0], ob);
@ -97,7 +108,7 @@ public class LobFieldStrategy extends AbstractFieldStrategy {
if (field.getColumnIO().isInsertable(0, ob == null)) {
if (ob != null) {
Select sel = createSelect(sm, store);
if (isBlob()) {
if (isBlob) {
store.getDBDictionary().updateBlob
(sel, store, (InputStream)ob);
} else {
@ -118,12 +129,13 @@ public class LobFieldStrategy extends AbstractFieldStrategy {
(field.getIndex()), store);
if (field.getColumnIO().isUpdatable(0, ob == null)) {
Row row = field.getRow(sm, store, rm, Row.ACTION_UPDATE);
if (isBlob()) {
Select sel = createSelect(sm, store);
if (isBlob) {
store.getDBDictionary().insertBlobForStreamingLoad
(row, field.getColumns()[0], ob);
(row, field.getColumns()[0], store, ob, sel);
} else {
store.getDBDictionary().insertClobForStreamingLoad
(row, field.getColumns()[0], ob);
(row, field.getColumns()[0], sel);
}
}
}
@ -135,7 +147,7 @@ public class LobFieldStrategy extends AbstractFieldStrategy {
if (field.getColumnIO().isUpdatable(0, ob == null)) {
if (ob != null) {
Select sel = createSelect(sm, store);
if (isBlob()) {
if (isBlob) {
store.getDBDictionary().updateBlob
(sel, store, (InputStream)ob);
} else {
@ -163,8 +175,8 @@ public class LobFieldStrategy extends AbstractFieldStrategy {
JDBCFetchConfiguration fetch, Result res) throws SQLException {
Column col = field.getColumns()[0];
if (res.contains(col)) {
if (isBlob()) {
sm.storeObject(field.getIndex(), res.getBinaryStream(col));
if (isBlob) {
sm.storeObject(field.getIndex(), res.getLOBStream(store, col));
} else {
sm.storeObject(field.getIndex(), res.getCharacterStream(col));
}
@ -177,18 +189,21 @@ public class LobFieldStrategy extends AbstractFieldStrategy {
}
public void setFieldMapping(FieldMapping owner) {
field = owner;
if (owner.getElementMapping().getMappingRepository().getDBDictionary()
instanceof PostgresDictionary) {
fieldType = Types.INTEGER;
isBlob = true;
field.setTypeCode(JavaTypes.INT);
} else {
if (owner.getType().isAssignableFrom(InputStream.class)) {
isBlob = true;
fieldType = Types.BLOB;
} else if (owner.getType().isAssignableFrom(Reader.class)) {
isBlob = false;
fieldType = Types.CLOB;
}
field = owner;
}
private boolean isBlob() {
if (fieldType == Types.BLOB)
return true;
return false;
}
private Select createSelect(OpenJPAStateManager sm, JDBCStore store) {

View File

@ -49,6 +49,7 @@ import org.apache.openjpa.jdbc.schema.Table;
import org.apache.openjpa.lib.util.Closeable;
import org.apache.openjpa.meta.JavaTypes;
import org.apache.openjpa.util.UnsupportedException;
import serp.util.Strings;
/**
@ -341,12 +342,23 @@ public abstract class AbstractResult
return getBinaryStreamInternal(translate(col, joins), joins);
}
public InputStream getLOBStream(JDBCStore store, Object obj)
throws SQLException {
return getLOBStreamInternal(store, translate(obj, null), null);
}
protected InputStream getBinaryStreamInternal(Object obj, Joins joins)
throws SQLException {
return (InputStream) checkNull(getObjectInternal(obj,
JavaSQLTypes.BINARY_STREAM, null, joins));
}
protected InputStream getLOBStreamInternal(JDBCStore store, Object obj,
Joins joins) throws SQLException {
return (InputStream) checkNull(getStreamInternal(store, obj,
JavaSQLTypes.BINARY_STREAM, null, joins));
}
public Blob getBlob(Object obj)
throws SQLException {
return getBlobInternal(translate(obj, null), null);
@ -670,6 +682,9 @@ public abstract class AbstractResult
Object arg, Joins joins)
throws SQLException;
protected abstract Object getStreamInternal(JDBCStore store, Object obj,
int metaType, Object arg, Joins joins) throws SQLException;
public Object getSQLObject(Object obj, Map map)
throws SQLException {
return getSQLObjectInternal(translate(obj, null), map, null);

View File

@ -60,6 +60,7 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import javax.sql.DataSource;
import org.apache.commons.lang.StringUtils;
@ -101,10 +102,12 @@ import org.apache.openjpa.util.GeneralException;
import org.apache.openjpa.util.InternalException;
import org.apache.openjpa.util.InvalidStateException;
import org.apache.openjpa.util.OpenJPAException;
import org.apache.openjpa.util.ReferentialIntegrityException;
import org.apache.openjpa.util.Serialization;
import org.apache.openjpa.util.StoreException;
import org.apache.openjpa.util.UnsupportedException;
import org.apache.openjpa.util.UserException;
import serp.util.Numbers;
import serp.util.Strings;
@ -483,6 +486,11 @@ public class DBDictionary
return rs.getBinaryStream(column);
}
public InputStream getLOBStream(JDBCStore store, ResultSet rs,
int column) throws SQLException {
return rs.getBinaryStream(column);
}
/**
* Convert the specified column of the SQL ResultSet to the proper
* java type.
@ -4166,8 +4174,8 @@ public class DBDictionary
return column.toString();
}
public void insertBlobForStreamingLoad(Row row, Column col, Object ob)
throws SQLException {
public void insertBlobForStreamingLoad(Row row, Column col,
JDBCStore store, Object ob, Select sel) throws SQLException {
if (ob != null) {
row.setBinaryStream(col,
new ByteArrayInputStream(new byte[0]), 0);
@ -4415,4 +4423,8 @@ public class DBDictionary
}
return false;
}
public void deleteStream(JDBCStore store, Select sel) throws SQLException {
//Do nothing
}
}

View File

@ -252,6 +252,11 @@ public class MergedResult
return _res[_idx].getBinaryStream(obj);
}
public InputStream getLOBStream(JDBCStore store, Object obj)
throws SQLException {
return _res[_idx].getLOBStream(store, obj);
}
public Blob getBlob(Object obj)
throws SQLException {
return _res[_idx].getBlob(obj);

View File

@ -18,6 +18,9 @@
*/
package org.apache.openjpa.jdbc.sql;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@ -29,6 +32,7 @@ import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import org.apache.openjpa.jdbc.kernel.JDBCStore;
import org.apache.openjpa.jdbc.kernel.exps.FilterValue;
import org.apache.openjpa.jdbc.schema.Column;
import org.apache.openjpa.jdbc.schema.Sequence;
@ -36,6 +40,11 @@ import org.apache.openjpa.jdbc.schema.Table;
import org.apache.openjpa.lib.jdbc.DelegatingConnection;
import org.apache.openjpa.lib.jdbc.DelegatingPreparedStatement;
import org.apache.openjpa.lib.util.Localizer;
import org.apache.openjpa.util.InternalException;
import org.apache.openjpa.util.StoreException;
import org.postgresql.PGConnection;
import org.postgresql.largeobject.LargeObject;
import org.postgresql.largeobject.LargeObjectManager;
/**
* Dictionary for Postgres.
@ -319,6 +328,151 @@ public class PostgresDictionary
return new PostgresConnection(super.decorate(conn), this);
}
public InputStream getLOBStream(JDBCStore store, ResultSet rs,
int column) throws SQLException {
DelegatingConnection conn = (DelegatingConnection)store
.getConnection();
conn.setAutoCommit(false);
LargeObjectManager lom = ((PGConnection)conn.getInnermostDelegate())
.getLargeObjectAPI();
if (rs.getInt(column) != -1) {
LargeObject lo = lom.open(rs.getInt(column));
return lo.getInputStream();
} else {
return null;
}
}
public void insertBlobForStreamingLoad(Row row, Column col,
JDBCStore store, Object ob, Select sel) throws SQLException {
if (row.getAction() == Row.ACTION_INSERT) {
insertPostgresBlob(row, col, store, ob);
} else if (row.getAction() == Row.ACTION_UPDATE) {
updatePostgresBlob(row, col, store, ob, sel);
}
}
private void insertPostgresBlob(Row row, Column col, JDBCStore store,
Object ob) throws SQLException {
if (ob != null) {
col.setType(Types.INTEGER);
DelegatingConnection conn = (DelegatingConnection)store
.getConnection();
try {
conn.setAutoCommit(false);
PGConnection pgconn = (PGConnection) conn.getInnermostDelegate();
LargeObjectManager lom = pgconn.getLargeObjectAPI();
// The create method is valid in versions previous 8.3
// in 8.3 this methos is deprecated, use createLO
int oid = lom.create();
LargeObject lo = lom.open(oid, LargeObjectManager.WRITE);
OutputStream os = lo.getOutputStream();
copy((InputStream)ob, os);
lo.close();
row.setInt(col, oid);
} catch (IOException ioe) {
throw new StoreException(ioe);
} finally {
conn.close();
}
} else {
row.setInt(col, -1);
}
}
private void updatePostgresBlob(Row row, Column col, JDBCStore store,
Object ob, Select sel) throws SQLException {
SQLBuffer sql = sel.toSelect(true, store.getFetchConfiguration());
ResultSet res = null;
DelegatingConnection conn =
(DelegatingConnection) store.getConnection();
PreparedStatement stmnt = null;
try {
stmnt = sql.prepareStatement(conn, store.getFetchConfiguration(),
ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_UPDATABLE);
res = stmnt.executeQuery();
if (!res.next()) {
throw new InternalException(_loc.get("stream-exception"));
}
int oid = res.getInt(1);
if (oid != -1) {
conn.setAutoCommit(false);
PGConnection pgconn = (PGConnection)conn
.getInnermostDelegate();
LargeObjectManager lom = pgconn.getLargeObjectAPI();
if (ob != null) {
LargeObject lo = lom.open(oid, LargeObjectManager.WRITE);
OutputStream os = lo.getOutputStream();
copy((InputStream)ob, os);
lo.close();
} else {
lom.delete(oid);
row.setInt(col, -1);
}
} else {
if (ob != null) {
conn.setAutoCommit(false);
PGConnection pgconn = (PGConnection)conn
.getInnermostDelegate();
LargeObjectManager lom = pgconn.getLargeObjectAPI();
oid = lom.create();
LargeObject lo = lom.open(oid, LargeObjectManager.WRITE);
OutputStream os = lo.getOutputStream();
copy((InputStream)ob, os);
lo.close();
row.setInt(col, oid);
}
}
} catch (IOException ioe) {
throw new StoreException(ioe);
} finally {
if (res != null)
try { res.close (); } catch (SQLException e) {}
if (stmnt != null)
try { stmnt.close (); } catch (SQLException e) {}
if (conn != null)
try { conn.close (); } catch (SQLException e) {}
}
}
public void updateBlob(Select sel, JDBCStore store, InputStream is)
throws SQLException {
//Do nothing
}
public void deleteStream(JDBCStore store, Select sel) throws SQLException {
SQLBuffer sql = sel.toSelect(true, store.getFetchConfiguration());
ResultSet res = null;
DelegatingConnection conn =
(DelegatingConnection) store.getConnection();
PreparedStatement stmnt = null;
try {
stmnt = sql.prepareStatement(conn, store.getFetchConfiguration(),
ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_UPDATABLE);
res = stmnt.executeQuery();
if (!res.next()) {
throw new InternalException(_loc.get("stream-exception"));
}
int oid = res.getInt(1);
if (oid != -1) {
conn.setAutoCommit(false);
PGConnection pgconn = (PGConnection)conn
.getInnermostDelegate();
LargeObjectManager lom = pgconn.getLargeObjectAPI();
lom.delete(oid);
}
} finally {
if (res != null)
try { res.close (); } catch (SQLException e) {}
if (stmnt != null)
try { stmnt.close (); } catch (SQLException e) {}
if (conn != null)
try { conn.close (); } catch (SQLException e) {}
}
}
/**
* Connection wrapper to work around the postgres empty result set bug.
*/

View File

@ -213,6 +213,8 @@ public interface Result
public InputStream getBinaryStream(Object obj)
throws SQLException;
public InputStream getLOBStream(JDBCStore store, Object obj)
throws SQLException;
/**
* Return the value stored in the given column or id; may not be supported
* by results that are not backed by a SQL result set.

View File

@ -42,6 +42,7 @@ import org.apache.openjpa.jdbc.kernel.JDBCStore;
import org.apache.openjpa.jdbc.meta.JavaSQLTypes;
import org.apache.openjpa.jdbc.schema.Column;
import org.apache.openjpa.meta.JavaTypes;
import serp.util.Numbers;
/**
@ -345,6 +346,11 @@ public class ResultSetResult
return _dict.getLong(_rs, ((Number) obj).intValue());
}
protected Object getStreamInternal(JDBCStore store, Object obj,
int metaTypeCode, Object arg, Joins joins) throws SQLException {
return getLOBStreamInternal(store, obj, joins);
}
protected Object getObjectInternal(Object obj, int metaTypeCode,
Object arg, Joins joins)
throws SQLException {
@ -498,4 +504,9 @@ public class ResultSetResult
return 0;
}
}
protected InputStream getLOBStreamInternal(JDBCStore store, Object obj,
Joins joins) throws SQLException {
return _dict.getLOBStream(store, _rs, ((Number) obj).intValue());
}
}

View File

@ -30,6 +30,7 @@ import org.apache.openjpa.jdbc.conf.JDBCConfiguration;
import org.apache.openjpa.jdbc.sql.DBDictionary;
import org.apache.openjpa.jdbc.sql.MySQLDictionary;
import org.apache.openjpa.jdbc.sql.OracleDictionary;
import org.apache.openjpa.jdbc.sql.PostgresDictionary;
import org.apache.openjpa.jdbc.sql.SQLServerDictionary;
import org.apache.openjpa.meta.ClassMetaData;
import org.apache.openjpa.persistence.JPAFacadeHelper;
@ -56,7 +57,8 @@ public abstract class AbstractLobTest extends SingleEMFTestCase {
.getDBDictionaryInstance();
if (dict instanceof MySQLDictionary ||
dict instanceof SQLServerDictionary ||
dict instanceof OracleDictionary) {
dict instanceof OracleDictionary ||
dict instanceof PostgresDictionary) {
return true;
}
return false;
@ -81,6 +83,7 @@ public abstract class AbstractLobTest extends SingleEMFTestCase {
insert(newLobEntity(s, 1));
EntityManager em = emf.createEntityManager();
em.getTransaction().begin();
Query query = em.createQuery(getSelectQuery());
LobEntity entity = (LobEntity) query.getSingleResult();
assertNotNull(entity.getStream());

View File

@ -2400,6 +2400,38 @@ sequence value. May use a placeholder of <literal>{0}</literal> for the variable
sequence name. Defaults to a database-appropriate value.
</para>
</listitem>
<listitem id="DBDictionary.BlobBufferSize">
<para>
<indexterm>
<primary>
BLOB
</primary>
<secondary>
BlobBufferSize
</secondary>
</indexterm>
<literal>BlobBufferSize</literal>: This property establishes the buffer size in
the <literal>INSERT/UPDATE</literal> operations with an
<literal>java.io.InputStream</literal>This is only used with OpenJPA's
<xref linkend="ref_guide_streamsupport"/>. Defaults to 50000.
</para>
</listitem>
<listitem id="DBDictionary.ClobBufferSize">
<para>
<indexterm>
<primary>
CLOB
</primary>
<secondary>
ClobBufferSize
</secondary>
</indexterm>
<literal>ClobBufferSize</literal>: This property establish the buffer size in
the <literal>INSERT/UPDATE</literal> operations with a
<literal>java.io.Reader</literal>This is only used with OpenJPA's
<xref linkend="ref_guide_streamsupport"/>. Defaults to 50000.
</para>
</listitem>
</itemizedlist>
</section>
<section id="ref_guide_dbsetup_dbsupport_mysql">

View File

@ -2677,6 +2677,47 @@ em.persist(o1);
. select o.shipAddress.city from Order o (INVALID)
. select o from Order o where o.shipAddress.street = &quot;San Jose&quot; (INVALID multi valued)
</programlisting>
</example>
</section>
<section id="ref_guide_streamsupport">
<title>
Stream LOB Support
</title>
<indexterm zone="ref_guide_streamsupport">
<primary>
stream support
</primary>
</indexterm>
<indexterm zone="ref_guide_streamsupport">
<primary>
stream lob support
</primary>
</indexterm>
<para>
Since the 1.1.0 release Apache OpenJPA added support for Streams. This feature
makes it possible to stream large amounts of data into and out of fields in
objects managed by OpenJPA without ever holding all the data in memory at the
same time.
</para>
<para>
To persist a stream, use the
<ulink url="../javadoc/org/apache/openjpa/persistence/Persistent.html">
<classname>org.apache.openjpa.persistence.Persistent</classname></ulink>
annotation.
</para>
<example id="ref_guide_streamsupport_example">
<title>
Showing annotated InputStream
</title>
<programlisting>
@Entity
public class Employee {
...
@Persistent
private InputStream photoStream;
...
}
</programlisting>
</example>
</section>