mirror of https://github.com/apache/lucene.git
SOLR-6273: Cross Data Center Replication. All tests are now passing on my machine, let's see if Jenkins flushes anything out
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1691606 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
e92618bbda
commit
e7640f99a0
|
@ -82,6 +82,7 @@ class CdcrReplicatorScheduler {
|
|||
@Override
|
||||
public void run() {
|
||||
CdcrReplicatorState state = statesQueue.poll();
|
||||
assert state != null; // Should never happen
|
||||
try {
|
||||
new CdcrReplicator(state, batchSize).run();
|
||||
} finally {
|
||||
|
|
|
@ -245,11 +245,43 @@ public class BaseCdcrDistributedZkTest extends AbstractDistribZkTestBase {
|
|||
return jetty.client.request(request);
|
||||
}
|
||||
|
||||
protected void waitForCdcrStateReplication(String collection) throws Exception {
|
||||
log.info("Wait for CDCR state to replicate - collection: " + collection);
|
||||
|
||||
int cnt = 30;
|
||||
while (cnt > 0) {
|
||||
NamedList status = null;
|
||||
boolean allEquals = true;
|
||||
for (CloudJettyRunner jetty : cloudJettys.get(collection)) { // check all replicas
|
||||
NamedList rsp = invokeCdcrAction(jetty, CdcrParams.CdcrAction.STATUS);
|
||||
if (status == null) {
|
||||
status = (NamedList) rsp.get(CdcrParams.CdcrAction.STATUS.toLower());
|
||||
continue;
|
||||
}
|
||||
allEquals &= status.equals(rsp.get(CdcrParams.CdcrAction.STATUS.toLower()));
|
||||
}
|
||||
|
||||
if (allEquals) {
|
||||
break;
|
||||
}
|
||||
else {
|
||||
if (cnt == 0) {
|
||||
throw new RuntimeException("Timeout waiting for CDCR state to replicate: collection="+collection);
|
||||
}
|
||||
cnt--;
|
||||
Thread.sleep(500);
|
||||
}
|
||||
}
|
||||
|
||||
log.info("CDCR state is identical across nodes - collection: " + collection);
|
||||
}
|
||||
|
||||
/**
|
||||
* Assert the state of CDCR on each nodes of the given collection.
|
||||
*/
|
||||
protected void assertState(String collection, CdcrParams.ProcessState processState, CdcrParams.BufferState bufferState)
|
||||
throws Exception {
|
||||
throws Exception {
|
||||
this.waitForCdcrStateReplication(collection); // ensure that cdcr state is replicated and stable
|
||||
for (CloudJettyRunner jetty : cloudJettys.get(collection)) { // check all replicas
|
||||
NamedList rsp = invokeCdcrAction(jetty, CdcrParams.CdcrAction.STATUS);
|
||||
NamedList status = (NamedList) rsp.get(CdcrParams.CdcrAction.STATUS.toLower());
|
||||
|
@ -282,6 +314,7 @@ public class BaseCdcrDistributedZkTest extends AbstractDistribZkTestBase {
|
|||
*/
|
||||
protected void clearSourceCollection() throws Exception {
|
||||
this.deleteCollection(SOURCE_COLLECTION);
|
||||
this.waitForCollectionToDisappear(SOURCE_COLLECTION);
|
||||
this.createCollection(SOURCE_COLLECTION);
|
||||
this.waitForRecoveriesToFinish(SOURCE_COLLECTION, true);
|
||||
this.updateMappingsFromZk(SOURCE_COLLECTION);
|
||||
|
@ -305,6 +338,7 @@ public class BaseCdcrDistributedZkTest extends AbstractDistribZkTestBase {
|
|||
*/
|
||||
protected void clearTargetCollection() throws Exception {
|
||||
this.deleteCollection(TARGET_COLLECTION);
|
||||
this.waitForCollectionToDisappear(TARGET_COLLECTION);
|
||||
this.createCollection(TARGET_COLLECTION);
|
||||
this.waitForRecoveriesToFinish(TARGET_COLLECTION, true);
|
||||
this.updateMappingsFromZk(TARGET_COLLECTION);
|
||||
|
@ -389,7 +423,7 @@ public class BaseCdcrDistributedZkTest extends AbstractDistribZkTestBase {
|
|||
/**
|
||||
* Delete a collection through the Collection API.
|
||||
*/
|
||||
protected CollectionAdminResponse deleteCollection(String collectionName) throws SolrServerException, IOException {
|
||||
protected CollectionAdminResponse deleteCollection(String collectionName) throws Exception {
|
||||
SolrClient client = createCloudClient(null);
|
||||
CollectionAdminResponse res;
|
||||
|
||||
|
@ -412,6 +446,17 @@ public class BaseCdcrDistributedZkTest extends AbstractDistribZkTestBase {
|
|||
return res;
|
||||
}
|
||||
|
||||
private void waitForCollectionToDisappear(String collection) throws Exception {
|
||||
CloudSolrClient client = this.createCloudClient(null);
|
||||
try {
|
||||
client.connect();
|
||||
ZkStateReader zkStateReader = client.getZkStateReader();
|
||||
AbstractDistribZkTestBase.waitForCollectionToDisappear(collection, zkStateReader, false, true, 15);
|
||||
} finally {
|
||||
client.close();
|
||||
}
|
||||
}
|
||||
|
||||
private void waitForRecoveriesToFinish(String collection, boolean verbose) throws Exception {
|
||||
CloudSolrClient client = this.createCloudClient(null);
|
||||
try {
|
||||
|
@ -673,15 +718,18 @@ public class BaseCdcrDistributedZkTest extends AbstractDistribZkTestBase {
|
|||
}
|
||||
|
||||
protected void waitForReplicationToComplete(String collectionName, String shardId) throws Exception {
|
||||
while (true) {
|
||||
int cnt = 15;
|
||||
while (cnt > 0) {
|
||||
log.info("Checking queue size @ {}:{}", collectionName, shardId);
|
||||
long size = this.getQueueSize(collectionName, shardId);
|
||||
if (size <= 0) {
|
||||
if (size == 0) { // if we received -1, it means that the log reader is not yet initialised, we should wait
|
||||
return;
|
||||
}
|
||||
log.info("Waiting for replication to complete. Queue size: {} @ {}:{}", size, collectionName, shardId);
|
||||
cnt--;
|
||||
Thread.sleep(1000); // wait a bit for the replication to complete
|
||||
}
|
||||
throw new RuntimeException("Timeout waiting for CDCR replication to complete @" + collectionName + ":" + shardId);
|
||||
}
|
||||
|
||||
protected long getQueueSize(String collectionName, String shardId) throws Exception {
|
||||
|
@ -691,47 +739,6 @@ public class BaseCdcrDistributedZkTest extends AbstractDistribZkTestBase {
|
|||
return (Long) status.get(CdcrParams.QUEUE_SIZE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Asserts that the number of transaction logs across all the shards
|
||||
*/
|
||||
protected void assertUpdateLogs(String collection, int maxNumberOfTLogs) throws Exception {
|
||||
CollectionInfo info = collectInfo(collection);
|
||||
Map<String, List<CollectionInfo.CoreInfo>> shardToCoresMap = info.getShardToCoresMap();
|
||||
|
||||
int leaderLogs = 0;
|
||||
ArrayList<Integer> replicasLogs = new ArrayList<>(Collections.nCopies(replicationFactor - 1, 0));
|
||||
|
||||
for (String shard : shardToCoresMap.keySet()) {
|
||||
leaderLogs += numberOfFiles(info.getLeader(shard).ulogDir);
|
||||
for (int i = 0; i < replicationFactor - 1; i++) {
|
||||
replicasLogs.set(i, replicasLogs.get(i) + numberOfFiles(info.getReplicas(shard).get(i).ulogDir));
|
||||
}
|
||||
}
|
||||
|
||||
for (Integer replicaLogs : replicasLogs) {
|
||||
log.info("Number of logs in update log on leader {} and on replica {}", leaderLogs, replicaLogs);
|
||||
|
||||
// replica logs must be always equal or superior to leader logs
|
||||
assertTrue(String.format(Locale.ENGLISH, "Number of tlogs on replica: %d is different than on leader: %d.",
|
||||
replicaLogs, leaderLogs), leaderLogs <= replicaLogs);
|
||||
|
||||
assertTrue(String.format(Locale.ENGLISH, "Number of tlogs on leader: %d is superior to: %d.",
|
||||
leaderLogs, maxNumberOfTLogs), maxNumberOfTLogs >= leaderLogs);
|
||||
|
||||
assertTrue(String.format(Locale.ENGLISH, "Number of tlogs on replica: %d is superior to: %d.",
|
||||
replicaLogs, maxNumberOfTLogs), maxNumberOfTLogs >= replicaLogs);
|
||||
}
|
||||
}
|
||||
|
||||
private int numberOfFiles(String dir) {
|
||||
File file = new File(dir);
|
||||
if (!file.isDirectory()) {
|
||||
assertTrue("Path to tlog " + dir + " does not exists or it's not a directory.", false);
|
||||
}
|
||||
log.info("Update log dir {} contains: {}", dir, file.listFiles());
|
||||
return file.listFiles().length;
|
||||
}
|
||||
|
||||
protected CollectionInfo collectInfo(String collection) throws Exception {
|
||||
CollectionInfo info = new CollectionInfo(collection);
|
||||
for (String shard : shardToJetty.get(collection).keySet()) {
|
||||
|
|
|
@ -21,13 +21,15 @@ import org.apache.lucene.util.LuceneTestCase.Slow;
|
|||
import org.apache.solr.common.SolrInputDocument;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.handler.CdcrParams;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
|
||||
@Ignore
|
||||
@Slow
|
||||
public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest {
|
||||
|
||||
|
@ -39,7 +41,7 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
|
|||
|
||||
@Test
|
||||
@ShardsFixed(num = 4)
|
||||
public void doTest() throws Exception {
|
||||
public void doTests() throws Exception {
|
||||
this.doTestDeleteCreateSourceCollection();
|
||||
this.doTestTargetCollectionNotAvailable();
|
||||
this.doTestReplicationStartStop();
|
||||
|
@ -52,12 +54,13 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
|
|||
this.doTestBatchBoundaries();
|
||||
this.doTestResilienceWithDeleteByQueryOnTarget();
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks that the test framework handles properly the creation and deletion of collections and the
|
||||
* restart of servers.
|
||||
*/
|
||||
public void doTestDeleteCreateSourceCollection() throws Exception {
|
||||
this.clearSourceCollection();
|
||||
this.clearTargetCollection();
|
||||
log.info("Indexing documents");
|
||||
|
||||
List<SolrInputDocument> docs = new ArrayList<>();
|
||||
|
@ -153,8 +156,10 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
|
|||
assertEquals(0, getNumDocs(TARGET_COLLECTION));
|
||||
|
||||
this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
|
||||
this.waitForCdcrStateReplication(SOURCE_COLLECTION);
|
||||
|
||||
this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
|
||||
this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
|
||||
|
||||
commit(TARGET_COLLECTION);
|
||||
|
||||
|
@ -162,6 +167,7 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
|
|||
assertEquals(10, getNumDocs(TARGET_COLLECTION));
|
||||
|
||||
this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.STOP);
|
||||
this.waitForCdcrStateReplication(SOURCE_COLLECTION);
|
||||
|
||||
docs.clear();
|
||||
for (; start < 110; start++) {
|
||||
|
@ -176,8 +182,10 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
|
|||
// with the latest checkpoints
|
||||
|
||||
this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
|
||||
this.waitForCdcrStateReplication(SOURCE_COLLECTION);
|
||||
|
||||
this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
|
||||
this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
|
||||
|
||||
commit(TARGET_COLLECTION);
|
||||
|
||||
|
@ -196,6 +204,7 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
|
|||
|
||||
// send start action to first shard
|
||||
this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
|
||||
this.waitForCdcrStateReplication(SOURCE_COLLECTION);
|
||||
|
||||
log.info("Indexing 10 documents");
|
||||
|
||||
|
@ -260,6 +269,7 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
|
|||
|
||||
// send start action to first shard
|
||||
this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
|
||||
this.waitForCdcrStateReplication(SOURCE_COLLECTION);
|
||||
|
||||
log.info("Indexing 10 documents");
|
||||
|
||||
|
@ -337,11 +347,14 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
|
|||
|
||||
// buffering is enabled by default, so disable it
|
||||
this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.DISABLEBUFFER);
|
||||
this.waitForCdcrStateReplication(SOURCE_COLLECTION);
|
||||
|
||||
this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
|
||||
this.waitForCdcrStateReplication(SOURCE_COLLECTION);
|
||||
|
||||
for (int i = 0; i < 50; i++) {
|
||||
index(SOURCE_COLLECTION, getDoc(id, Integer.toString(i))); // will perform a commit for every document
|
||||
// will perform a commit for every document and will create one tlog file per commit
|
||||
index(SOURCE_COLLECTION, getDoc(id, Integer.toString(i)));
|
||||
}
|
||||
|
||||
// wait a bit for the replication to complete
|
||||
|
@ -352,26 +365,23 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
|
|||
|
||||
// Stop CDCR
|
||||
this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.STOP);
|
||||
this.waitForCdcrStateReplication(SOURCE_COLLECTION);
|
||||
|
||||
assertEquals(50, getNumDocs(SOURCE_COLLECTION));
|
||||
assertEquals(50, getNumDocs(TARGET_COLLECTION));
|
||||
|
||||
index(SOURCE_COLLECTION, getDoc(id, Integer.toString(0))); // trigger update log cleaning on the non-leader nodes
|
||||
|
||||
// some of the tlogs should be trimmed, we must have less than 50 tlog files on both leader and non-leader
|
||||
assertUpdateLogs(SOURCE_COLLECTION, 50);
|
||||
assertNumberOfTlogFiles(SOURCE_COLLECTION, 50);
|
||||
|
||||
for (int i = 50; i < 100; i++) {
|
||||
index(SOURCE_COLLECTION, getDoc(id, Integer.toString(i)));
|
||||
}
|
||||
|
||||
index(SOURCE_COLLECTION, getDoc(id, Integer.toString(0))); // trigger update log cleaning on the non-leader nodes
|
||||
|
||||
// at this stage, we should have created one tlog file per document, and some of them must have been cleaned on the
|
||||
// leader since we are not buffering and replication is stopped, (we should have exactly 10 tlog files on the leader
|
||||
// and 11 on the non-leader)
|
||||
// the non-leader must have synchronised its update log with its leader
|
||||
assertUpdateLogs(SOURCE_COLLECTION, 50);
|
||||
assertNumberOfTlogFiles(SOURCE_COLLECTION, 50);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -383,9 +393,11 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
|
|||
|
||||
// buffering is enabled by default, so disable it
|
||||
this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.DISABLEBUFFER);
|
||||
this.waitForCdcrStateReplication(SOURCE_COLLECTION);
|
||||
|
||||
// Start CDCR
|
||||
this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
|
||||
this.waitForCdcrStateReplication(SOURCE_COLLECTION);
|
||||
|
||||
// Index documents
|
||||
for (int i = 0; i < 200; i++) {
|
||||
|
@ -425,6 +437,7 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
|
|||
|
||||
// Start CDCR
|
||||
this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
|
||||
this.waitForCdcrStateReplication(SOURCE_COLLECTION);
|
||||
|
||||
// wait a bit for the replication to complete
|
||||
this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
|
||||
|
@ -485,6 +498,7 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
|
|||
|
||||
// Start CDCR
|
||||
this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
|
||||
this.waitForCdcrStateReplication(SOURCE_COLLECTION);
|
||||
|
||||
// wait a bit for the replication to complete
|
||||
this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
|
||||
|
@ -501,7 +515,8 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
|
|||
* Checks that batches are correctly constructed when batch boundaries are reached.
|
||||
*/
|
||||
public void doTestBatchBoundaries() throws Exception {
|
||||
invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
|
||||
this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
|
||||
this.waitForCdcrStateReplication(SOURCE_COLLECTION);
|
||||
|
||||
log.info("Indexing documents");
|
||||
|
||||
|
@ -514,6 +529,7 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
|
|||
assertEquals(128, getNumDocs(SOURCE_COLLECTION));
|
||||
|
||||
this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
|
||||
this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
|
||||
|
||||
commit(TARGET_COLLECTION);
|
||||
|
||||
|
@ -538,6 +554,7 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
|
|||
|
||||
// Start CDCR
|
||||
this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
|
||||
this.waitForCdcrStateReplication(SOURCE_COLLECTION);
|
||||
|
||||
// wait a bit for the replication to complete
|
||||
this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
|
||||
|
@ -577,8 +594,9 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
|
|||
|
||||
// Restart CDCR
|
||||
this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.STOP);
|
||||
Thread.sleep(500); // wait a bit for the state to synch
|
||||
this.waitForCdcrStateReplication(SOURCE_COLLECTION);
|
||||
this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
|
||||
this.waitForCdcrStateReplication(SOURCE_COLLECTION);
|
||||
|
||||
docs.clear();
|
||||
for (; start < 150; start++) {
|
||||
|
@ -596,5 +614,76 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
|
|||
assertEquals(50, getNumDocs(TARGET_COLLECTION));
|
||||
}
|
||||
|
||||
/**
|
||||
* Asserts the number of transaction logs across all the shards. Since the cleaning of the update logs
|
||||
* is not immediate on the slave nodes (it relies on the update log synchronizer that is executed every second),
|
||||
* it will retry until the assert is successful or until the timeout.
|
||||
*/
|
||||
protected void assertNumberOfTlogFiles(String collection, int maxNumberOfTLogs) throws Exception {
|
||||
int cnt = 15; // timeout after 15 seconds
|
||||
AssertionError lastAssertionError = null;
|
||||
|
||||
while (cnt > 0) {
|
||||
try {
|
||||
// Fire a DeleteById query with a commit to trigger update log cleaning on the non-leader nodes
|
||||
List<String> ids = new ArrayList<>();
|
||||
ids.add("_NON_EXISTING_ID_");
|
||||
deleteById(collection, ids);
|
||||
|
||||
// Check the update logs
|
||||
this._assertNumberOfTlogFiles(collection, maxNumberOfTLogs);
|
||||
return;
|
||||
}
|
||||
catch (AssertionError e) {
|
||||
lastAssertionError = e;
|
||||
cnt--;
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
}
|
||||
|
||||
throw new AssertionError("Timeout while trying to assert update logs @ collection="+collection, lastAssertionError);
|
||||
}
|
||||
|
||||
/**
|
||||
* Asserts the number of transaction logs across all the shards
|
||||
*/
|
||||
private void _assertNumberOfTlogFiles(String collection, int maxNumberOfTLogs) throws Exception {
|
||||
CollectionInfo info = collectInfo(collection);
|
||||
Map<String, List<CollectionInfo.CoreInfo>> shardToCoresMap = info.getShardToCoresMap();
|
||||
|
||||
int leaderLogs = 0;
|
||||
ArrayList<Integer> replicasLogs = new ArrayList<>(Collections.nCopies(replicationFactor - 1, 0));
|
||||
|
||||
for (String shard : shardToCoresMap.keySet()) {
|
||||
leaderLogs += numberOfFiles(info.getLeader(shard).ulogDir);
|
||||
for (int i = 0; i < replicationFactor - 1; i++) {
|
||||
replicasLogs.set(i, replicasLogs.get(i) + numberOfFiles(info.getReplicas(shard).get(i).ulogDir));
|
||||
}
|
||||
}
|
||||
|
||||
for (Integer replicaLogs : replicasLogs) {
|
||||
log.info("Number of logs in update log on leader {} and on replica {}", leaderLogs, replicaLogs);
|
||||
|
||||
// replica logs must be always equal or superior to leader logs
|
||||
assertTrue(String.format(Locale.ENGLISH, "Number of tlogs on replica: %d is different than on leader: %d.",
|
||||
replicaLogs, leaderLogs), leaderLogs <= replicaLogs);
|
||||
|
||||
assertTrue(String.format(Locale.ENGLISH, "Number of tlogs on leader: %d is superior to: %d.",
|
||||
leaderLogs, maxNumberOfTLogs), maxNumberOfTLogs >= leaderLogs);
|
||||
|
||||
assertTrue(String.format(Locale.ENGLISH, "Number of tlogs on replica: %d is superior to: %d.",
|
||||
replicaLogs, maxNumberOfTLogs), maxNumberOfTLogs >= replicaLogs);
|
||||
}
|
||||
}
|
||||
|
||||
private int numberOfFiles(String dir) {
|
||||
File file = new File(dir);
|
||||
if (!file.isDirectory()) {
|
||||
assertTrue("Path to tlog " + dir + " does not exists or it's not a directory.", false);
|
||||
}
|
||||
log.debug("Update log dir {} contains: {}", dir, file.listFiles());
|
||||
return file.listFiles().length;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -23,7 +23,6 @@ package org.apache.solr.cloud;
|
|||
import org.apache.lucene.util.LuceneTestCase.Slow;
|
||||
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||
import org.apache.solr.common.SolrInputDocument;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.File;
|
||||
|
@ -32,7 +31,6 @@ import java.util.HashMap;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
@Ignore
|
||||
@Slow
|
||||
public class CdcrReplicationHandlerTest extends BaseCdcrDistributedZkTest {
|
||||
|
||||
|
@ -47,7 +45,7 @@ public class CdcrReplicationHandlerTest extends BaseCdcrDistributedZkTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
@ShardsFixed(num = 4)
|
||||
@ShardsFixed(num = 2)
|
||||
public void doTest() throws Exception {
|
||||
this.doTestFullReplication();
|
||||
this.doTestPartialReplication();
|
||||
|
@ -60,6 +58,7 @@ public class CdcrReplicationHandlerTest extends BaseCdcrDistributedZkTest {
|
|||
* strategy should fetch all the missing tlog files from the leader.
|
||||
*/
|
||||
public void doTestFullReplication() throws Exception {
|
||||
this.clearSourceCollection();
|
||||
List<CloudJettyRunner> slaves = this.getShardToSlaveJetty(SOURCE_COLLECTION, SHARD1);
|
||||
ChaosMonkey.stop(slaves.get(0).jetty);
|
||||
|
||||
|
@ -76,7 +75,7 @@ public class CdcrReplicationHandlerTest extends BaseCdcrDistributedZkTest {
|
|||
// Restart the slave node to trigger Replication strategy
|
||||
this.restartServer(slaves.get(0));
|
||||
|
||||
this.assertUpdateLogs(SOURCE_COLLECTION, 10);
|
||||
this.assertUpdateLogsEquals(SOURCE_COLLECTION, 10);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -85,7 +84,6 @@ public class CdcrReplicationHandlerTest extends BaseCdcrDistributedZkTest {
|
|||
*/
|
||||
public void doTestPartialReplication() throws Exception {
|
||||
this.clearSourceCollection();
|
||||
|
||||
for (int i = 0; i < 5; i++) {
|
||||
List<SolrInputDocument> docs = new ArrayList<>();
|
||||
for (int j = i * 20; j < (i * 20) + 20; j++) {
|
||||
|
@ -111,7 +109,7 @@ public class CdcrReplicationHandlerTest extends BaseCdcrDistributedZkTest {
|
|||
this.restartServer(slaves.get(0));
|
||||
|
||||
// at this stage, the slave should have replicated the 5 missing tlog files
|
||||
this.assertUpdateLogs(SOURCE_COLLECTION, 10);
|
||||
this.assertUpdateLogsEquals(SOURCE_COLLECTION, 10);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -121,7 +119,6 @@ public class CdcrReplicationHandlerTest extends BaseCdcrDistributedZkTest {
|
|||
*/
|
||||
public void doTestPartialReplicationWithTruncatedTlog() throws Exception {
|
||||
this.clearSourceCollection();
|
||||
|
||||
CloudSolrClient client = createCloudClient(SOURCE_COLLECTION);
|
||||
List<CloudJettyRunner> slaves = this.getShardToSlaveJetty(SOURCE_COLLECTION, SHARD1);
|
||||
|
||||
|
@ -148,7 +145,7 @@ public class CdcrReplicationHandlerTest extends BaseCdcrDistributedZkTest {
|
|||
this.restartServer(slaves.get(0));
|
||||
|
||||
// at this stage, the slave should have replicated the 5 missing tlog files
|
||||
this.assertUpdateLogs(SOURCE_COLLECTION, 10);
|
||||
this.assertUpdateLogsEquals(SOURCE_COLLECTION, 10);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -159,7 +156,6 @@ public class CdcrReplicationHandlerTest extends BaseCdcrDistributedZkTest {
|
|||
*/
|
||||
public void doTestPartialReplicationAfterPeerSync() throws Exception {
|
||||
this.clearSourceCollection();
|
||||
|
||||
for (int i = 0; i < 5; i++) {
|
||||
List<SolrInputDocument> docs = new ArrayList<>();
|
||||
for (int j = i * 10; j < (i * 10) + 10; j++) {
|
||||
|
@ -199,7 +195,7 @@ public class CdcrReplicationHandlerTest extends BaseCdcrDistributedZkTest {
|
|||
this.restartServer(slaves.get(0));
|
||||
|
||||
// at this stage, the slave should have replicated the 5 missing tlog files
|
||||
this.assertUpdateLogs(SOURCE_COLLECTION, 15);
|
||||
this.assertUpdateLogsEquals(SOURCE_COLLECTION, 15);
|
||||
}
|
||||
|
||||
private List<CloudJettyRunner> getShardToSlaveJetty(String collection, String shard) {
|
||||
|
@ -210,10 +206,10 @@ public class CdcrReplicationHandlerTest extends BaseCdcrDistributedZkTest {
|
|||
}
|
||||
|
||||
/**
|
||||
* Asserts that the transaction logs between the leader and slave
|
||||
* Asserts that the update logs are in sync between the leader and slave. The leader and the slaves
|
||||
* must have identical tlog files.
|
||||
*/
|
||||
@Override
|
||||
protected void assertUpdateLogs(String collection, int maxNumberOfTLogs) throws Exception {
|
||||
protected void assertUpdateLogsEquals(String collection, int numberOfTLogs) throws Exception {
|
||||
CollectionInfo info = collectInfo(collection);
|
||||
Map<String, List<CollectionInfo.CoreInfo>> shardToCoresMap = info.getShardToCoresMap();
|
||||
|
||||
|
@ -221,8 +217,8 @@ public class CdcrReplicationHandlerTest extends BaseCdcrDistributedZkTest {
|
|||
Map<Long, Long> leaderFilesMeta = this.getFilesMeta(info.getLeader(shard).ulogDir);
|
||||
Map<Long, Long> slaveFilesMeta = this.getFilesMeta(info.getReplicas(shard).get(0).ulogDir);
|
||||
|
||||
assertEquals("Incorrect number of tlog files on the leader", maxNumberOfTLogs, leaderFilesMeta.size());
|
||||
assertEquals("Incorrect number of tlog files on the slave", maxNumberOfTLogs, slaveFilesMeta.size());
|
||||
assertEquals("Incorrect number of tlog files on the leader", numberOfTLogs, leaderFilesMeta.size());
|
||||
assertEquals("Incorrect number of tlog files on the slave", numberOfTLogs, slaveFilesMeta.size());
|
||||
|
||||
for (Long leaderFileVersion : leaderFilesMeta.keySet()) {
|
||||
assertTrue("Slave is missing a tlog for version " + leaderFileVersion, slaveFilesMeta.containsKey(leaderFileVersion));
|
||||
|
|
|
@ -20,10 +20,8 @@ package org.apache.solr.cloud;
|
|||
import org.apache.lucene.util.LuceneTestCase.Slow;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.handler.CdcrParams;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
@Ignore
|
||||
@Slow
|
||||
public class CdcrRequestHandlerTest extends BaseCdcrDistributedZkTest {
|
||||
|
||||
|
@ -35,7 +33,7 @@ public class CdcrRequestHandlerTest extends BaseCdcrDistributedZkTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
@ShardsFixed(num = 4)
|
||||
@ShardsFixed(num = 2)
|
||||
public void doTest() throws Exception {
|
||||
this.doTestLifeCycleActions();
|
||||
this.doTestCheckpointActions();
|
||||
|
|
|
@ -61,8 +61,7 @@ public class CdcrVersionReplicationTest extends BaseCdcrDistributedZkTest {
|
|||
|
||||
@Test
|
||||
@ShardsFixed(num = 4)
|
||||
|
||||
public void doTest() throws Exception {
|
||||
public void testCdcrDocVersions() throws Exception {
|
||||
SolrClient client = createClientRandomly();
|
||||
try {
|
||||
handle.clear();
|
||||
|
|
Loading…
Reference in New Issue