From e7640f99a0a7823d470b6c1098b2be3709fb7540 Mon Sep 17 00:00:00 2001
From: Erick Erickson
Date: Fri, 17 Jul 2015 16:38:13 +0000
Subject: [PATCH] SOLR-6273: Cross Data Center Replication. All tests are now
 passing on my machine, let's see if Jenkins flushes anything out

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1691606 13f79535-47bb-0310-9956-ffa450edef68
---
 .../solr/handler/CdcrReplicatorScheduler.java |   1 +
 .../solr/cloud/BaseCdcrDistributedZkTest.java |  97 ++++++++-------
 .../CdcrReplicationDistributedZkTest.java     | 115 ++++++++++++++++--
 .../cloud/CdcrReplicationHandlerTest.java     |  26 ++--
 .../solr/cloud/CdcrRequestHandlerTest.java    |   4 +-
 .../cloud/CdcrVersionReplicationTest.java     |   3 +-
 6 files changed, 168 insertions(+), 78 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorScheduler.java b/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorScheduler.java
index 9314aed6331..b8091dbf6d7 100644
--- a/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorScheduler.java
+++ b/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorScheduler.java
@@ -82,6 +82,7 @@ class CdcrReplicatorScheduler {
         @Override
         public void run() {
           CdcrReplicatorState state = statesQueue.poll();
+          assert state != null; // Should never happen
           try {
             new CdcrReplicator(state, batchSize).run();
           } finally {
diff --git a/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java b/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java
index 2f26d469b09..832202ae827 100644
--- a/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java
@@ -245,11 +245,43 @@ public class BaseCdcrDistributedZkTest extends AbstractDistribZkTestBase {
     return jetty.client.request(request);
   }
 
+  protected void waitForCdcrStateReplication(String collection) throws Exception {
+    log.info("Wait for CDCR state to replicate - collection: " + collection);
+
+    int cnt = 30;
+    while (cnt > 0) {
+      NamedList status = null;
+      boolean allEquals = true;
+      for (CloudJettyRunner jetty : cloudJettys.get(collection)) { // check all replicas
+        NamedList rsp = invokeCdcrAction(jetty, CdcrParams.CdcrAction.STATUS);
+        if (status == null) {
+          status = (NamedList) rsp.get(CdcrParams.CdcrAction.STATUS.toLower());
+          continue;
+        }
+        allEquals &= status.equals(rsp.get(CdcrParams.CdcrAction.STATUS.toLower()));
+      }
+
+      if (allEquals) {
+        break;
+      }
+      else {
+        if (cnt == 0) {
+          throw new RuntimeException("Timeout waiting for CDCR state to replicate: collection="+collection);
+        }
+        cnt--;
+        Thread.sleep(500);
+      }
+    }
+
+    log.info("CDCR state is identical across nodes - collection: " + collection);
+  }
+
   /**
    * Assert the state of CDCR on each nodes of the given collection.
    */
   protected void assertState(String collection, CdcrParams.ProcessState processState, CdcrParams.BufferState bufferState)
-      throws Exception {
+      throws Exception {
+    this.waitForCdcrStateReplication(collection); // ensure that cdcr state is replicated and stable
     for (CloudJettyRunner jetty : cloudJettys.get(collection)) { // check all replicas
       NamedList rsp = invokeCdcrAction(jetty, CdcrParams.CdcrAction.STATUS);
       NamedList status = (NamedList) rsp.get(CdcrParams.CdcrAction.STATUS.toLower());
@@ -282,6 +314,7 @@ public class BaseCdcrDistributedZkTest extends AbstractDistribZkTestBase {
    */
   protected void clearSourceCollection() throws Exception {
     this.deleteCollection(SOURCE_COLLECTION);
+    this.waitForCollectionToDisappear(SOURCE_COLLECTION);
     this.createCollection(SOURCE_COLLECTION);
     this.waitForRecoveriesToFinish(SOURCE_COLLECTION, true);
     this.updateMappingsFromZk(SOURCE_COLLECTION);
@@ -305,6 +338,7 @@ public class BaseCdcrDistributedZkTest extends AbstractDistribZkTestBase {
    */
   protected void clearTargetCollection() throws Exception {
     this.deleteCollection(TARGET_COLLECTION);
+    this.waitForCollectionToDisappear(TARGET_COLLECTION);
     this.createCollection(TARGET_COLLECTION);
     this.waitForRecoveriesToFinish(TARGET_COLLECTION, true);
     this.updateMappingsFromZk(TARGET_COLLECTION);
@@ -389,7 +423,7 @@ public class BaseCdcrDistributedZkTest extends AbstractDistribZkTestBase {
   /**
    * Delete a collection through the Collection API.
    */
-  protected CollectionAdminResponse deleteCollection(String collectionName) throws SolrServerException, IOException {
+  protected CollectionAdminResponse deleteCollection(String collectionName) throws Exception {
     SolrClient client = createCloudClient(null);
 
     CollectionAdminResponse res;
@@ -412,6 +446,17 @@ public class BaseCdcrDistributedZkTest extends AbstractDistribZkTestBase {
     return res;
   }
 
+  private void waitForCollectionToDisappear(String collection) throws Exception {
+    CloudSolrClient client = this.createCloudClient(null);
+    try {
+      client.connect();
+      ZkStateReader zkStateReader = client.getZkStateReader();
+      AbstractDistribZkTestBase.waitForCollectionToDisappear(collection, zkStateReader, false, true, 15);
+    } finally {
+      client.close();
+    }
+  }
+
   private void waitForRecoveriesToFinish(String collection, boolean verbose) throws Exception {
     CloudSolrClient client = this.createCloudClient(null);
     try {
@@ -673,15 +718,18 @@ public class BaseCdcrDistributedZkTest extends AbstractDistribZkTestBase {
   }
 
   protected void waitForReplicationToComplete(String collectionName, String shardId) throws Exception {
-    while (true) {
+    int cnt = 15;
+    while (cnt > 0) {
       log.info("Checking queue size @ {}:{}", collectionName, shardId);
       long size = this.getQueueSize(collectionName, shardId);
-      if (size <= 0) {
+      if (size == 0) { // if we received -1, it means that the log reader is not yet initialised, we should wait
        return;
       }
       log.info("Waiting for replication to complete. Queue size: {} @ {}:{}", size, collectionName, shardId);
+      cnt--;
       Thread.sleep(1000); // wait a bit for the replication to complete
     }
+    throw new RuntimeException("Timeout waiting for CDCR replication to complete @" + collectionName + ":" + shardId);
   }
 
   protected long getQueueSize(String collectionName, String shardId) throws Exception {
@@ -691,47 +739,6 @@ public class BaseCdcrDistributedZkTest extends AbstractDistribZkTestBase {
     return (Long) status.get(CdcrParams.QUEUE_SIZE);
   }
 
-  /**
-   * Asserts that the number of transaction logs across all the shards
-   */
-  protected void assertUpdateLogs(String collection, int maxNumberOfTLogs) throws Exception {
-    CollectionInfo info = collectInfo(collection);
-    Map> shardToCoresMap = info.getShardToCoresMap();
-
-    int leaderLogs = 0;
-    ArrayList replicasLogs = new ArrayList<>(Collections.nCopies(replicationFactor - 1, 0));
-
-    for (String shard : shardToCoresMap.keySet()) {
-      leaderLogs += numberOfFiles(info.getLeader(shard).ulogDir);
-      for (int i = 0; i < replicationFactor - 1; i++) {
-        replicasLogs.set(i, replicasLogs.get(i) + numberOfFiles(info.getReplicas(shard).get(i).ulogDir));
-      }
-    }
-
-    for (Integer replicaLogs : replicasLogs) {
-      log.info("Number of logs in update log on leader {} and on replica {}", leaderLogs, replicaLogs);
-
-      // replica logs must be always equal or superior to leader logs
-      assertTrue(String.format(Locale.ENGLISH, "Number of tlogs on replica: %d is different than on leader: %d.",
-          replicaLogs, leaderLogs), leaderLogs <= replicaLogs);
-
-      assertTrue(String.format(Locale.ENGLISH, "Number of tlogs on leader: %d is superior to: %d.",
-          leaderLogs, maxNumberOfTLogs), maxNumberOfTLogs >= leaderLogs);
-
-      assertTrue(String.format(Locale.ENGLISH, "Number of tlogs on replica: %d is superior to: %d.",
-          replicaLogs, maxNumberOfTLogs), maxNumberOfTLogs >= replicaLogs);
-    }
-  }
-
-  private int numberOfFiles(String dir) {
-    File file = new File(dir);
-    if (!file.isDirectory()) {
-      assertTrue("Path to tlog " + dir + " does not exists or it's not a directory.", false);
-    }
-    log.info("Update log dir {} contains: {}", dir, file.listFiles());
-    return file.listFiles().length;
-  }
-
   protected CollectionInfo collectInfo(String collection) throws Exception {
     CollectionInfo info = new CollectionInfo(collection);
     for (String shard : shardToJetty.get(collection).keySet()) {
diff --git a/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationDistributedZkTest.java b/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationDistributedZkTest.java
index f9b198fbf74..51c2d22698d 100644
--- a/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationDistributedZkTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationDistributedZkTest.java
@@ -21,13 +21,15 @@ import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.handler.CdcrParams;
-import org.junit.Ignore;
 import org.junit.Test;
 
+import java.io.File;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Locale;
+import java.util.Map;
 
-@Ignore
 @Slow
 public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest {
 
@@ -39,7 +41,7 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
 
   @Test
   @ShardsFixed(num = 4)
-  public void doTest() throws Exception {
+  public void doTests() throws Exception {
     this.doTestDeleteCreateSourceCollection();
     this.doTestTargetCollectionNotAvailable();
     this.doTestReplicationStartStop();
@@ -52,12 +54,13 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
     this.doTestBatchBoundaries();
     this.doTestResilienceWithDeleteByQueryOnTarget();
   }
-
   /**
    * Checks that the test framework handles properly the creation and deletion of collections and the
    * restart of servers.
    */
   public void doTestDeleteCreateSourceCollection() throws Exception {
+    this.clearSourceCollection();
+    this.clearTargetCollection();
     log.info("Indexing documents");
 
     List docs = new ArrayList<>();
@@ -153,8 +156,10 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
     assertEquals(0, getNumDocs(TARGET_COLLECTION));
 
     this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+    this.waitForCdcrStateReplication(SOURCE_COLLECTION);
 
     this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
+    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
 
     commit(TARGET_COLLECTION);
@@ -162,6 +167,7 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
     assertEquals(10, getNumDocs(TARGET_COLLECTION));
 
     this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.STOP);
+    this.waitForCdcrStateReplication(SOURCE_COLLECTION);
 
     docs.clear();
     for (; start < 110; start++) {
@@ -176,8 +182,10 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
 
     // with the latest checkpoints
     this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+    this.waitForCdcrStateReplication(SOURCE_COLLECTION);
 
     this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
+    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
 
     commit(TARGET_COLLECTION);
@@ -196,6 +204,7 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
 
     // send start action to first shard
     this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+    this.waitForCdcrStateReplication(SOURCE_COLLECTION);
 
     log.info("Indexing 10 documents");
 
@@ -260,6 +269,7 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
 
     // send start action to first shard
     this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+    this.waitForCdcrStateReplication(SOURCE_COLLECTION);
 
     log.info("Indexing 10 documents");
 
@@ -337,11 +347,14 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
 
     // buffering is enabled by default, so disable it
     this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.DISABLEBUFFER);
+    this.waitForCdcrStateReplication(SOURCE_COLLECTION);
 
     this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+    this.waitForCdcrStateReplication(SOURCE_COLLECTION);
 
     for (int i = 0; i < 50; i++) {
-      index(SOURCE_COLLECTION, getDoc(id, Integer.toString(i))); // will perform a commit for every document
+      // will perform a commit for every document and will create one tlog file per commit
+      index(SOURCE_COLLECTION, getDoc(id, Integer.toString(i)));
     }
 
     // wait a bit for the replication to complete
@@ -352,26 +365,23 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
 
     // Stop CDCR
     this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.STOP);
+    this.waitForCdcrStateReplication(SOURCE_COLLECTION);
 
     assertEquals(50, getNumDocs(SOURCE_COLLECTION));
     assertEquals(50, getNumDocs(TARGET_COLLECTION));
 
-    index(SOURCE_COLLECTION, getDoc(id, Integer.toString(0))); // trigger update log cleaning on the non-leader nodes
-
     // some of the tlogs should be trimmed, we must have less than 50 tlog files on both leader and non-leader
-    assertUpdateLogs(SOURCE_COLLECTION, 50);
+    assertNumberOfTlogFiles(SOURCE_COLLECTION, 50);
 
     for (int i = 50; i < 100; i++) {
       index(SOURCE_COLLECTION, getDoc(id, Integer.toString(i)));
     }
 
-    index(SOURCE_COLLECTION, getDoc(id, Integer.toString(0))); // trigger update log cleaning on the non-leader nodes
-
     // at this stage, we should have created one tlog file per document, and some of them must have been cleaned on the
     // leader since we are not buffering and replication is stopped, (we should have exactly 10 tlog files on the leader
     // and 11 on the non-leader)
     // the non-leader must have synchronised its update log with its leader
-    assertUpdateLogs(SOURCE_COLLECTION, 50);
+    assertNumberOfTlogFiles(SOURCE_COLLECTION, 50);
   }
 
   /**
@@ -383,9 +393,11 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
 
     // buffering is enabled by default, so disable it
     this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.DISABLEBUFFER);
+    this.waitForCdcrStateReplication(SOURCE_COLLECTION);
 
     // Start CDCR
     this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+    this.waitForCdcrStateReplication(SOURCE_COLLECTION);
 
     // Index documents
     for (int i = 0; i < 200; i++) {
@@ -425,6 +437,7 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
 
     // Start CDCR
     this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+    this.waitForCdcrStateReplication(SOURCE_COLLECTION);
 
     // wait a bit for the replication to complete
     this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
@@ -485,6 +498,7 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
 
     // Start CDCR
     this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+    this.waitForCdcrStateReplication(SOURCE_COLLECTION);
 
     // wait a bit for the replication to complete
     this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
@@ -501,7 +515,8 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
    * Checks that batches are correctly constructed when batch boundaries are reached.
    */
   public void doTestBatchBoundaries() throws Exception {
-    invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+    this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+    this.waitForCdcrStateReplication(SOURCE_COLLECTION);
 
     log.info("Indexing documents");
 
@@ -514,6 +529,7 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
     assertEquals(128, getNumDocs(SOURCE_COLLECTION));
 
     this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
+    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
 
     commit(TARGET_COLLECTION);
@@ -538,6 +554,7 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
 
     // Start CDCR
     this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+    this.waitForCdcrStateReplication(SOURCE_COLLECTION);
 
     // wait a bit for the replication to complete
     this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
@@ -577,8 +594,9 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
 
     // Restart CDCR
     this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.STOP);
-    Thread.sleep(500); // wait a bit for the state to synch
+    this.waitForCdcrStateReplication(SOURCE_COLLECTION);
     this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+    this.waitForCdcrStateReplication(SOURCE_COLLECTION);
 
     docs.clear();
     for (; start < 150; start++) {
@@ -596,5 +614,76 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
     assertEquals(50, getNumDocs(TARGET_COLLECTION));
   }
 
+  /**
+   * Asserts the number of transaction logs across all the shards. Since the cleaning of the update logs
+   * is not immediate on the slave nodes (it relies on the update log synchronizer that is executed every second),
+   * it will retry until the assert is successful or until the timeout.
+   */
+  protected void assertNumberOfTlogFiles(String collection, int maxNumberOfTLogs) throws Exception {
+    int cnt = 15; // timeout after 15 seconds
+    AssertionError lastAssertionError = null;
+
+    while (cnt > 0) {
+      try {
+        // Fire a DeleteById query with a commit to trigger update log cleaning on the non-leader nodes
+        List ids = new ArrayList<>();
+        ids.add("_NON_EXISTING_ID_");
+        deleteById(collection, ids);
+
+        // Check the update logs
+        this._assertNumberOfTlogFiles(collection, maxNumberOfTLogs);
+        return;
+      }
+      catch (AssertionError e) {
+        lastAssertionError = e;
+        cnt--;
+        Thread.sleep(1000);
+      }
+    }
+
+    throw new AssertionError("Timeout while trying to assert update logs @ collection="+collection, lastAssertionError);
+  }
+
+  /**
+   * Asserts the number of transaction logs across all the shards
+   */
+  private void _assertNumberOfTlogFiles(String collection, int maxNumberOfTLogs) throws Exception {
+    CollectionInfo info = collectInfo(collection);
+    Map> shardToCoresMap = info.getShardToCoresMap();
+
+    int leaderLogs = 0;
+    ArrayList replicasLogs = new ArrayList<>(Collections.nCopies(replicationFactor - 1, 0));
+
+    for (String shard : shardToCoresMap.keySet()) {
+      leaderLogs += numberOfFiles(info.getLeader(shard).ulogDir);
+      for (int i = 0; i < replicationFactor - 1; i++) {
+        replicasLogs.set(i, replicasLogs.get(i) + numberOfFiles(info.getReplicas(shard).get(i).ulogDir));
+      }
+    }
+
+    for (Integer replicaLogs : replicasLogs) {
+      log.info("Number of logs in update log on leader {} and on replica {}", leaderLogs, replicaLogs);
+
+      // replica logs must be always equal or superior to leader logs
+      assertTrue(String.format(Locale.ENGLISH, "Number of tlogs on replica: %d is different than on leader: %d.",
+          replicaLogs, leaderLogs), leaderLogs <= replicaLogs);
+
+      assertTrue(String.format(Locale.ENGLISH, "Number of tlogs on leader: %d is superior to: %d.",
+          leaderLogs, maxNumberOfTLogs), maxNumberOfTLogs >= leaderLogs);
+
+      assertTrue(String.format(Locale.ENGLISH, "Number of tlogs on replica: %d is superior to: %d.",
+          replicaLogs, maxNumberOfTLogs), maxNumberOfTLogs >= replicaLogs);
+    }
+  }
+
+  private int numberOfFiles(String dir) {
+    File file = new File(dir);
+    if (!file.isDirectory()) {
+      assertTrue("Path to tlog " + dir + " does not exist or is not a directory.", false);
+    }
+    log.debug("Update log dir {} contains: {}", dir, file.listFiles());
+    return file.listFiles().length;
+  }
+
 }
diff --git a/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationHandlerTest.java b/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationHandlerTest.java
index c6c8631aaf9..ec59ce7af68 100644
--- a/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationHandlerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationHandlerTest.java
@@ -23,7 +23,6 @@ package org.apache.solr.cloud;
 import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.common.SolrInputDocument;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import java.io.File;
@@ -32,7 +31,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-@Ignore
 @Slow
 public class CdcrReplicationHandlerTest extends BaseCdcrDistributedZkTest {
 
@@ -47,7 +45,7 @@ public class CdcrReplicationHandlerTest extends BaseCdcrDistributedZkTest {
   }
 
   @Test
-  @ShardsFixed(num = 4)
+  @ShardsFixed(num = 2)
   public void doTest() throws Exception {
     this.doTestFullReplication();
     this.doTestPartialReplication();
@@ -60,6 +58,7 @@ public class CdcrReplicationHandlerTest extends BaseCdcrDistributedZkTest {
    * strategy should fetch all the missing tlog files from the leader.
    */
   public void doTestFullReplication() throws Exception {
+    this.clearSourceCollection();
     List slaves = this.getShardToSlaveJetty(SOURCE_COLLECTION, SHARD1);
 
     ChaosMonkey.stop(slaves.get(0).jetty);
@@ -76,7 +75,7 @@ public class CdcrReplicationHandlerTest extends BaseCdcrDistributedZkTest {
     // Restart the slave node to trigger Replication strategy
     this.restartServer(slaves.get(0));
 
-    this.assertUpdateLogs(SOURCE_COLLECTION, 10);
+    this.assertUpdateLogsEquals(SOURCE_COLLECTION, 10);
   }
 
   /**
@@ -85,7 +84,6 @@ public class CdcrReplicationHandlerTest extends BaseCdcrDistributedZkTest {
    */
   public void doTestPartialReplication() throws Exception {
     this.clearSourceCollection();
-
     for (int i = 0; i < 5; i++) {
       List docs = new ArrayList<>();
       for (int j = i * 20; j < (i * 20) + 20; j++) {
@@ -111,7 +109,7 @@ public class CdcrReplicationHandlerTest extends BaseCdcrDistributedZkTest {
     this.restartServer(slaves.get(0));
 
     // at this stage, the slave should have replicated the 5 missing tlog files
-    this.assertUpdateLogs(SOURCE_COLLECTION, 10);
+    this.assertUpdateLogsEquals(SOURCE_COLLECTION, 10);
   }
 
   /**
@@ -121,7 +119,6 @@ public class CdcrReplicationHandlerTest extends BaseCdcrDistributedZkTest {
    */
   public void doTestPartialReplicationWithTruncatedTlog() throws Exception {
     this.clearSourceCollection();
-
     CloudSolrClient client = createCloudClient(SOURCE_COLLECTION);
 
     List slaves = this.getShardToSlaveJetty(SOURCE_COLLECTION, SHARD1);
@@ -148,7 +145,7 @@ public class CdcrReplicationHandlerTest extends BaseCdcrDistributedZkTest {
     this.restartServer(slaves.get(0));
 
     // at this stage, the slave should have replicated the 5 missing tlog files
-    this.assertUpdateLogs(SOURCE_COLLECTION, 10);
+    this.assertUpdateLogsEquals(SOURCE_COLLECTION, 10);
   }
 
   /**
@@ -159,7 +156,6 @@ public class CdcrReplicationHandlerTest extends BaseCdcrDistributedZkTest {
    */
   public void doTestPartialReplicationAfterPeerSync() throws Exception {
     this.clearSourceCollection();
-
     for (int i = 0; i < 5; i++) {
       List docs = new ArrayList<>();
       for (int j = i * 10; j < (i * 10) + 10; j++) {
@@ -199,7 +195,7 @@ public class CdcrReplicationHandlerTest extends BaseCdcrDistributedZkTest {
     this.restartServer(slaves.get(0));
 
     // at this stage, the slave should have replicated the 5 missing tlog files
-    this.assertUpdateLogs(SOURCE_COLLECTION, 15);
+    this.assertUpdateLogsEquals(SOURCE_COLLECTION, 15);
   }
 
   private List getShardToSlaveJetty(String collection, String shard) {
@@ -210,10 +206,10 @@ public class CdcrReplicationHandlerTest extends BaseCdcrDistributedZkTest {
   }
 
   /**
-   * Asserts that the transaction logs between the leader and slave
+   * Asserts that the update logs are in sync between the leader and slave. The leader and the slaves
+   * must have identical tlog files.
    */
-  @Override
-  protected void assertUpdateLogs(String collection, int maxNumberOfTLogs) throws Exception {
+  protected void assertUpdateLogsEquals(String collection, int numberOfTLogs) throws Exception {
     CollectionInfo info = collectInfo(collection);
     Map> shardToCoresMap = info.getShardToCoresMap();
 
@@ -221,8 +217,8 @@ public class CdcrReplicationHandlerTest extends BaseCdcrDistributedZkTest {
       Map leaderFilesMeta = this.getFilesMeta(info.getLeader(shard).ulogDir);
       Map slaveFilesMeta = this.getFilesMeta(info.getReplicas(shard).get(0).ulogDir);
 
-      assertEquals("Incorrect number of tlog files on the leader", maxNumberOfTLogs, leaderFilesMeta.size());
-      assertEquals("Incorrect number of tlog files on the slave", maxNumberOfTLogs, slaveFilesMeta.size());
+      assertEquals("Incorrect number of tlog files on the leader", numberOfTLogs, leaderFilesMeta.size());
+      assertEquals("Incorrect number of tlog files on the slave", numberOfTLogs, slaveFilesMeta.size());
 
       for (Long leaderFileVersion : leaderFilesMeta.keySet()) {
         assertTrue("Slave is missing a tlog for version " + leaderFileVersion, slaveFilesMeta.containsKey(leaderFileVersion));
diff --git a/solr/core/src/test/org/apache/solr/cloud/CdcrRequestHandlerTest.java b/solr/core/src/test/org/apache/solr/cloud/CdcrRequestHandlerTest.java
index f721512bde5..31338d8690b 100644
--- a/solr/core/src/test/org/apache/solr/cloud/CdcrRequestHandlerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/CdcrRequestHandlerTest.java
@@ -20,10 +20,8 @@ package org.apache.solr.cloud;
 import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.handler.CdcrParams;
-import org.junit.Ignore;
 import org.junit.Test;
 
-@Ignore
 @Slow
 public class CdcrRequestHandlerTest extends BaseCdcrDistributedZkTest {
 
@@ -35,7 +33,7 @@ public class CdcrRequestHandlerTest extends BaseCdcrDistributedZkTest {
   }
 
   @Test
-  @ShardsFixed(num = 4)
+  @ShardsFixed(num = 2)
   public void doTest() throws Exception {
     this.doTestLifeCycleActions();
     this.doTestCheckpointActions();
diff --git a/solr/core/src/test/org/apache/solr/cloud/CdcrVersionReplicationTest.java b/solr/core/src/test/org/apache/solr/cloud/CdcrVersionReplicationTest.java
index f3b1e40350e..0d23c961d5d 100644
--- a/solr/core/src/test/org/apache/solr/cloud/CdcrVersionReplicationTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/CdcrVersionReplicationTest.java
@@ -61,8 +61,7 @@ public class CdcrVersionReplicationTest extends BaseCdcrDistributedZkTest {
 
   @Test
   @ShardsFixed(num = 4)
-
-  public void doTest() throws Exception {
+  public void testCdcrDocVersions() throws Exception {
     SolrClient client = createClientRandomly();
     try {
       handle.clear();