SOLR-1191: fix DIH deltaQyery when pk has prefix, change NPE to better error reporting

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1071435 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Yonik Seeley 2011-02-16 23:01:41 +00:00
parent 25177837b0
commit 4dd86bf8ab
3 changed files with 194 additions and 3 deletions

View File

@ -679,6 +679,13 @@ Bug Fixes
DocumentAnalysisRequestHandler to respect charset from XML file and only DocumentAnalysisRequestHandler to respect charset from XML file and only
use HTTP header's "Content-Type" as a "hint". (Uwe Schindler) use HTTP header's "Content-Type" as a "hint". (Uwe Schindler)
* SOLR-1191: resolve DataImportHandler deltaQuery column against pk when pk
has a prefix (e.g. pk="book.id" deltaQuery="select id from ..."). More
useful error reporting when no match found (previously failed with a
NullPointerException in log and no clear user feedback). (gthb via yonik)
Other Changes Other Changes
---------------------- ----------------------

View File

@ -318,7 +318,11 @@ public class DocBuilder {
String keyName = root.isDocRoot ? root.getPk() : root.getSchemaPk(); String keyName = root.isDocRoot ? root.getPk() : root.getSchemaPk();
Object key = map.get(keyName); Object key = map.get(keyName);
if(key == null) { if(key == null) {
LOG.warn("no key was available for deleteted pk query. keyName = " + keyName); keyName = findMatchingPkColumn(keyName, map);
key = map.get(keyName);
}
if(key == null) {
LOG.warn("no key was available for deleted pk query. keyName = " + keyName);
continue; continue;
} }
writer.deleteDoc(key); writer.deleteDoc(key);
@ -816,6 +820,28 @@ public class DocBuilder {
return entity.processor = new EntityProcessorWrapper(entityProcessor, this); return entity.processor = new EntityProcessorWrapper(entityProcessor, this);
} }
private String findMatchingPkColumn(String pk, Map<String, Object> row) {
if (row.containsKey(pk))
throw new IllegalArgumentException(
String.format("deltaQuery returned a row with null for primary key %s", pk));
String resolvedPk = null;
for (String columnName : row.keySet()) {
if (columnName.endsWith("." + pk) || pk.endsWith("." + columnName)) {
if (resolvedPk != null)
throw new IllegalArgumentException(
String.format(
"deltaQuery has more than one column (%s and %s) that might resolve to declared primary key pk='%s'",
resolvedPk, columnName, pk));
resolvedPk = columnName;
}
}
if (resolvedPk == null)
throw new IllegalArgumentException(
String.format("deltaQuery has no column to resolve to declared primary key pk='%s'", pk));
LOG.info(String.format("Resolving deltaQuery column '%s' to match entity's declared pk '%s'", resolvedPk, pk));
return resolvedPk;
}
/** /**
* <p> Collects unique keys of all Solr documents for whom one or more source tables have been changed since the last * <p> Collects unique keys of all Solr documents for whom one or more source tables have been changed since the last
* indexed time. </p> <p> Note: In our definition, unique key of Solr document is the primary key of the top level * indexed time. </p> <p> Note: In our definition, unique key of Solr document is the primary key of the top level
@ -852,13 +878,20 @@ public class DocBuilder {
Map<String, Map<String, Object>> deltaSet = new HashMap<String, Map<String, Object>>(); Map<String, Map<String, Object>> deltaSet = new HashMap<String, Map<String, Object>>();
LOG.info("Running ModifiedRowKey() for Entity: " + entity.name); LOG.info("Running ModifiedRowKey() for Entity: " + entity.name);
//get the modified rows in this entity //get the modified rows in this entity
String pk = entity.getPk();
while (true) { while (true) {
Map<String, Object> row = entityProcessor.nextModifiedRowKey(); Map<String, Object> row = entityProcessor.nextModifiedRowKey();
if (row == null) if (row == null)
break; break;
deltaSet.put(row.get(entity.getPk()).toString(), row); Object pkValue = row.get(pk);
if (pkValue == null) {
pk = findMatchingPkColumn(pk, row);
pkValue = row.get(pk);
}
deltaSet.put(pkValue.toString(), row);
importStatistics.rowsCount.incrementAndGet(); importStatistics.rowsCount.incrementAndGet();
// check for abort // check for abort
if (stop.get()) if (stop.get())
@ -873,8 +906,14 @@ public class DocBuilder {
deletedSet.add(row); deletedSet.add(row);
Object pkValue = row.get(pk);
if (pkValue == null) {
pk = findMatchingPkColumn(pk, row);
pkValue = row.get(pk);
}
// Remove deleted rows from the delta rows // Remove deleted rows from the delta rows
String deletedRowPk = row.get(entity.getPk()).toString(); String deletedRowPk = pkValue.toString();
if (deltaSet.containsKey(deletedRowPk)) { if (deltaSet.containsKey(deletedRowPk)) {
deltaSet.remove(deletedRowPk); deltaSet.remove(deletedRowPk);
} }

View File

@ -0,0 +1,145 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.handler.dataimport;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.logging.*;
/**
* <p>
* Test for SqlEntityProcessorDelta verifying fix for SOLR-1191
* </p>
*
*
* @version $Id$
* @since solr 3.1
*/
public class TestSqlEntityProcessorDeltaPrefixedPk extends AbstractDataImportHandlerTestCase {
private static final String FULLIMPORT_QUERY = "select * from x";
private static final String DELTA_QUERY = "select id from x where last_modified > NOW";
private static final String DELETED_PK_QUERY = "select id from x where last_modified > NOW AND deleted='true'";
private static final String dataConfig_delta =
"<dataConfig>" +
" <dataSource type=\"MockDataSource\"/>\n" +
" <document>\n" +
" <entity name=\"x\" transformer=\"TemplateTransformer\" pk=\"x.id\"" +
" query=\"" + FULLIMPORT_QUERY + "\"" +
" deletedPkQuery=\"" + DELETED_PK_QUERY + "\"" +
" deltaImportQuery=\"select * from x where id='${dih.delta.id}'\"" +
" deltaQuery=\"" + DELTA_QUERY + "\">\n" +
" <field column=\"id\" name=\"id\"/>\n" +
" <field column=\"desc\" name=\"desc\"/>\n" +
" </entity>\n" +
" </document>\n" +
"</dataConfig>\n";
private static final List EMPTY_LIST = Collections.EMPTY_LIST;
@BeforeClass
public static void beforeClass() throws Exception {
initCore("dataimport-solrconfig.xml", "dataimport-schema.xml");
}
@Before @Override
public void setUp() throws Exception {
super.setUp();
clearIndex();
assertU(commit());
//Logger.getLogger("").setLevel(Level.ALL);
}
@SuppressWarnings("unchecked")
private void add1document() throws Exception {
List row = new ArrayList();
row.add(createMap("id", "1", "desc", "bar"));
MockDataSource.setIterator(FULLIMPORT_QUERY, row.iterator());
runFullImport(dataConfig_delta);
assertQ(req("*:* OR add1document"), "//*[@numFound='1']");
assertQ(req("id:1"), "//*[@numFound='1']");
assertQ(req("desc:bar"), "//*[@numFound='1']");
}
@Test
@SuppressWarnings("unchecked")
public void testDeltaImport_deleteResolvesUnprefixedPk() throws Exception {
add1document();
MockDataSource.clearCache();
List deletedRows = new ArrayList();
deletedRows.add(createMap("id", "1"));
MockDataSource.setIterator(DELETED_PK_QUERY, deletedRows.iterator());
MockDataSource.setIterator(DELTA_QUERY, EMPTY_LIST.iterator());
runDeltaImport(dataConfig_delta);
assertQ(req("*:* OR testDeltaImport_deleteResolvesUnprefixedPk"), "//*[@numFound='0']");
}
@Test
@SuppressWarnings("unchecked")
public void testDeltaImport_replace_resolvesUnprefixedPk() throws Exception {
add1document();
MockDataSource.clearCache();
List deltaRows = new ArrayList();
deltaRows.add(createMap("id", "1"));
MockDataSource.setIterator(DELTA_QUERY, deltaRows.iterator());
MockDataSource.setIterator(DELETED_PK_QUERY, EMPTY_LIST.iterator());
List rows = new ArrayList();
rows.add(createMap("id", "1", "desc", "baz"));
MockDataSource.setIterator("select * from x where id='1'", rows.iterator());
runDeltaImport(dataConfig_delta);
assertQ(req("*:* OR testDeltaImport_replace_resolvesUnprefixedPk"), "//*[@numFound='1']");
assertQ(req("id:1"), "//*[@numFound='1']");
assertQ(req("desc:bar"), "//*[@numFound='0']");
assertQ(req("desc:baz"), "//*[@numFound='1']");
}
@Test
@SuppressWarnings("unchecked")
public void testDeltaImport_addResolvesUnprefixedPk() throws Exception {
add1document();
MockDataSource.clearCache();
List deltaRows = new ArrayList();
deltaRows.add(createMap("id", "2"));
MockDataSource.setIterator(DELTA_QUERY, deltaRows.iterator());
List rows = new ArrayList();
rows.add(createMap("id", "2", "desc", "xyzzy"));
MockDataSource.setIterator("select * from x where id='2'", rows.iterator());
runDeltaImport(dataConfig_delta);
assertQ(req("*:* OR testDeltaImport_addResolvesUnprefixedPk"), "//*[@numFound='2']");
assertQ(req("id:1"), "//*[@numFound='1']");
assertQ(req("id:2"), "//*[@numFound='1']");
assertQ(req("desc:bar"), "//*[@numFound='1']");
assertQ(req("desc:xyzzy"), "//*[@numFound='1']");
}
}