SOLR-833 -- A DataSource to read data from a field as a reader. This can be used, for example, to read XMLs residing as CLOBs or BLOBs in databases.

git-svn-id: https://svn.apache.org/repos/asf/lucene/solr/trunk@713343 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Shalin Shekhar Mangar 2008-11-12 10:29:49 +00:00
parent d501b33bc8
commit dff17dc8a7
7 changed files with 232 additions and 29 deletions

View File

@ -28,6 +28,10 @@ New Features
3. SOLR-842: Better error handling in DataImportHandler with options to abort, skip and continue imports. 3. SOLR-842: Better error handling in DataImportHandler with options to abort, skip and continue imports.
(Noble Paul, shalin) (Noble Paul, shalin)
4. SOLR-833: A DataSource to read data from a field as a reader. This can be used, for example, to read XMLs
residing as CLOBs or BLOBs in databases.
(Noble Paul via shalin)
Optimizations Optimizations
---------------------- ----------------------

View File

@ -64,7 +64,7 @@ public class DataImporter {
private Properties store = new Properties(); private Properties store = new Properties();
private Map<String, Properties> dataSourceProps; private Map<String, Properties> dataSourceProps = new HashMap<String, Properties>();
private IndexSchema schema; private IndexSchema schema;
@ -159,6 +159,15 @@ public class DataImporter {
} }
/**Used by tests
*/
void loadAndInit(String configStr){
loadDataConfig(configStr);
Map<String, DataConfig.Field> fields = new HashMap<String, DataConfig.Field>();
DataConfig.Entity e = getConfig().documents.get(0).entities.get(0);
initEntity(e, fields, false);
}
void loadDataConfig(String configFile) { void loadDataConfig(String configFile) {
try { try {
@ -193,27 +202,29 @@ public class DataImporter {
if (e.fields != null) { if (e.fields != null) {
for (DataConfig.Field f : e.fields) { for (DataConfig.Field f : e.fields) {
f.nameOrColName = f.getName(); f.nameOrColName = f.getName();
SchemaField schemaField = schema.getFieldOrNull(f.getName()); if (schema != null) {
if (schemaField != null) { SchemaField schemaField = schema.getFieldOrNull(f.getName());
f.multiValued = schemaField.multiValued(); if (schemaField != null) {
f.allAttributes.put(MULTI_VALUED, Boolean.toString(schemaField f.multiValued = schemaField.multiValued();
.multiValued())); f.allAttributes.put(MULTI_VALUED, Boolean.toString(schemaField
f.allAttributes.put(TYPE, schemaField.getType().getTypeName()); .multiValued()));
f.allAttributes.put("indexed", Boolean f.allAttributes.put(TYPE, schemaField.getType().getTypeName());
.toString(schemaField.indexed())); f.allAttributes.put("indexed", Boolean
f.allAttributes.put("stored", Boolean.toString(schemaField.stored())); .toString(schemaField.indexed()));
f.allAttributes.put("defaultValue", schemaField.getDefaultValue()); f.allAttributes.put("stored", Boolean.toString(schemaField.stored()));
} else { f.allAttributes.put("defaultValue", schemaField.getDefaultValue());
} else {
try { try {
f.allAttributes.put(TYPE, schema.getDynamicFieldType(f.getName()) f.allAttributes.put(TYPE, schema.getDynamicFieldType(f.getName())
.getTypeName()); .getTypeName());
f.allAttributes.put(MULTI_VALUED, "true"); f.allAttributes.put(MULTI_VALUED, "true");
f.multiValued = true; f.multiValued = true;
} catch (RuntimeException e2) { } catch (RuntimeException e2) {
LOG.info("Field in data-config.xml - " + f.getName() LOG.info("Field in data-config.xml - " + f.getName()
+ " not found in schema.xml"); + " not found in schema.xml");
f.toWrite = false; f.toWrite = false;
}
} }
} }
fields.put(f.getName(), f); fields.put(f.getName(), f);

View File

@ -370,7 +370,7 @@ public class DocBuilder {
continue; continue;
} }
DataConfig.Field field = entity.colNameVsField.get(key); DataConfig.Field field = entity.colNameVsField.get(key);
if (field == null) { if (field == null && dataImporter.getSchema() != null) {
// This can be a dynamic field or a field which does not have an entry in data-config ( an implicit field) // This can be a dynamic field or a field which does not have an entry in data-config ( an implicit field)
SchemaField sf = dataImporter.getSchema().getFieldOrNull(key); SchemaField sf = dataImporter.getSchema().getFieldOrNull(key);
if (sf == null) { if (sf == null) {
@ -381,7 +381,7 @@ public class DocBuilder {
} }
//else do nothing. if we add it it may fail //else do nothing. if we add it it may fail
} else { } else {
if (field.toWrite) { if (field != null && field.toWrite) {
addFieldToDoc(entry.getValue(), key, field.boost, field.multiValued, doc); addFieldToDoc(entry.getValue(), key, field.boost, field.multiValued, doc);
} }
} }

View File

@ -0,0 +1,122 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.handler.dataimport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.sql.Blob;
import java.sql.Clob;
import java.util.Properties;
/**
* This can be useful for users who has a DB field containing xml and wish to use a nested XPathEntityProcessor
* <p/>
* The datasouce may be configured as follows
* <p/>
* <datasource name="f1" type="FieldReaderDataSource" />
* <p/>
* The enity which uses this datasource must keep the url value as the varaible name url="field-name"
* <p/>
* The fieldname must be resolvable from VariableResolver
* <p/>
* This may be used with any EntityProcessor which uses a DataSource<Reader> eg:XPathEntityProcessor
* <p/>
* Supports String, BLOB, CLOB data types and there is an extra field (in the entity) 'encoding' for BLOB types
*
* @version $Id$
* @since 1.4
*/
public class FieldReaderDataSource extends DataSource<Reader> {
private static final Logger LOG = LoggerFactory.getLogger(FieldReaderDataSource.class);
protected VariableResolver vr;
protected String dataField;
private String encoding;
public void init(Context context, Properties initProps) {
vr = context.getVariableResolver();
dataField = context.getEntityAttribute("dataField");
encoding = context.getEntityAttribute("encoding");
/*no op*/
}
public Reader getData(String query) {
Object o = vr.resolve(dataField);
if (o == null) return null;
if (o instanceof String) {
return new StringReader((String) o);
} else if (o instanceof Clob) {
Clob clob = (Clob) o;
try {
//Most of the JDBC drivers have getCharacterStream defined as public
// so let us just check it
Method m = clob.getClass().getDeclaredMethod("getCharacterStream");
if (Modifier.isPublic(m.getModifiers())) {
return (Reader) m.invoke(clob);
} else {
// force invoke
m.setAccessible(true);
return (Reader) m.invoke(clob);
}
} catch (Exception e) {
LOG.info("Unable to get data from CLOB");
return null;
}
} else if (o instanceof Blob) {
Blob blob = (Blob) o;
try {
//Most of the JDBC drivers have getBinaryStream defined as public
// so let us just check it
Method m = blob.getClass().getDeclaredMethod("getBinaryStream");
if (Modifier.isPublic(m.getModifiers())) {
return getReader(m, blob);
} else {
// force invoke
m.setAccessible(true);
return getReader(m, blob);
}
} catch (Exception e) {
LOG.info("Unable to get data from BLOB");
return null;
}
} else {
return new StringReader(o.toString());
}
}
private Reader getReader(Method m, Blob blob)
throws IllegalAccessException, InvocationTargetException, UnsupportedEncodingException {
InputStream is = (InputStream) m.invoke(blob);
if (encoding == null) {
return (new InputStreamReader(is));
} else {
return (new InputStreamReader(is, encoding));
}
}
public void close() {
}
}

View File

@ -67,6 +67,8 @@ public class TemplateString {
* @return the string with all variables replaced * @return the string with all variables replaced
*/ */
public String replaceTokens(String string, VariableResolver resolver) { public String replaceTokens(String string, VariableResolver resolver) {
if (string == null)
return null;
TemplateString ts = cache.get(string); TemplateString ts = cache.get(string);
if (ts == null) { if (ts == null) {
ts = new TemplateString(string); ts = new TemplateString(string);

View File

@ -37,8 +37,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
/** /**
* <p/> An implementation of EntityProcessor which uses a streaming xpath parser to extract values out of XML documents. * <p> An implementation of EntityProcessor which uses a streaming xpath parser to extract values out of XML documents.
* It is typically used in conjunction with HttpDataSource or FileDataSource. </p> <p/> <p/> Refer to <a * It is typically used in conjunction with HttpDataSource or FileDataSource. </p> <p/> <p> Refer to <a
* href="http://wiki.apache.org/solr/DataImportHandler">http://wiki.apache.org/solr/DataImportHandler</a> for more * href="http://wiki.apache.org/solr/DataImportHandler">http://wiki.apache.org/solr/DataImportHandler</a> for more
* details. </p> * details. </p>
* <p/> * <p/>
@ -131,9 +131,8 @@ public class XPathEntityProcessor extends EntityProcessorBase {
"Exception while reading xpaths for fields", e); "Exception while reading xpaths for fields", e);
} }
} }
String url = context.getEntityAttribute(URL);
List<String> l = TemplateString.getVariables(context List<String> l = url == null ? Collections.EMPTY_LIST : TemplateString.getVariables(url);
.getEntityAttribute(URL));
for (String s : l) { for (String s : l) {
if (s.startsWith(entityName + ".")) { if (s.startsWith(entityName + ".")) {
if (placeHolderVariables == null) if (placeHolderVariables == null)
@ -166,7 +165,6 @@ public class XPathEntityProcessor extends EntityProcessorBase {
if (pk == null || result.get(pk) != null) if (pk == null || result.get(pk) != null)
return result; return result;
} }
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")

View File

@ -0,0 +1,66 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.handler.dataimport;
import junit.framework.Assert;
import static org.apache.solr.handler.dataimport.AbstractDataImportHandlerTest.createMap;
import org.junit.Test;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Test for FieldReaderDataSource
*
* @version $Id$
* @see org.apache.solr.handler.dataimport.FieldReaderDataSource
* @since 1.4
*/
public class TestFieldReader {
@Test
public void simple() {
DataImporter di = new DataImporter();
di.loadAndInit(config);
TestDocBuilder.SolrWriterImpl sw = new TestDocBuilder.SolrWriterImpl();
DataImporter.RequestParams rp = new DataImporter.RequestParams(createMap("command", "full-import"));
List<Map<String, Object>> l = new ArrayList<Map<String, Object>>();
l.add(createMap("xml", xml));
MockDataSource.setIterator("select * from a", l.iterator());
di.runCmd(rp, sw, new HashMap<String, String>());
Assert.assertEquals(sw.docs.get(0).getFieldValue("y"), "Hello");
MockDataSource.clearCache();
}
String config = "<dataConfig>\n" +
" <dataSource type=\"FieldReaderDataSource\" name=\"f\"/>\n" +
" <dataSource type=\"MockDataSource\"/>\n" +
" <document>\n" +
" <entity name=\"a\" query=\"select * from a\" >\n" +
" <entity name=\"b\" dataSource=\"f\" processor=\"XPathEntityProcessor\" forEach=\"/x\" dataField=\"a.xml\">\n" +
" <field column=\"y\" xpath=\"/x/y\"/>\n" +
" </entity>\n" +
" </entity>\n" +
" </document>\n" +
"</dataConfig>";
String xml = "<x>\n" +
" <y>Hello</y>\n" +
"</x>";
}