From 4dd86bf8ab0b450b63be170836ecb4cca7ed34b1 Mon Sep 17 00:00:00 2001 From: Yonik Seeley Date: Wed, 16 Feb 2011 23:01:41 +0000 Subject: [PATCH] SOLR-1191: fix DIH deltaQyery when pk has prefix, change NPE to better error reporting git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1071435 13f79535-47bb-0310-9956-ffa450edef68 --- solr/CHANGES.txt | 7 + .../solr/handler/dataimport/DocBuilder.java | 45 +++++- ...TestSqlEntityProcessorDeltaPrefixedPk.java | 145 ++++++++++++++++++ 3 files changed, 194 insertions(+), 3 deletions(-) create mode 100644 solr/contrib/dataimporthandler/src/test/java/org/apache/solr/handler/dataimport/TestSqlEntityProcessorDeltaPrefixedPk.java diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index c3aebc03ddd..455fd961047 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -679,6 +679,13 @@ Bug Fixes DocumentAnalysisRequestHandler to respect charset from XML file and only use HTTP header's "Content-Type" as a "hint". (Uwe Schindler) +* SOLR-1191: resolve DataImportHandler deltaQuery column against pk when pk + has a prefix (e.g. pk="book.id" deltaQuery="select id from ..."). More + useful error reporting when no match found (previously failed with a + NullPointerException in log and no clear user feedback). (gthb via yonik) + + + Other Changes ---------------------- diff --git a/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DocBuilder.java b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DocBuilder.java index 42bf6da9499..858d688976e 100644 --- a/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DocBuilder.java +++ b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DocBuilder.java @@ -318,7 +318,11 @@ public class DocBuilder { String keyName = root.isDocRoot ? root.getPk() : root.getSchemaPk(); Object key = map.get(keyName); if(key == null) { - LOG.warn("no key was available for deleteted pk query. keyName = " + keyName); + keyName = findMatchingPkColumn(keyName, map); + key = map.get(keyName); + } + if(key == null) { + LOG.warn("no key was available for deleted pk query. keyName = " + keyName); continue; } writer.deleteDoc(key); @@ -816,6 +820,28 @@ public class DocBuilder { return entity.processor = new EntityProcessorWrapper(entityProcessor, this); } + private String findMatchingPkColumn(String pk, Map row) { + if (row.containsKey(pk)) + throw new IllegalArgumentException( + String.format("deltaQuery returned a row with null for primary key %s", pk)); + String resolvedPk = null; + for (String columnName : row.keySet()) { + if (columnName.endsWith("." + pk) || pk.endsWith("." + columnName)) { + if (resolvedPk != null) + throw new IllegalArgumentException( + String.format( + "deltaQuery has more than one column (%s and %s) that might resolve to declared primary key pk='%s'", + resolvedPk, columnName, pk)); + resolvedPk = columnName; + } + } + if (resolvedPk == null) + throw new IllegalArgumentException( + String.format("deltaQuery has no column to resolve to declared primary key pk='%s'", pk)); + LOG.info(String.format("Resolving deltaQuery column '%s' to match entity's declared pk '%s'", resolvedPk, pk)); + return resolvedPk; + } + /** *

Collects unique keys of all Solr documents for whom one or more source tables have been changed since the last * indexed time.

Note: In our definition, unique key of Solr document is the primary key of the top level @@ -852,13 +878,20 @@ public class DocBuilder { Map> deltaSet = new HashMap>(); LOG.info("Running ModifiedRowKey() for Entity: " + entity.name); //get the modified rows in this entity + String pk = entity.getPk(); while (true) { Map row = entityProcessor.nextModifiedRowKey(); if (row == null) break; - deltaSet.put(row.get(entity.getPk()).toString(), row); + Object pkValue = row.get(pk); + if (pkValue == null) { + pk = findMatchingPkColumn(pk, row); + pkValue = row.get(pk); + } + + deltaSet.put(pkValue.toString(), row); importStatistics.rowsCount.incrementAndGet(); // check for abort if (stop.get()) @@ -873,8 +906,14 @@ public class DocBuilder { deletedSet.add(row); + Object pkValue = row.get(pk); + if (pkValue == null) { + pk = findMatchingPkColumn(pk, row); + pkValue = row.get(pk); + } + // Remove deleted rows from the delta rows - String deletedRowPk = row.get(entity.getPk()).toString(); + String deletedRowPk = pkValue.toString(); if (deltaSet.containsKey(deletedRowPk)) { deltaSet.remove(deletedRowPk); } diff --git a/solr/contrib/dataimporthandler/src/test/java/org/apache/solr/handler/dataimport/TestSqlEntityProcessorDeltaPrefixedPk.java b/solr/contrib/dataimporthandler/src/test/java/org/apache/solr/handler/dataimport/TestSqlEntityProcessorDeltaPrefixedPk.java new file mode 100644 index 00000000000..51fc50b9f1a --- /dev/null +++ b/solr/contrib/dataimporthandler/src/test/java/org/apache/solr/handler/dataimport/TestSqlEntityProcessorDeltaPrefixedPk.java @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.handler.dataimport; + +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.logging.*; + +/** + *

+ * Test for SqlEntityProcessorDelta verifying fix for SOLR-1191 + *

+ * + * + * @version $Id$ + * @since solr 3.1 + */ +public class TestSqlEntityProcessorDeltaPrefixedPk extends AbstractDataImportHandlerTestCase { + private static final String FULLIMPORT_QUERY = "select * from x"; + + private static final String DELTA_QUERY = "select id from x where last_modified > NOW"; + + private static final String DELETED_PK_QUERY = "select id from x where last_modified > NOW AND deleted='true'"; + + private static final String dataConfig_delta = + "" + + " \n" + + " \n" + + " \n" + + " \n" + + " \n" + + " \n" + + " \n" + + "\n"; + + private static final List EMPTY_LIST = Collections.EMPTY_LIST; + + @BeforeClass + public static void beforeClass() throws Exception { + initCore("dataimport-solrconfig.xml", "dataimport-schema.xml"); + } + + @Before @Override + public void setUp() throws Exception { + super.setUp(); + clearIndex(); + assertU(commit()); + //Logger.getLogger("").setLevel(Level.ALL); + } + + @SuppressWarnings("unchecked") + private void add1document() throws Exception { + List row = new ArrayList(); + row.add(createMap("id", "1", "desc", "bar")); + MockDataSource.setIterator(FULLIMPORT_QUERY, row.iterator()); + + runFullImport(dataConfig_delta); + + assertQ(req("*:* OR add1document"), "//*[@numFound='1']"); + assertQ(req("id:1"), "//*[@numFound='1']"); + assertQ(req("desc:bar"), "//*[@numFound='1']"); + } + + @Test + @SuppressWarnings("unchecked") + public void testDeltaImport_deleteResolvesUnprefixedPk() throws Exception { + add1document(); + MockDataSource.clearCache(); + List deletedRows = new ArrayList(); + deletedRows.add(createMap("id", "1")); + MockDataSource.setIterator(DELETED_PK_QUERY, deletedRows.iterator()); + MockDataSource.setIterator(DELTA_QUERY, EMPTY_LIST.iterator()); + runDeltaImport(dataConfig_delta); + + assertQ(req("*:* OR testDeltaImport_deleteResolvesUnprefixedPk"), "//*[@numFound='0']"); + } + + @Test + @SuppressWarnings("unchecked") + public void testDeltaImport_replace_resolvesUnprefixedPk() throws Exception { + add1document(); + MockDataSource.clearCache(); + List deltaRows = new ArrayList(); + deltaRows.add(createMap("id", "1")); + MockDataSource.setIterator(DELTA_QUERY, deltaRows.iterator()); + MockDataSource.setIterator(DELETED_PK_QUERY, EMPTY_LIST.iterator()); + List rows = new ArrayList(); + rows.add(createMap("id", "1", "desc", "baz")); + MockDataSource.setIterator("select * from x where id='1'", rows.iterator()); + + runDeltaImport(dataConfig_delta); + + assertQ(req("*:* OR testDeltaImport_replace_resolvesUnprefixedPk"), "//*[@numFound='1']"); + assertQ(req("id:1"), "//*[@numFound='1']"); + assertQ(req("desc:bar"), "//*[@numFound='0']"); + assertQ(req("desc:baz"), "//*[@numFound='1']"); + } + + @Test + @SuppressWarnings("unchecked") + public void testDeltaImport_addResolvesUnprefixedPk() throws Exception { + add1document(); + MockDataSource.clearCache(); + + List deltaRows = new ArrayList(); + deltaRows.add(createMap("id", "2")); + MockDataSource.setIterator(DELTA_QUERY, deltaRows.iterator()); + + List rows = new ArrayList(); + rows.add(createMap("id", "2", "desc", "xyzzy")); + MockDataSource.setIterator("select * from x where id='2'", rows.iterator()); + + runDeltaImport(dataConfig_delta); + + assertQ(req("*:* OR testDeltaImport_addResolvesUnprefixedPk"), "//*[@numFound='2']"); + assertQ(req("id:1"), "//*[@numFound='1']"); + assertQ(req("id:2"), "//*[@numFound='1']"); + assertQ(req("desc:bar"), "//*[@numFound='1']"); + assertQ(req("desc:xyzzy"), "//*[@numFound='1']"); + } + +}