From d483108a1508e9a5f6324a5fe5547deb4c6a713f Mon Sep 17 00:00:00 2001 From: Cao Manh Dat Date: Fri, 30 Mar 2018 19:54:18 +0700 Subject: [PATCH 1/8] SOLR-12168: LIROnShardRestartTest failures --- .../solr/cloud/LIROnShardRestartTest.java | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/solr/core/src/test/org/apache/solr/cloud/LIROnShardRestartTest.java b/solr/core/src/test/org/apache/solr/cloud/LIROnShardRestartTest.java index c83739efde3..31947be67be 100644 --- a/solr/core/src/test/org/apache/solr/cloud/LIROnShardRestartTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/LIROnShardRestartTest.java @@ -17,6 +17,7 @@ package org.apache.solr.cloud; +import java.lang.invoke.MethodHandles; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -45,12 +46,16 @@ import org.apache.solr.util.TimeOut; import org.apache.zookeeper.KeeperException; import org.junit.AfterClass; import org.junit.BeforeClass; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @LuceneTestCase.Nightly @LuceneTestCase.Slow @Deprecated public class LIROnShardRestartTest extends SolrCloudTestCase { + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + @BeforeClass public static void setupCluster() throws Exception { System.setProperty("solr.directoryFactory", "solr.StandardDirectoryFactory"); @@ -132,6 +137,9 @@ public class LIROnShardRestartTest extends SolrCloudTestCase { // now expire each node for (Replica replica : docCollection.getReplicas()) { try { + // todo remove the condition for skipping leader after SOLR-12166 is fixed + if (newLeader.getName().equals(replica.getName())) continue; + cluster.getZkClient().makePath("/collections/" + collection + "/leader_initiated_recovery/shard1/" + replica.getName(), znodeData, true); } catch (KeeperException.NodeExistsException e) { @@ -153,7 +161,14 @@ public class LIROnShardRestartTest extends SolrCloudTestCase { if (electionNodes.isEmpty()) break; } assertFalse("Timeout waiting for replicas rejoin election", timeOut.hasTimedOut()); - waitForState("Timeout waiting for active replicas", collection, clusterShape(1, 3)); + try { + waitForState("Timeout waiting for active replicas", collection, clusterShape(1, 3)); + } catch (Throwable th) { + String electionPath = "/collections/allReplicasInLIR/leader_elect/shard1/election/"; + List children = zkClient().getChildren(electionPath, null, true); + LOG.info("Election queue {}", children); + throw th; + } assertEquals(103, cluster.getSolrClient().query(collection, new SolrQuery("*:*")).getResults().getNumFound()); From 35bfe897901f1b51bce654b49aecd9560bfa797f Mon Sep 17 00:00:00 2001 From: Cao Manh Dat Date: Fri, 30 Mar 2018 20:11:39 +0700 Subject: [PATCH 2/8] SOLR-12066: Cleanup deleted core when node start --- solr/CHANGES.txt | 2 ++ .../org/apache/solr/cloud/ZkController.java | 22 ++++++++++--- .../org/apache/solr/core/CoreContainer.java | 7 +++- .../apache/solr/cloud/DeleteReplicaTest.java | 33 +++++++++++++++++++ 4 files changed, 59 insertions(+), 5 deletions(-) diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 5854e0ff7fb..12bc25a6165 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -110,6 +110,8 @@ Optimizations * SOLR-12146: LIR should skip deleted replicas (Cao Manh Dat) +* SOLR-12066: Cleanup deleted core when node start (Cao Manh Dat) + Other Changes ---------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java 
b/solr/core/src/java/org/apache/solr/cloud/ZkController.java index c0ddd260adf..872a8b9d7e1 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java @@ -1661,6 +1661,9 @@ public class ZkController { Thread.currentThread().interrupt(); log.error("", e); throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e); + } catch (NotInClusterStateException e) { + // make the stack trace less verbose + throw e; } catch (Exception e) { log.error("", e); throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "", e); @@ -1688,7 +1691,7 @@ public class ZkController { return true; } - private void checkStateInZk(CoreDescriptor cd) throws InterruptedException { + private void checkStateInZk(CoreDescriptor cd) throws InterruptedException, NotInClusterStateException { if (!Overseer.isLegacy(zkStateReader)) { CloudDescriptor cloudDesc = cd.getCloudDescriptor(); String nodeName = cloudDesc.getCoreNodeName(); @@ -1722,7 +1725,8 @@ public class ZkController { } Replica replica = slice.getReplica(coreNodeName); if (replica == null) { - errorMessage.set("coreNodeName " + coreNodeName + " does not exist in shard " + cloudDesc.getShardId()); + errorMessage.set("coreNodeName " + coreNodeName + " does not exist in shard " + cloudDesc.getShardId() + + ", ignore the exception if the replica was deleted"); return false; } return true; @@ -1730,8 +1734,9 @@ public class ZkController { } catch (TimeoutException e) { String error = errorMessage.get(); if (error == null) - error = "Replica " + coreNodeName + " is not present in cluster state"; - throw new SolrException(ErrorCode.SERVER_ERROR, error + ": " + collectionState.get()); + error = "coreNodeName " + coreNodeName + " does not exist in shard " + cloudDesc.getShardId() + + ", ignore the exception if the replica was deleted"; + throw new NotInClusterStateException(ErrorCode.SERVER_ERROR, error); } } } @@ -2711,6 +2716,15 @@ public class ZkController { } } + /** + * Thrown during pre register process if the replica is not present in clusterstate + */ + public static class NotInClusterStateException extends SolrException { + public NotInClusterStateException(ErrorCode code, String msg) { + super(code, msg); + } + } + public boolean checkIfCoreNodeNameAlreadyExists(CoreDescriptor dcore) { DocCollection collection = zkStateReader.getClusterState().getCollectionOrNull(dcore.getCollectionName()); if (collection != null) { diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java index b667bc06209..74b718cdca5 100644 --- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java +++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java @@ -677,7 +677,7 @@ public class CoreContainer { } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (ExecutionException e) { - log.error("Error waiting for SolrCore to be created", e); + log.error("Error waiting for SolrCore to be loaded on startup", e.getCause()); } } } finally { @@ -1063,6 +1063,11 @@ public class CoreContainer { return core; } catch (Exception e) { coreInitFailures.put(dcore.getName(), new CoreLoadFailure(dcore, e)); + if (e instanceof ZkController.NotInClusterStateException && !newCollection) { + // this mostly happen when the core is deleted when this node is down + unload(dcore.getName(), true, true, true); + throw e; + } solrCores.removeCoreDescriptor(dcore); final SolrException solrException = new 
SolrException(ErrorCode.SERVER_ERROR, "Unable to create core [" + dcore.getName() + "]", e); if(core != null && !core.isClosed()) diff --git a/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java b/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java index 3208ebd5dc6..1a021d7d8f3 100644 --- a/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java @@ -41,7 +41,10 @@ import org.apache.solr.common.cloud.ZkNodeProps; import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.util.TimeSource; import org.apache.solr.common.util.Utils; +import org.apache.solr.core.CoreDescriptor; +import org.apache.solr.core.SolrCore; import org.apache.solr.core.ZkContainer; +import org.apache.solr.util.FileUtils; import org.apache.solr.util.TimeOut; import org.apache.zookeeper.KeeperException; import org.junit.BeforeClass; @@ -152,6 +155,36 @@ public class DeleteReplicaTest extends SolrCloudTestCase { } + @Test + public void deleteReplicaOnDownNode() throws Exception { + final String collectionName = "deleteReplicaOnDownNode"; + CollectionAdminRequest.createCollection(collectionName, "conf", 1, 2).process(cluster.getSolrClient()); + waitForState("Expected one shards with two replicas", collectionName, clusterShape(1, 2)); + + Slice shard = getCollectionState(collectionName).getSlice("shard1"); + Replica replica = shard.getReplicas(rep -> !rep.getName().equals(shard.getLeader().getName())).get(0); + JettySolrRunner replicaJetty = getJettyForReplica(replica); + CoreDescriptor replicaCd; + try (SolrCore core = replicaJetty.getCoreContainer().getCore(replica.getCoreName())) { + replicaCd = core.getCoreDescriptor(); + } + assertNotNull("Expected core descriptor of "+ replica.getName() + " is not null",replicaCd); + String replicaJettyNodeName = replicaJetty.getNodeName(); + + // shutdown node of a replica + replicaJetty.stop(); + waitForNodeLeave(replicaJettyNodeName); + waitForState("Expected one shards with one replica", collectionName, clusterShape(1, 1)); + CollectionAdminRequest.deleteReplica(collectionName, shard.getName(), replica.getName()).process(cluster.getSolrClient()); + waitForState("Expected only one replica left", collectionName, (liveNodes, collectionState) -> collectionState.getReplicas().size() == 1); + + // restart the test and make sure the data get deleted + replicaJetty.start(); + TimeOut timeOut = new TimeOut(60, TimeUnit.SECONDS, TimeSource.NANO_TIME); + timeOut.waitFor("Expected data dir and instance dir of " + replica.getName() + " is deleted", () + -> !Files.exists(replicaCd.getInstanceDir()) && !FileUtils.fileExists(replicaCd.getDataDir())); + } + @Test public void deleteReplicaByCountForAllShards() throws Exception { From e3c67b1980da35df78b95b39049a3a7258ebf335 Mon Sep 17 00:00:00 2001 From: Shalin Shekhar Mangar Date: Fri, 30 Mar 2018 19:42:13 +0530 Subject: [PATCH 3/8] SOLR-12169: Update jira number in BadApple annotation --- .../apache/solr/cloud/autoscaling/ComputePlanActionTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/ComputePlanActionTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/ComputePlanActionTest.java index 943e8fcdffa..720cc4f433d 100644 --- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/ComputePlanActionTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/ComputePlanActionTest.java @@ -397,7 +397,7 @@ public class 
ComputePlanActionTest extends SolrCloudTestCase { } @Test - @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") + @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12169") public void testSelectedCollections() throws Exception { AssertingTriggerAction.expectedNode = null; From 85decabe46966ec3a73e80294fe33cfa862975fd Mon Sep 17 00:00:00 2001 From: Shalin Shekhar Mangar Date: Fri, 30 Mar 2018 22:53:55 +0530 Subject: [PATCH 4/8] SOLR-12133: Fix race conditions that caused TriggerIntegrationTest.testEventQueue to fail --- solr/CHANGES.txt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 12bc25a6165..6864e0b96cb 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -135,6 +135,8 @@ Other Changes * SOLR-12152: Split up TriggerIntegrationTest into multiple tests to isolate and increase reliability. (shalin) +* SOLR-12133: Fix race conditions that caused TriggerIntegrationTest.testEventQueue to fail. (Mark Miller, shalin) + ================== 7.3.0 ================== Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release. From bd85fd389f157796901c7b6a7b3fba467e6ad39e Mon Sep 17 00:00:00 2001 From: Shalin Shekhar Mangar Date: Fri, 30 Mar 2018 22:56:58 +0530 Subject: [PATCH 5/8] SOLR-12169: Fix ComputePlanActionTest.testSelectedCollections fails on jenkins by aggressively cleaning up trigger state left by other test methods in the test setup --- solr/CHANGES.txt | 3 +++ .../cloud/autoscaling/NodeAddedTrigger.java | 2 +- .../cloud/autoscaling/NodeLostTrigger.java | 2 +- .../autoscaling/ComputePlanActionTest.java | 25 +++++++++++++------ 4 files changed, 23 insertions(+), 9 deletions(-) diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 6864e0b96cb..e7349cf5588 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -137,6 +137,9 @@ Other Changes * SOLR-12133: Fix race conditions that caused TriggerIntegrationTest.testEventQueue to fail. (Mark Miller, shalin) +* SOLR-12169: Fix ComputePlanActionTest.testSelectedCollections fails on jenkins by aggressively cleaning up + trigger state left by other test methods in the test setup. (shalin) + ================== 7.3.0 ================== Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release. 
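The SOLR-12169 fix described above comes down to one pattern: before each test method runs, wipe whatever autoscaling state earlier methods may have left behind in ZooKeeper. A condensed, illustrative sketch of that setUp() cleanup follows; it is not part of the patch itself (the real code is in the ComputePlanActionTest diff further below) and it only uses names taken from that diff, with the four deleteChildrenRecursively calls folded into a loop:

    // Sketch only -- the actual change is in ComputePlanActionTest.setUp() below.
    SolrCloudManager cloudManager =
        cluster.getJettySolrRunner(0).getCoreContainer().getZkController().getSolrCloudManager();
    for (String path : new String[] {
        ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH,
        ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH,
        ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH,
        ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH}) {
      // The patch passes (path, true, false); going by the helper's name, this removes the
      // children under the path while keeping the path node itself.
      cloudManager.getDistribStateManager().removeRecursively(path, true, false);
    }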
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java index ad89f2a6798..6190a499175 100644 --- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java @@ -53,7 +53,7 @@ public class NodeAddedTrigger extends TriggerBase { SolrCloudManager cloudManager) { super(TriggerEventType.NODEADDED, name, properties, loader, cloudManager); lastLiveNodes = new HashSet<>(cloudManager.getClusterStateProvider().getLiveNodes()); - log.debug("Initial livenodes: {}", lastLiveNodes); + log.debug("NodeAddedTrigger {} - Initial livenodes: {}", name, lastLiveNodes); log.debug("NodeAddedTrigger {} instantiated with properties: {}", name, properties); } diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java index 1e7aec590a5..2981a485b67 100644 --- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java @@ -52,7 +52,7 @@ public class NodeLostTrigger extends TriggerBase { SolrCloudManager dataProvider) { super(TriggerEventType.NODELOST, name, properties, loader, dataProvider); lastLiveNodes = new HashSet<>(dataProvider.getClusterStateProvider().getLiveNodes()); - log.debug("Initial livenodes: {}", lastLiveNodes); + log.debug("NodeLostTrigger {} - Initial livenodes: {}", name, lastLiveNodes); } @Override diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/ComputePlanActionTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/ComputePlanActionTest.java index 720cc4f433d..67b5fa02c93 100644 --- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/ComputePlanActionTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/ComputePlanActionTest.java @@ -71,6 +71,7 @@ public class ComputePlanActionTest extends SolrCloudTestCase { private static CountDownLatch triggerFiredLatch = new CountDownLatch(1); private static final AtomicReference actionContextPropsRef = new AtomicReference<>(); private static final AtomicReference eventRef = new AtomicReference<>(); + private static SolrCloudManager cloudManager; @BeforeClass public static void setupCluster() throws Exception { @@ -83,10 +84,6 @@ public class ComputePlanActionTest extends SolrCloudTestCase { public void setUp() throws Exception { super.setUp(); - fired.set(false); - triggerFiredLatch = new CountDownLatch(1); - actionContextPropsRef.set(null); - // remove everything from autoscaling.json in ZK zkClient().setData(ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, "{}".getBytes(UTF_8), true); @@ -129,6 +126,20 @@ public class ComputePlanActionTest extends SolrCloudTestCase { req = createAutoScalingRequest(SolrRequest.METHOD.POST, setClusterPreferencesCommand); response = solrClient.request(req); assertEquals(response.get("result").toString(), "success"); + + cloudManager = cluster.getJettySolrRunner(0).getCoreContainer().getZkController().getSolrCloudManager(); + deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH); + deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH); + deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH); + deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH); + + fired.set(false); + triggerFiredLatch = new 
CountDownLatch(1); + actionContextPropsRef.set(null); + } + + private void deleteChildrenRecursively(String path) throws Exception { + cloudManager.getDistribStateManager().removeRecursively(path, true, false); } @After @@ -365,7 +376,7 @@ public class ComputePlanActionTest extends SolrCloudTestCase { } public static class AssertingTriggerAction implements TriggerAction { - static String expectedNode; + static volatile String expectedNode; @Override public String getName() { @@ -397,8 +408,8 @@ public class ComputePlanActionTest extends SolrCloudTestCase { } @Test - @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12169") public void testSelectedCollections() throws Exception { + log.info("Found number of jetties: {}", cluster.getJettySolrRunners().size()); AssertingTriggerAction.expectedNode = null; // start 3 more nodes @@ -467,7 +478,7 @@ public class ComputePlanActionTest extends SolrCloudTestCase { Map context = actionContextPropsRef.get(); assertNotNull(context); List operations = (List) context.get("operations"); - assertNotNull("The operations computed by ComputePlanAction should not be null" + getNodeStateProviderState() + context, operations); + assertNotNull("The operations computed by ComputePlanAction should not be null. " + getNodeStateProviderState() + context, operations); assertEquals("ComputePlanAction should have computed exactly 2 operations", 2, operations.size()); SolrRequest request = operations.get(0); SolrParams params = request.getParams(); From ab092942cf621b39afaae0d8b370deb3e084388a Mon Sep 17 00:00:00 2001 From: Cao Manh Dat Date: Sat, 31 Mar 2018 14:50:28 +0700 Subject: [PATCH 6/8] SOLR-12066: Move test to DeleteInactiveReplicaTest --- .../solr/cloud/DeleteInactiveReplicaTest.java | 22 ++++++++----- .../apache/solr/cloud/DeleteReplicaTest.java | 33 ------------------- 2 files changed, 14 insertions(+), 41 deletions(-) diff --git a/solr/core/src/test/org/apache/solr/cloud/DeleteInactiveReplicaTest.java b/solr/core/src/test/org/apache/solr/cloud/DeleteInactiveReplicaTest.java index 0f4ff48d61d..33a1a55955d 100644 --- a/solr/core/src/test/org/apache/solr/cloud/DeleteInactiveReplicaTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/DeleteInactiveReplicaTest.java @@ -17,6 +17,8 @@ package org.apache.solr.cloud; import java.lang.invoke.MethodHandles; +import java.nio.file.Files; +import java.util.concurrent.TimeUnit; import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.embedded.JettySolrRunner; @@ -26,7 +28,11 @@ import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.cloud.ZkStateReader; -import org.apache.solr.core.CoreContainer; +import org.apache.solr.common.util.TimeSource; +import org.apache.solr.core.CoreDescriptor; +import org.apache.solr.core.SolrCore; +import org.apache.solr.util.FileUtils; +import org.apache.solr.util.TimeOut; import org.junit.BeforeClass; import org.junit.Test; import org.slf4j.Logger; @@ -64,6 +70,10 @@ public class DeleteInactiveReplicaTest extends SolrCloudTestCase { Slice shard = getRandomShard(collectionState); Replica replica = getRandomReplica(shard); JettySolrRunner jetty = cluster.getReplicaJetty(replica); + CoreDescriptor replicaCd; + try (SolrCore core = jetty.getCoreContainer().getCore(replica.getCoreName())) { + replicaCd = core.getCoreDescriptor(); + } cluster.stopJettySolrRunner(jetty); waitForState("Expected replica " + replica.getName() + " on down node to be 
removed from cluster state", collectionName, (n, c) -> { @@ -80,13 +90,9 @@ public class DeleteInactiveReplicaTest extends SolrCloudTestCase { cluster.startJettySolrRunner(jetty); log.info("restarted jetty"); - - CoreContainer cc = jetty.getCoreContainer(); - CoreContainer.CoreLoadFailure loadFailure = cc.getCoreInitFailures().get(replica.getCoreName()); - assertNotNull("Deleted core was still loaded!", loadFailure); - assertNotNull(loadFailure.exception.getCause()); - assertTrue("Unexpected load failure message: " + loadFailure.exception.getCause().getMessage(), - loadFailure.exception.getCause().getMessage().contains("does not exist in shard")); + TimeOut timeOut = new TimeOut(60, TimeUnit.SECONDS, TimeSource.NANO_TIME); + timeOut.waitFor("Expected data dir and instance dir of " + replica.getName() + " is deleted", () + -> !Files.exists(replicaCd.getInstanceDir()) && !FileUtils.fileExists(replicaCd.getDataDir())); // Check that we can't create a core with no coreNodeName try (SolrClient queryClient = getHttpSolrClient(jetty.getBaseUrl().toString())) { diff --git a/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java b/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java index 1a021d7d8f3..3208ebd5dc6 100644 --- a/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java @@ -41,10 +41,7 @@ import org.apache.solr.common.cloud.ZkNodeProps; import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.util.TimeSource; import org.apache.solr.common.util.Utils; -import org.apache.solr.core.CoreDescriptor; -import org.apache.solr.core.SolrCore; import org.apache.solr.core.ZkContainer; -import org.apache.solr.util.FileUtils; import org.apache.solr.util.TimeOut; import org.apache.zookeeper.KeeperException; import org.junit.BeforeClass; @@ -155,36 +152,6 @@ public class DeleteReplicaTest extends SolrCloudTestCase { } - @Test - public void deleteReplicaOnDownNode() throws Exception { - final String collectionName = "deleteReplicaOnDownNode"; - CollectionAdminRequest.createCollection(collectionName, "conf", 1, 2).process(cluster.getSolrClient()); - waitForState("Expected one shards with two replicas", collectionName, clusterShape(1, 2)); - - Slice shard = getCollectionState(collectionName).getSlice("shard1"); - Replica replica = shard.getReplicas(rep -> !rep.getName().equals(shard.getLeader().getName())).get(0); - JettySolrRunner replicaJetty = getJettyForReplica(replica); - CoreDescriptor replicaCd; - try (SolrCore core = replicaJetty.getCoreContainer().getCore(replica.getCoreName())) { - replicaCd = core.getCoreDescriptor(); - } - assertNotNull("Expected core descriptor of "+ replica.getName() + " is not null",replicaCd); - String replicaJettyNodeName = replicaJetty.getNodeName(); - - // shutdown node of a replica - replicaJetty.stop(); - waitForNodeLeave(replicaJettyNodeName); - waitForState("Expected one shards with one replica", collectionName, clusterShape(1, 1)); - CollectionAdminRequest.deleteReplica(collectionName, shard.getName(), replica.getName()).process(cluster.getSolrClient()); - waitForState("Expected only one replica left", collectionName, (liveNodes, collectionState) -> collectionState.getReplicas().size() == 1); - - // restart the test and make sure the data get deleted - replicaJetty.start(); - TimeOut timeOut = new TimeOut(60, TimeUnit.SECONDS, TimeSource.NANO_TIME); - timeOut.waitFor("Expected data dir and instance dir of " + replica.getName() + " is deleted", () 
- -> !Files.exists(replicaCd.getInstanceDir()) && !FileUtils.fileExists(replicaCd.getDataDir())); - } - @Test public void deleteReplicaByCountForAllShards() throws Exception { From acb3c379427193036f3d56503529400736ac5dff Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 29 Mar 2018 17:21:59 +0200 Subject: [PATCH 7/8] LUCENE-8232: Separate out PendingDeletes from ReadersAndUpdates Today ReadersAndUpdates is tightly coupled with IW and all the handling of pending deletes. This change decouples IW and pending deletes from ReadersAndUpdates and makes PendingDeletes unittestable. --- .../lucene/index/BufferedUpdatesStream.java | 4 +- .../org/apache/lucene/index/IndexWriter.java | 32 +-- .../apache/lucene/index/PendingDeletes.java | 193 ++++++++++++++++++ .../lucene/index/ReadersAndUpdates.java | 185 +++++------------ .../src/java/org/apache/lucene/util/Bits.java | 2 +- .../lucene/index/TestPendingDeletes.java | 142 +++++++++++++ 6 files changed, 404 insertions(+), 154 deletions(-) create mode 100644 lucene/core/src/java/org/apache/lucene/index/PendingDeletes.java create mode 100644 lucene/core/src/test/org/apache/lucene/index/TestPendingDeletes.java diff --git a/lucene/core/src/java/org/apache/lucene/index/BufferedUpdatesStream.java b/lucene/core/src/java/org/apache/lucene/index/BufferedUpdatesStream.java index e887e4dbcce..63001d4d0f5 100644 --- a/lucene/core/src/java/org/apache/lucene/index/BufferedUpdatesStream.java +++ b/lucene/core/src/java/org/apache/lucene/index/BufferedUpdatesStream.java @@ -333,8 +333,8 @@ class BufferedUpdatesStream implements Accountable { if (success) { totDelCount += segState.rld.getPendingDeleteCount() - segState.startDelCount; int fullDelCount = segState.rld.info.getDelCount() + segState.rld.getPendingDeleteCount(); - assert fullDelCount <= segState.rld.info.info.maxDoc(); - if (fullDelCount == segState.rld.info.info.maxDoc()) { + assert fullDelCount <= segState.rld.info.info.maxDoc() : fullDelCount + " > " + segState.rld.info.info.maxDoc(); + if (segState.rld.isFullyDeleted()) { if (allDeleted == null) { allDeleted = new ArrayList<>(); } diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java index 40a53e0f324..3791e190625 100644 --- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java +++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java @@ -604,7 +604,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { if (!poolReaders && rld.refCount() == 1 && readerMap.containsKey(rld.info)) { // This is the last ref to this RLD, and we're not // pooling, so remove it: - if (rld.writeLiveDocs(directory)) { + boolean changed = rld.writeLiveDocs(directory); + changed |= rld.writeFieldUpdates(directory, globalFieldNumberMap, bufferedUpdatesStream.getCompletedDelGen(), infoStream); + + if (changed) { // Make sure we only write del docs for a live segment: assert assertInfoLive == false || assertInfoIsLive(rld.info); // Must checkpoint because we just @@ -616,9 +619,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { // did was move the state to disk: checkpointNoSIS(); } - - rld.writeFieldUpdates(directory, bufferedUpdatesStream.getCompletedDelGen(), infoStream); - if (rld.getNumDVUpdates() == 0) { rld.dropReaders(); readerMap.remove(rld.info); @@ -635,13 +635,15 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { } void writeAllDocValuesUpdates() throws IOException { + assert 
Thread.holdsLock(IndexWriter.this); Collection copy; synchronized (this) { + // this needs to be protected by the reader pool lock otherwise we hit ConcurrentModificationException copy = new HashSet<>(readerMap.values()); } boolean any = false; for (ReadersAndUpdates rld : copy) { - any |= rld.writeFieldUpdates(directory, bufferedUpdatesStream.getCompletedDelGen(), infoStream); + any |= rld.writeFieldUpdates(directory, globalFieldNumberMap, bufferedUpdatesStream.getCompletedDelGen(), infoStream); } if (any) { checkpoint(); @@ -649,11 +651,12 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { } void writeDocValuesUpdatesForMerge(List infos) throws IOException { + assert Thread.holdsLock(IndexWriter.this); boolean any = false; for (SegmentCommitInfo info : infos) { ReadersAndUpdates rld = get(info, false); if (rld != null) { - any |= rld.writeFieldUpdates(directory, bufferedUpdatesStream.getCompletedDelGen(), infoStream); + any |= rld.writeFieldUpdates(directory, globalFieldNumberMap, bufferedUpdatesStream.getCompletedDelGen(), infoStream); rld.setIsMerging(); } } @@ -706,7 +709,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { // Only acquire IW lock on each write, since this is a time consuming operation. This way // other threads get a chance to run in between our writes. synchronized (IndexWriter.this) { - rld.writeFieldUpdates(directory, bufferedUpdatesStream.getCompletedDelGen(), infoStream); + if (rld.writeFieldUpdates(directory, globalFieldNumberMap, bufferedUpdatesStream.getCompletedDelGen(), infoStream)) { + checkpointNoSIS(); + } } long bytesUsedAfter = rld.ramBytesUsed.get(); ramBytesUsed -= bytesUsedBefore - bytesUsedAfter; @@ -789,8 +794,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { if (rld != null) { assert rld.info == info; boolean changed = rld.writeLiveDocs(directory); - - changed |= rld.writeFieldUpdates(directory, bufferedUpdatesStream.getCompletedDelGen(), infoStream); + changed |= rld.writeFieldUpdates(directory, globalFieldNumberMap, bufferedUpdatesStream.getCompletedDelGen(), infoStream); if (changed) { // Make sure we only write del docs for a live segment: @@ -838,7 +842,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { if (create == false) { return null; } - rld = new ReadersAndUpdates(IndexWriter.this, info); + rld = new ReadersAndUpdates(segmentInfos.getIndexCreatedVersionMajor(), info, null, new PendingDeletes(null, info)); // Steal initial reference: readerMap.put(info, rld); } else { @@ -1147,7 +1151,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { LeafReaderContext leaf = leaves.get(i); SegmentReader segReader = (SegmentReader) leaf.reader(); SegmentReader newReader = new SegmentReader(segmentInfos.info(i), segReader, segReader.getLiveDocs(), segReader.numDocs()); - readerPool.readerMap.put(newReader.getSegmentInfo(), new ReadersAndUpdates(this, newReader)); + readerPool.readerMap.put(newReader.getSegmentInfo(), new ReadersAndUpdates(segmentInfos.getIndexCreatedVersionMajor(), newReader, new PendingDeletes(newReader, newReader.getSegmentInfo()))); } // We always assume we are carrying over incoming changes when opening from reader: @@ -1637,8 +1641,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { if (rld != null) { synchronized(bufferedUpdatesStream) { if (rld.delete(docID)) { - final int fullDelCount = rld.info.getDelCount() + rld.getPendingDeleteCount(); - if 
(fullDelCount == rld.info.info.maxDoc()) { + if (rld.isFullyDeleted()) { dropDeletedSegment(rld.info); checkpoint(); } @@ -4000,8 +4003,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { final boolean allDeleted = merge.segments.size() == 0 || merge.info.info.maxDoc() == 0 || - (mergedUpdates != null && - mergedUpdates.getPendingDeleteCount() == merge.info.info.maxDoc()); + (mergedUpdates != null && mergedUpdates.isFullyDeleted()); if (infoStream.isEnabled("IW")) { if (allDeleted) { diff --git a/lucene/core/src/java/org/apache/lucene/index/PendingDeletes.java b/lucene/core/src/java/org/apache/lucene/index/PendingDeletes.java new file mode 100644 index 00000000000..74043f3f44a --- /dev/null +++ b/lucene/core/src/java/org/apache/lucene/index/PendingDeletes.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.lucene.index; + +import java.io.IOException; + +import org.apache.lucene.codecs.Codec; +import org.apache.lucene.codecs.LiveDocsFormat; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.TrackingDirectoryWrapper; +import org.apache.lucene.util.Bits; +import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util.MutableBits; + +/** + * This class handles accounting and applying pending deletes for live segment readers + */ +final class PendingDeletes { + private final SegmentCommitInfo info; + // True if the current liveDocs is referenced by an + // external NRT reader: + private boolean liveDocsShared; + // Holds the current shared (readable and writable) + // liveDocs. This is null when there are no deleted + // docs, and it's copy-on-write (cloned whenever we need + // to change it but it's been shared to an external NRT + // reader). + private Bits liveDocs; + private int pendingDeleteCount; + + PendingDeletes(SegmentReader reader, SegmentCommitInfo info) { + this.info = info; + liveDocsShared = true; + liveDocs = reader != null ? reader.getLiveDocs() : null; + if (reader != null) { + pendingDeleteCount = reader.numDeletedDocs() - info.getDelCount(); + } else { + pendingDeleteCount = 0; + } + } + + + /** + * Marks a document as deleted in this segment and return true if a document got actually deleted or + * if the document was already deleted. 
+ */ + boolean delete(int docID) throws IOException { + assert info.info.maxDoc() > 0; + if (liveDocsShared) { + // Copy on write: this means we've cloned a + // SegmentReader sharing the current liveDocs + // instance; must now make a private clone so we can + // change it: + LiveDocsFormat liveDocsFormat = info.info.getCodec().liveDocsFormat(); + if (liveDocs == null) { + liveDocs = liveDocsFormat.newLiveDocs(info.info.maxDoc()); + } else { + liveDocs = liveDocsFormat.newLiveDocs(liveDocs); + } + liveDocsShared = false; + } + + assert liveDocs != null; + assert docID >= 0 && docID < liveDocs.length() : "out of bounds: docid=" + docID + " liveDocsLength=" + liveDocs.length() + " seg=" + info.info.name + " maxDoc=" + info.info.maxDoc(); + assert !liveDocsShared; + final boolean didDelete = liveDocs.get(docID); + if (didDelete) { + ((MutableBits) liveDocs).clear(docID); + pendingDeleteCount++; + } + return didDelete; + } + + /** + * Should be called if the live docs returned from {@link #getLiveDocs()} are shared outside of the + * {@link ReadersAndUpdates} + */ + void liveDocsShared() { + liveDocsShared = true; + } + + /** + * Returns the current live docs or null if all docs are live. The returned instance might be mutable or is mutated behind the scenes. + * If the returned live docs are shared outside of the ReadersAndUpdates {@link #liveDocsShared()} should be called + * first. + */ + Bits getLiveDocs() { + return liveDocs; + } + + /** + * Returns the number of pending deletes that are not written to disk. + */ + int numPendingDeletes() { + return pendingDeleteCount; + } + + /** + * Called once a new reader is opened for this segment ie. when deletes or updates are applied. + */ + void onNewReader(SegmentReader reader, SegmentCommitInfo info) { + if (liveDocs == null) { + liveDocs = reader.getLiveDocs(); + } + } + + /** + * Resets the pending docs + */ + void reset() { + pendingDeleteCount = 0; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("PendingDeletes(seg=").append(info); + sb.append(" numPendingDeletes=").append(pendingDeleteCount); + sb.append(" liveDocsShared=").append(liveDocsShared); + return sb.toString(); + } + + /** + * Writes the live docs to disk and returns true if any new docs were written. 
+ */ + boolean writeLiveDocs(Directory dir) throws IOException { + if (pendingDeleteCount == 0) { + return false; + } + + Bits liveDocs = this.liveDocs; + assert liveDocs != null; + // We have new deletes + assert liveDocs.length() == info.info.maxDoc(); + + // Do this so we can delete any created files on + // exception; this saves all codecs from having to do + // it: + TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(dir); + + // We can write directly to the actual name (vs to a + // .tmp & renaming it) because the file is not live + // until segments file is written: + boolean success = false; + try { + Codec codec = info.info.getCodec(); + codec.liveDocsFormat().writeLiveDocs((MutableBits)liveDocs, trackingDir, info, pendingDeleteCount, IOContext.DEFAULT); + success = true; + } finally { + if (!success) { + // Advance only the nextWriteDelGen so that a 2nd + // attempt to write will write to a new file + info.advanceNextWriteDelGen(); + + // Delete any partially created file(s): + for (String fileName : trackingDir.getCreatedFiles()) { + IOUtils.deleteFilesIgnoringExceptions(dir, fileName); + } + } + } + + // If we hit an exc in the line above (eg disk full) + // then info's delGen remains pointing to the previous + // (successfully written) del docs: + info.advanceDelGen(); + info.setDelCount(info.getDelCount() + pendingDeleteCount); + reset(); + return true; + } + + /** + * Returns true iff the segment represented by this {@link PendingDeletes} is fully deleted + */ + boolean isFullyDeleted() { + return info.getDelCount() + pendingDeleteCount == info.info.maxDoc(); + } +} diff --git a/lucene/core/src/java/org/apache/lucene/index/ReadersAndUpdates.java b/lucene/core/src/java/org/apache/lucene/index/ReadersAndUpdates.java index 16ea1e5b9d9..8a0e17e2b71 100644 --- a/lucene/core/src/java/org/apache/lucene/index/ReadersAndUpdates.java +++ b/lucene/core/src/java/org/apache/lucene/index/ReadersAndUpdates.java @@ -20,6 +20,7 @@ package org.apache.lucene.index; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -34,7 +35,6 @@ import org.apache.lucene.codecs.Codec; import org.apache.lucene.codecs.DocValuesConsumer; import org.apache.lucene.codecs.DocValuesFormat; import org.apache.lucene.codecs.FieldInfosFormat; -import org.apache.lucene.codecs.LiveDocsFormat; import org.apache.lucene.store.Directory; import org.apache.lucene.store.FlushInfo; import org.apache.lucene.store.IOContext; @@ -43,38 +43,27 @@ import org.apache.lucene.util.Bits; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.InfoStream; -import org.apache.lucene.util.MutableBits; // Used by IndexWriter to hold open SegmentReaders (for // searching or merging), plus pending deletes and updates, // for a given segment -class ReadersAndUpdates { +final class ReadersAndUpdates { // Not final because we replace (clone) when we need to // change it and it's been shared: - public final SegmentCommitInfo info; + final SegmentCommitInfo info; // Tracks how many consumers are using this instance: private final AtomicInteger refCount = new AtomicInteger(1); - private final IndexWriter writer; - // Set once (null, and then maybe set, and never set again): private SegmentReader reader; - // Holds the current shared (readable and writable) - // liveDocs. 
This is null when there are no deleted - // docs, and it's copy-on-write (cloned whenever we need - // to change it but it's been shared to an external NRT - // reader). - private Bits liveDocs; - // How many further deletions we've done against // liveDocs vs when we loaded it or last wrote it: - private int pendingDeleteCount; + private final PendingDeletes pendingDeletes; - // True if the current liveDocs is referenced by an - // external NRT reader: - private boolean liveDocsShared; + // the major version this index was created with + private final int indexCreatedVersionMajor; // Indicates whether this segment is currently being merged. While a segment // is merging, all field updates are also registered in the @@ -96,25 +85,23 @@ class ReadersAndUpdates { // Only set if there are doc values updates against this segment, and the index is sorted: Sorter.DocMap sortMap; - public final AtomicLong ramBytesUsed = new AtomicLong(); - - public ReadersAndUpdates(IndexWriter writer, SegmentCommitInfo info) { - this.writer = writer; + final AtomicLong ramBytesUsed = new AtomicLong(); + + ReadersAndUpdates(int indexCreatedVersionMajor, SegmentCommitInfo info, SegmentReader reader, + PendingDeletes pendingDeletes) { this.info = info; - liveDocsShared = true; + this.pendingDeletes = pendingDeletes; + this.indexCreatedVersionMajor = indexCreatedVersionMajor; + this.reader = reader; } /** Init from a previously opened SegmentReader. * *
<p>
NOTE: steals incoming ref from reader. */ - public ReadersAndUpdates(IndexWriter writer, SegmentReader reader) { - this.writer = writer; - this.reader = reader; - info = reader.getSegmentInfo(); - liveDocs = reader.getLiveDocs(); - liveDocsShared = true; - pendingDeleteCount = reader.numDeletedDocs() - info.getDelCount(); - assert pendingDeleteCount >= 0: "got " + pendingDeleteCount + " reader.numDeletedDocs()=" + reader.numDeletedDocs() + " info.getDelCount()=" + info.getDelCount() + " maxDoc=" + reader.maxDoc() + " numDocs=" + reader.numDocs(); + ReadersAndUpdates(int indexCreatedVersionMajor, SegmentReader reader, PendingDeletes pendingDeletes) { + this(indexCreatedVersionMajor, reader.getSegmentInfo(), reader, pendingDeletes); + assert pendingDeletes.numPendingDeletes() >= 0 + : "got " + pendingDeletes.numPendingDeletes() + " reader.numDeletedDocs()=" + reader.numDeletedDocs() + " info.getDelCount()=" + info.getDelCount() + " maxDoc=" + reader.maxDoc() + " numDocs=" + reader.numDocs(); } public void incRef() { @@ -134,7 +121,7 @@ class ReadersAndUpdates { } public synchronized int getPendingDeleteCount() { - return pendingDeleteCount; + return pendingDeletes.numPendingDeletes(); } private synchronized boolean assertNoDupGen(List fieldUpdates, DocValuesFieldUpdates update) { @@ -186,6 +173,7 @@ class ReadersAndUpdates { // Call only from assert! public synchronized boolean verifyDocCounts() { int count; + Bits liveDocs = pendingDeletes.getLiveDocs(); if (liveDocs != null) { count = 0; for(int docID=0;docID writeFieldInfosGen(FieldInfos fieldInfos, Directory dir, DocValuesFormat dvFormat, - FieldInfosFormat infosFormat) throws IOException { + private synchronized Set writeFieldInfosGen(FieldInfos fieldInfos, Directory dir, + FieldInfosFormat infosFormat) throws IOException { final long nextFieldInfosGen = info.getNextFieldInfosGen(); final String segmentSuffix = Long.toString(nextFieldInfosGen, Character.MAX_RADIX); // we write approximately that many bytes (based on Lucene46DVF): @@ -639,22 +558,15 @@ class ReadersAndUpdates { return trackingDir.getCreatedFiles(); } - public synchronized boolean writeFieldUpdates(Directory dir, long maxDelGen, InfoStream infoStream) throws IOException { - + public synchronized boolean writeFieldUpdates(Directory dir, FieldInfos.FieldNumbers fieldNumbers, long maxDelGen, InfoStream infoStream) throws IOException { long startTimeNS = System.nanoTime(); - - assert Thread.holdsLock(writer); - final Map> newDVFiles = new HashMap<>(); Set fieldInfosFiles = null; FieldInfos fieldInfos = null; - boolean any = false; - int count = 0; for (List updates : pendingDVUpdates.values()) { // Sort by increasing delGen: - Collections.sort(updates, (a, b) -> Long.compare(a.delGen, b.delGen)); - count += updates.size(); + Collections.sort(updates, Comparator.comparingLong(a -> a.delGen)); for (DocValuesFieldUpdates update : updates) { if (update.delGen <= maxDelGen && update.any()) { any = true; @@ -680,7 +592,7 @@ class ReadersAndUpdates { // IndexWriter.commitMergedDeletes). 
final SegmentReader reader; if (this.reader == null) { - reader = new SegmentReader(info, writer.segmentInfos.getIndexCreatedVersionMajor(), IOContext.READONCE); + reader = new SegmentReader(info, indexCreatedVersionMajor, IOContext.READONCE); } else { reader = this.reader; } @@ -688,7 +600,7 @@ class ReadersAndUpdates { try { // clone FieldInfos so that we can update their dvGen separately from // the reader's infos and write them to a new fieldInfos_gen file - FieldInfos.Builder builder = new FieldInfos.Builder(writer.globalFieldNumberMap); + FieldInfos.Builder builder = new FieldInfos.Builder(fieldNumbers); // cannot use builder.add(reader.getFieldInfos()) because it does not // clone FI.attributes as well FI.dvGen for (FieldInfo fi : reader.getFieldInfos()) { @@ -713,7 +625,7 @@ class ReadersAndUpdates { handleNumericDVUpdates(fieldInfos, trackingDir, docValuesFormat, reader, newDVFiles, maxDelGen, infoStream); handleBinaryDVUpdates(fieldInfos, trackingDir, docValuesFormat, reader, newDVFiles, maxDelGen, infoStream); - fieldInfosFiles = writeFieldInfosGen(fieldInfos, trackingDir, docValuesFormat, codec.fieldInfosFormat()); + fieldInfosFiles = writeFieldInfosGen(fieldInfos, trackingDir, codec.fieldInfosFormat()); } finally { if (reader != this.reader) { reader.close(); @@ -763,11 +675,12 @@ class ReadersAndUpdates { // if there is a reader open, reopen it to reflect the updates if (reader != null) { - SegmentReader newReader = new SegmentReader(info, reader, liveDocs, info.info.maxDoc() - info.getDelCount() - pendingDeleteCount); + SegmentReader newReader = new SegmentReader(info, reader, pendingDeletes.getLiveDocs(), info.info.maxDoc() - info.getDelCount() - pendingDeletes.numPendingDeletes()); boolean success2 = false; try { reader.decRef(); reader = newReader; + pendingDeletes.onNewReader(reader, info); success2 = true; } finally { if (success2 == false) { @@ -792,14 +705,10 @@ class ReadersAndUpdates { } info.setDocValuesUpdatesFiles(newDVFiles); - // wrote new files, should checkpoint() - writer.checkpointNoSIS(); - if (infoStream.isEnabled("BD")) { infoStream.message("BD", String.format(Locale.ROOT, "done write field updates for seg=%s; took %.3fs; new files: %s", info, (System.nanoTime() - startTimeNS)/1000000000.0, newDVFiles)); } - return true; } @@ -829,12 +738,11 @@ class ReadersAndUpdates { } SegmentReader reader = getReader(context); - int delCount = pendingDeleteCount + info.getDelCount(); + int delCount = pendingDeletes.numPendingDeletes() + info.getDelCount(); if (delCount != reader.numDeletedDocs()) { - // beware of zombies: assert delCount > reader.numDeletedDocs(): "delCount=" + delCount + " reader.numDeletedDocs()=" + reader.numDeletedDocs(); - + Bits liveDocs = pendingDeletes.getLiveDocs(); assert liveDocs != null; // Create a new reader with the latest live docs: @@ -842,6 +750,7 @@ class ReadersAndUpdates { boolean success = false; try { reader.decRef(); + pendingDeletes.onNewReader(newReader, info); success = true; } finally { if (success == false) { @@ -851,7 +760,7 @@ class ReadersAndUpdates { reader = newReader; } - liveDocsShared = true; + pendingDeletes.liveDocsShared(); assert verifyDocCounts(); @@ -877,8 +786,12 @@ class ReadersAndUpdates { public String toString() { StringBuilder sb = new StringBuilder(); sb.append("ReadersAndLiveDocs(seg=").append(info); - sb.append(" pendingDeleteCount=").append(pendingDeleteCount); - sb.append(" liveDocsShared=").append(liveDocsShared); + sb.append(" pendingDeletes=").append(pendingDeletes); return sb.toString(); } + + 
public synchronized boolean isFullyDeleted() { + return pendingDeletes.isFullyDeleted(); + } + } diff --git a/lucene/core/src/java/org/apache/lucene/util/Bits.java b/lucene/core/src/java/org/apache/lucene/util/Bits.java index 101122e628f..29935e737b8 100644 --- a/lucene/core/src/java/org/apache/lucene/util/Bits.java +++ b/lucene/core/src/java/org/apache/lucene/util/Bits.java @@ -22,7 +22,7 @@ package org.apache.lucene.util; * @lucene.experimental */ -public interface Bits { +public interface Bits { /** * Returns the value of the bit with the specified index. * @param index index, should be non-negative and < {@link #length()}. diff --git a/lucene/core/src/test/org/apache/lucene/index/TestPendingDeletes.java b/lucene/core/src/test/org/apache/lucene/index/TestPendingDeletes.java new file mode 100644 index 00000000000..39f5680a74f --- /dev/null +++ b/lucene/core/src/test/org/apache/lucene/index/TestPendingDeletes.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.lucene.index; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; + +import org.apache.lucene.codecs.Codec; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.RAMDirectory; +import org.apache.lucene.util.Bits; +import org.apache.lucene.util.LuceneTestCase; +import org.apache.lucene.util.StringHelper; +import org.apache.lucene.util.TestUtil; +import org.apache.lucene.util.Version; + +public class TestPendingDeletes extends LuceneTestCase { + + public void testDeleteDoc() throws IOException { + RAMDirectory dir = new RAMDirectory(); + SegmentInfo si = new SegmentInfo(dir, Version.LATEST, Version.LATEST, "test", 10, false, Codec.getDefault(), + Collections.emptyMap(), StringHelper.randomId(), new HashMap<>(), null); + SegmentCommitInfo commitInfo = new SegmentCommitInfo(si, 0, 0, 0, 0); + PendingDeletes deletes = new PendingDeletes(null, commitInfo); + assertNull(deletes.getLiveDocs()); + int docToDelete = TestUtil.nextInt(random(), 0, 7); + assertTrue(deletes.delete(docToDelete)); + assertNotNull(deletes.getLiveDocs()); + assertEquals(1, deletes.numPendingDeletes()); + + Bits liveDocs = deletes.getLiveDocs(); + assertFalse(liveDocs.get(docToDelete)); + assertFalse(deletes.delete(docToDelete)); // delete again + + // make sure we are live ie. mutable + assertTrue(liveDocs.get(8)); + assertTrue(deletes.delete(8)); + assertFalse(liveDocs.get(8)); + assertEquals(2, deletes.numPendingDeletes()); + + deletes.liveDocsShared(); + + // make sure we are live ie. 
mutable + assertTrue(liveDocs.get(9)); + assertTrue(deletes.delete(9)); + assertTrue(liveDocs.get(9)); + liveDocs = deletes.getLiveDocs(); + assertFalse(liveDocs.get(9)); + assertFalse(liveDocs.get(8)); + assertFalse(liveDocs.get(docToDelete)); + assertEquals(3, deletes.numPendingDeletes()); + dir.close(); + } + + public void testWriteLiveDocs() throws IOException { + RAMDirectory dir = new RAMDirectory(); + SegmentInfo si = new SegmentInfo(dir, Version.LATEST, Version.LATEST, "test", 6, false, Codec.getDefault(), + Collections.emptyMap(), StringHelper.randomId(), new HashMap<>(), null); + SegmentCommitInfo commitInfo = new SegmentCommitInfo(si, 0, 0, 0, 0); + PendingDeletes deletes = new PendingDeletes(null, commitInfo); + assertFalse(deletes.writeLiveDocs(dir)); + assertEquals(0, dir.listAll().length); + boolean secondDocDeletes = random().nextBoolean(); + deletes.delete(5); + if (secondDocDeletes) { + deletes.liveDocsShared(); + deletes.delete(2); + } + assertEquals(0, commitInfo.getDelGen()); + assertEquals(0, commitInfo.getDelCount()); + + assertEquals(secondDocDeletes ? 2 : 1, deletes.numPendingDeletes()); + assertTrue(deletes.writeLiveDocs(dir)); + assertEquals(1, dir.listAll().length); + Bits liveDocs = Codec.getDefault().liveDocsFormat().readLiveDocs(dir, commitInfo, IOContext.DEFAULT); + assertFalse(liveDocs.get(5)); + if (secondDocDeletes) { + assertFalse(liveDocs.get(2)); + } else { + assertTrue(liveDocs.get(2)); + } + assertTrue(liveDocs.get(0)); + assertTrue(liveDocs.get(1)); + assertTrue(liveDocs.get(3)); + assertTrue(liveDocs.get(4)); + + assertEquals(0, deletes.numPendingDeletes()); + assertEquals(secondDocDeletes ? 2 : 1, commitInfo.getDelCount()); + assertEquals(1, commitInfo.getDelGen()); + + deletes.delete(0); + assertTrue(deletes.writeLiveDocs(dir)); + assertEquals(2, dir.listAll().length); + liveDocs = Codec.getDefault().liveDocsFormat().readLiveDocs(dir, commitInfo, IOContext.DEFAULT); + assertFalse(liveDocs.get(5)); + if (secondDocDeletes) { + assertFalse(liveDocs.get(2)); + } else { + assertTrue(liveDocs.get(2)); + } + assertFalse(liveDocs.get(0)); + assertTrue(liveDocs.get(1)); + assertTrue(liveDocs.get(3)); + assertTrue(liveDocs.get(4)); + + assertEquals(0, deletes.numPendingDeletes()); + assertEquals(secondDocDeletes ? 
3 : 2, commitInfo.getDelCount()); + assertEquals(2, commitInfo.getDelGen()); + dir.close(); + } + + public void testIsFullyDeleted() throws IOException { + RAMDirectory dir = new RAMDirectory(); + SegmentInfo si = new SegmentInfo(dir, Version.LATEST, Version.LATEST, "test", 3, false, Codec.getDefault(), + Collections.emptyMap(), StringHelper.randomId(), new HashMap<>(), null); + SegmentCommitInfo commitInfo = new SegmentCommitInfo(si, 0, 0, 0, 0); + PendingDeletes deletes = new PendingDeletes(null, commitInfo); + for (int i = 0; i < 3; i++) { + assertTrue(deletes.delete(i)); + if (random().nextBoolean()) { + assertTrue(deletes.writeLiveDocs(dir)); + } + assertEquals(i == 2, deletes.isFullyDeleted()); + } + } +} From ca02e637ffa117084dc8c59ff32ad487599aae77 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Sat, 31 Mar 2018 14:24:36 +0200 Subject: [PATCH 8/8] LUCENE-8232: Write and Checkpoint DV updates seperately if we drop a reader --- .../src/java/org/apache/lucene/index/IndexWriter.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java index 3791e190625..2e141667a20 100644 --- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java +++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java @@ -604,10 +604,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { if (!poolReaders && rld.refCount() == 1 && readerMap.containsKey(rld.info)) { // This is the last ref to this RLD, and we're not // pooling, so remove it: - boolean changed = rld.writeLiveDocs(directory); - changed |= rld.writeFieldUpdates(directory, globalFieldNumberMap, bufferedUpdatesStream.getCompletedDelGen(), infoStream); - - if (changed) { + if (rld.writeLiveDocs(directory)) { // Make sure we only write del docs for a live segment: assert assertInfoLive == false || assertInfoIsLive(rld.info); // Must checkpoint because we just @@ -619,6 +616,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { // did was move the state to disk: checkpointNoSIS(); } + if (rld.writeFieldUpdates(directory, globalFieldNumberMap, bufferedUpdatesStream.getCompletedDelGen(), infoStream)) { + checkpointNoSIS(); + } if (rld.getNumDVUpdates() == 0) { rld.dropReaders(); readerMap.remove(rld.info);
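Stepping back to the LUCENE-8232 patches above: the point of pulling PendingDeletes out of ReadersAndUpdates is that the pending-delete bookkeeping no longer needs a live IndexWriter, so it can be driven directly, exactly as the new TestPendingDeletes does. The sketch below is not part of any patch in this series; the class name TestPendingDeletesSketch, the test method name, and the segment name "demo" are invented for illustration, and the class would have to live in the org.apache.lucene.index package because PendingDeletes is package-private. Every constructor and method it calls appears verbatim in the diffs above.

    package org.apache.lucene.index;

    import java.io.IOException;
    import java.util.Collections;
    import java.util.HashMap;

    import org.apache.lucene.codecs.Codec;
    import org.apache.lucene.store.RAMDirectory;
    import org.apache.lucene.util.Bits;
    import org.apache.lucene.util.LuceneTestCase;
    import org.apache.lucene.util.StringHelper;
    import org.apache.lucene.util.Version;

    public class TestPendingDeletesSketch extends LuceneTestCase {

      public void testCopyOnWriteAndFullyDeleted() throws IOException {
        RAMDirectory dir = new RAMDirectory();
        // A two-document segment, built the same way as in TestPendingDeletes above.
        SegmentInfo si = new SegmentInfo(dir, Version.LATEST, Version.LATEST, "demo", 2, false, Codec.getDefault(),
            Collections.emptyMap(), StringHelper.randomId(), new HashMap<>(), null);
        SegmentCommitInfo commitInfo = new SegmentCommitInfo(si, 0, 0, 0, 0);
        PendingDeletes deletes = new PendingDeletes(null, commitInfo);

        // The first delete is only buffered in memory; nothing has been written yet.
        assertTrue(deletes.delete(0));
        assertEquals(1, deletes.numPendingDeletes());
        assertFalse(deletes.isFullyDeleted());

        // Pretend an NRT reader now shares the live docs: the next delete must copy-on-write,
        // so the Bits instance handed out earlier never sees the new deletion.
        Bits shared = deletes.getLiveDocs();
        deletes.liveDocsShared();
        assertTrue(deletes.delete(1));
        assertTrue(shared.get(1));                  // the old, shared instance is untouched
        assertFalse(deletes.getLiveDocs().get(1));  // the private clone reflects the delete
        assertTrue(deletes.isFullyDeleted());       // delCount + pending deletes == maxDoc

        // Persisting moves the counts onto the SegmentCommitInfo and resets the pending count.
        assertTrue(deletes.writeLiveDocs(dir));
        assertEquals(0, deletes.numPendingDeletes());
        assertEquals(2, commitInfo.getDelCount());
        dir.close();
      }
    }

The copy-on-write step is the detail worth noticing: once liveDocsShared() has been called, the next delete() clones the live docs, which is what lets an open NRT reader keep a stable view while IndexWriter continues to buffer deletes against the same segment.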