mirror of https://github.com/apache/lucene.git
SOLR-893 -- Unable to delete documents via SQL and deletedPkQuery with deltaimport
git-svn-id: https://svn.apache.org/repos/asf/lucene/solr/trunk@723824 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
78bc41b54e
commit
7fe3fa9468
|
@ -63,6 +63,9 @@ Bug Fixes
|
|||
|
||||
8. SOLR-873: Fix case-sensitive field names and columns (Jon Baer, shalin)
|
||||
|
||||
9. SOLR-893: Unable to delete documents via SQL and deletedPkQuery with deltaimport
|
||||
(Dan Rosher via shalin)
|
||||
|
||||
Documentation
|
||||
----------------------
|
||||
|
||||
|
|
|
@ -463,10 +463,8 @@ public class DocBuilder {
|
|||
Set<Map<String, Object>> deltaSet = new HashSet<Map<String, Object>>();
|
||||
resolver.addNamespace(null, (Map) entity.allAttributes);
|
||||
EntityProcessor entityProcessor = getEntityProcessor(entity, context.getCore());
|
||||
entityProcessor.init(new ContextImpl(entity, resolver, null,
|
||||
Context.FIND_DELTA, session, null, this));
|
||||
entityProcessor.init(new ContextImpl(entity, resolver, null, Context.FIND_DELTA, session, null, this));
|
||||
LOG.info("Running ModifiedRowKey() for Entity: " + entity.name);
|
||||
int count = 0;
|
||||
//get the modified rows in this entity
|
||||
while (true) {
|
||||
Map<String, Object> row = entityProcessor.nextModifiedRowKey();
|
||||
|
@ -475,27 +473,32 @@ public class DocBuilder {
|
|||
break;
|
||||
|
||||
deltaSet.add(row);
|
||||
count++;
|
||||
importStatistics.rowsCount.incrementAndGet();
|
||||
}
|
||||
LOG.info("Completed ModifiedRowKey for Entity: " + entity.name
|
||||
+ " rows obtained : " + count);
|
||||
count = 0;
|
||||
// identifying the deleted rows from this entities
|
||||
LOG.info("Running DeletedRowKey() for Entity: " + entity.name);
|
||||
//get the deleted rows for this entity
|
||||
Set<Map<String, Object>> deletedSet = new HashSet<Map<String, Object>>();
|
||||
Set<Map<String, Object>> deltaRemoveSet = new HashSet<Map<String, Object>>();
|
||||
while (true) {
|
||||
Map<String, Object> row = entityProcessor.nextDeletedRowKey();
|
||||
if (row == null)
|
||||
break;
|
||||
|
||||
//Check to see if this delete is in the current delta set
|
||||
for (Map<String, Object> modifiedRow : deltaSet) {
|
||||
if (modifiedRow.get(entity.pk).equals(row.get(entity.pk))) {
|
||||
deltaRemoveSet.add(modifiedRow);
|
||||
}
|
||||
}
|
||||
|
||||
deletedSet.add(row);
|
||||
count++;
|
||||
importStatistics.rowsCount.incrementAndGet();
|
||||
}
|
||||
LOG.info("Completed DeletedRowKey for Entity: " + entity.name
|
||||
+ " rows obtained : " + count);
|
||||
|
||||
//asymmetric Set difference
|
||||
deltaSet.removeAll(deltaRemoveSet);
|
||||
|
||||
LOG.info("Completed ModifiedRowKey for Entity: " + entity.name + " rows obtained : " + deltaSet.size());
|
||||
LOG.info("Completed DeletedRowKey for Entity: " + entity.name + " rows obtained : " + deletedSet.size());
|
||||
|
||||
myModifiedPks.addAll(deltaSet);
|
||||
Set<Map<String, Object>> parentKeyList = new HashSet<Map<String, Object>>();
|
||||
|
@ -507,12 +510,10 @@ public class DocBuilder {
|
|||
// identifying deleted rows with deltas
|
||||
|
||||
for (Map<String, Object> row : myModifiedPks)
|
||||
getModifiedParentRows(resolver.addNamespace(entity.name, row),
|
||||
entity.name, parentEntityProcessor, parentKeyList);
|
||||
getModifiedParentRows(resolver.addNamespace(entity.name, row), entity.name, parentEntityProcessor, parentKeyList);
|
||||
// running the same for deletedrows
|
||||
for (Map<String, Object> row : deletedSet) {
|
||||
getModifiedParentRows(resolver.addNamespace(entity.name, row),
|
||||
entity.name, parentEntityProcessor, parentKeyList);
|
||||
getModifiedParentRows(resolver.addNamespace(entity.name, row), entity.name, parentEntityProcessor, parentKeyList);
|
||||
}
|
||||
}
|
||||
LOG.info("Completed parentDeltaQuery for Entity: " + entity.name);
|
||||
|
|
|
@ -230,10 +230,12 @@ public class EntityProcessorBase extends EntityProcessor {
|
|||
if (rowIterator.hasNext())
|
||||
return rowIterator.next();
|
||||
query = null;
|
||||
rowIterator = null;
|
||||
return null;
|
||||
} catch (Exception e) {
|
||||
log.error("getNext() failed for query '" + query + "'", e);
|
||||
query = null;
|
||||
rowIterator = null;
|
||||
wrapAndThrow(DataImportHandlerException.WARN, e);
|
||||
return null;
|
||||
}
|
||||
|
|
|
@ -80,7 +80,7 @@ public class TestSqlEntityProcessor2 extends AbstractDataImportHandlerTest {
|
|||
|
||||
List parentRow = new ArrayList();
|
||||
parentRow.add(createMap("id", "5"));
|
||||
MockDataSource.setIterator("select * from x where x.id = '5'", parentRow
|
||||
MockDataSource.setIterator("select * from x where id = '5'", parentRow
|
||||
.iterator());
|
||||
|
||||
List childRow = new ArrayList();
|
||||
|
@ -94,6 +94,56 @@ public class TestSqlEntityProcessor2 extends AbstractDataImportHandlerTest {
|
|||
assertQ(req("desc:hello"), "//*[@numFound='1']");
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testCompositePk_DeltaImport_DeletedPkQuery() throws Exception {
|
||||
List parentRow = new ArrayList();
|
||||
parentRow.add(createMap("id", "11"));
|
||||
MockDataSource.setIterator("select * from x", parentRow.iterator());
|
||||
|
||||
List childRow = new ArrayList();
|
||||
childRow.add(createMap("desc", "hello"));
|
||||
|
||||
MockDataSource.setIterator("select * from y where y.A=11", childRow
|
||||
.iterator());
|
||||
|
||||
super.runFullImport(dataConfig);
|
||||
|
||||
assertQ(req("id:11"), "//*[@numFound='1']");
|
||||
|
||||
|
||||
|
||||
List deltaRow = new ArrayList();
|
||||
deltaRow.add(createMap("id", "15"));
|
||||
deltaRow.add(createMap("id", "17"));
|
||||
MockDataSource.setIterator("select id from x where last_modified > NOW",
|
||||
deltaRow.iterator());
|
||||
|
||||
List deltaDeleteRow = new ArrayList();
|
||||
deltaDeleteRow.add(createMap("id", "11"));
|
||||
deltaDeleteRow.add(createMap("id", "17"));
|
||||
MockDataSource.setIterator("select id from x where last_modified > NOW AND deleted='true'",
|
||||
deltaDeleteRow.iterator());
|
||||
|
||||
parentRow = new ArrayList();
|
||||
parentRow.add(createMap("id", "15"));
|
||||
MockDataSource.setIterator("select * from x where id = '15'", parentRow
|
||||
.iterator());
|
||||
|
||||
parentRow = new ArrayList();
|
||||
parentRow.add(createMap("id", "17"));
|
||||
MockDataSource.setIterator("select * from x where id = '17'", parentRow
|
||||
.iterator());
|
||||
|
||||
super.runDeltaImport(dataConfig);
|
||||
|
||||
assertQ(req("id:15"), "//*[@numFound='1']");
|
||||
assertQ(req("id:11"), "//*[@numFound='0']");
|
||||
assertQ(req("id:17"), "//*[@numFound='0']");
|
||||
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testCompositePk_DeltaImport_DeltaImportQuery() throws Exception {
|
||||
|
@ -120,7 +170,7 @@ public class TestSqlEntityProcessor2 extends AbstractDataImportHandlerTest {
|
|||
|
||||
private static String dataConfig = "<dataConfig>\n"
|
||||
+ " <document>\n"
|
||||
+ " <entity name=\"x\" pk=\"x.id\" query=\"select * from x\" deltaQuery=\"select id from x where last_modified > NOW\">\n"
|
||||
+ " <entity name=\"x\" pk=\"id\" query=\"select * from x\" deletedPkQuery=\"select id from x where last_modified > NOW AND deleted='true'\" deltaQuery=\"select id from x where last_modified > NOW\">\n"
|
||||
+ " <field column=\"id\" />\n"
|
||||
+ " <entity name=\"y\" query=\"select * from y where y.A=${x.id}\">\n"
|
||||
+ " <field column=\"desc\" />\n"
|
||||
|
|
Loading…
Reference in New Issue