SOLR-11003: Support bi-directional syncing of cdcr clusters. We still only support actively into one cluster cluster,

but have the ability to switch indexing clusters and cdcr will replicate correctly
This commit is contained in:
Varun Thacker 2017-11-02 16:17:14 -07:00
parent b43dcde267
commit db18de7e6b
16 changed files with 794 additions and 105 deletions

View File

@ -64,6 +64,9 @@ New Features
* SOLR-11438: Solr should return rf when min_rf is specified for deletes as well as adds (Erick Erickson) * SOLR-11438: Solr should return rf when min_rf is specified for deletes as well as adds (Erick Erickson)
* SOLR-11003: Support bi-directional syncing of cdcr clusters. We still only support actively into one cluster cluster,
but have the ability to switch indexing clusters and cdcr will replicate correctly. (Amrit Sarkar, Varun Thacker)
Bug Fixes Bug Fixes
---------------------- ----------------------

View File

@ -77,6 +77,10 @@ public class CdcrReplicator implements Runnable {
Object o = subReader.next(); Object o = subReader.next();
if (o == null) break; // we have reached the end of the update logs, we should close the batch if (o == null) break; // we have reached the end of the update logs, we should close the batch
if (isTargetCluster(o)) {
continue;
}
if (isDelete(o)) { if (isDelete(o)) {
/* /*
@ -140,6 +144,30 @@ public class CdcrReplicator implements Runnable {
state.resetConsecutiveErrors(); state.resetConsecutiveErrors();
} }
/** check whether the update read from TLog is received from source
* or received via solr client
*/
private boolean isTargetCluster(Object o) {
List entry = (List) o;
int operationAndFlags = (Integer) entry.get(0);
int oper = operationAndFlags & UpdateLog.OPERATION_MASK;
Boolean isTarget = false;
if (oper == UpdateLog.DELETE_BY_QUERY || oper == UpdateLog.DELETE) {
if (entry.size() == 4) { //back-combat - skip for previous versions
isTarget = (Boolean) entry.get(entry.size() - 1);
}
} else if (oper == UpdateLog.UPDATE_INPLACE) {
if (entry.size() == 6) { //back-combat - skip for previous versions
isTarget = (Boolean) entry.get(entry.size() - 2);
}
} else if (oper == UpdateLog.ADD) {
if (entry.size() == 4) { //back-combat - skip for previous versions
isTarget = (Boolean) entry.get(entry.size() - 2);
}
}
return isTarget;
}
private boolean isDelete(Object o) { private boolean isDelete(Object o) {
List entry = (List) o; List entry = (List) o;
int operationAndFlags = (Integer) entry.get(0); int operationAndFlags = (Integer) entry.get(0);

View File

@ -24,10 +24,13 @@ import java.nio.channels.Channels;
import java.nio.file.Files; import java.nio.file.Files;
import java.util.Collection; import java.util.Collection;
import org.apache.lucene.util.BytesRef;
import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.util.FastOutputStream; import org.apache.solr.common.util.FastOutputStream;
import org.apache.solr.common.util.JavaBinCodec; import org.apache.solr.common.util.JavaBinCodec;
import org.apache.solr.common.util.ObjectReleaseTracker; import org.apache.solr.common.util.ObjectReleaseTracker;
import org.apache.solr.update.processor.CdcrUpdateProcessor;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -104,6 +107,147 @@ public class CdcrTransactionLog extends TransactionLog {
return 0; return 0;
} }
@Override
public long write(AddUpdateCommand cmd, long prevPointer, int flags) {
assert (-1 <= prevPointer && (cmd.isInPlaceUpdate() || (-1 == prevPointer)));
LogCodec codec = new LogCodec(resolver);
SolrInputDocument sdoc = cmd.getSolrInputDocument();
try {
checkWriteHeader(codec, sdoc);
// adaptive buffer sizing
int bufSize = lastAddSize; // unsynchronized access of lastAddSize should be fine
bufSize = Math.min(1024*1024, bufSize+(bufSize>>3)+256);
MemOutputStream out = new MemOutputStream(new byte[bufSize]);
codec.init(out);
if (cmd.isInPlaceUpdate()) {
codec.writeTag(JavaBinCodec.ARR, 6);
codec.writeInt(UpdateLog.UPDATE_INPLACE | flags); // should just take one byte
codec.writeLong(cmd.getVersion());
codec.writeLong(prevPointer);
codec.writeLong(cmd.prevVersion);
if (cmd.getReq().getParamString().contains(CdcrUpdateProcessor.CDCR_UPDATE)) {
// if the update is received via cdcr source; add boolean entry
// CdcrReplicator.isTargetCluster() checks that particular boolean to accept or discard the update
// to forward to its own target cluster
codec.writePrimitive(true);
} else {
codec.writePrimitive(false);
}
codec.writeSolrInputDocument(cmd.getSolrInputDocument());
} else {
codec.writeTag(JavaBinCodec.ARR, 4);
codec.writeInt(UpdateLog.ADD | flags); // should just take one byte
codec.writeLong(cmd.getVersion());
if (cmd.getReq().getParamString().contains(CdcrUpdateProcessor.CDCR_UPDATE)) {
// if the update is received via cdcr source; add extra boolean entry
// CdcrReplicator.isTargetCluster() checks that particular boolean to accept or discard the update
// to forward to its own target cluster
codec.writePrimitive(true);
} else {
codec.writePrimitive(false);
}
codec.writeSolrInputDocument(cmd.getSolrInputDocument());
}
lastAddSize = (int)out.size();
synchronized (this) {
long pos = fos.size(); // if we had flushed, this should be equal to channel.position()
assert pos != 0;
/***
System.out.println("###writing at " + pos + " fos.size()=" + fos.size() + " raf.length()=" + raf.length());
if (pos != fos.size()) {
throw new RuntimeException("ERROR" + "###writing at " + pos + " fos.size()=" + fos.size() + " raf.length()=" + raf.length());
}
***/
out.writeAll(fos);
endRecord(pos);
// fos.flushBuffer(); // flush later
return pos;
}
} catch (IOException e) {
// TODO: reset our file pointer back to "pos", the start of this record.
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error logging add", e);
}
}
@Override
public long writeDelete(DeleteUpdateCommand cmd, int flags) {
LogCodec codec = new LogCodec(resolver);
try {
checkWriteHeader(codec, null);
BytesRef br = cmd.getIndexedId();
MemOutputStream out = new MemOutputStream(new byte[20 + br.length]);
codec.init(out);
codec.writeTag(JavaBinCodec.ARR, 4);
codec.writeInt(UpdateLog.DELETE | flags); // should just take one byte
codec.writeLong(cmd.getVersion());
codec.writeByteArray(br.bytes, br.offset, br.length);
if (cmd.getReq().getParamString().contains(CdcrUpdateProcessor.CDCR_UPDATE)) {
// if the update is received via cdcr source; add extra boolean entry
// CdcrReplicator.isTargetCluster() checks that particular boolean to accept or discard the update
// to forward to its own target cluster
codec.writePrimitive(true);
} else {
codec.writePrimitive(false);
}
synchronized (this) {
long pos = fos.size(); // if we had flushed, this should be equal to channel.position()
assert pos != 0;
out.writeAll(fos);
endRecord(pos);
// fos.flushBuffer(); // flush later
return pos;
}
} catch (IOException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
}
}
@Override
public long writeDeleteByQuery(DeleteUpdateCommand cmd, int flags) {
LogCodec codec = new LogCodec(resolver);
try {
checkWriteHeader(codec, null);
MemOutputStream out = new MemOutputStream(new byte[20 + (cmd.query.length())]);
codec.init(out);
codec.writeTag(JavaBinCodec.ARR, 4);
codec.writeInt(UpdateLog.DELETE_BY_QUERY | flags); // should just take one byte
codec.writeLong(cmd.getVersion());
codec.writeStr(cmd.query);
if (cmd.getReq().getParamString().contains(CdcrUpdateProcessor.CDCR_UPDATE)) {
// if the update is received via cdcr source; add extra boolean entry
// CdcrReplicator.isTargetCluster() checks that particular boolean to accept or discard the update
// to forward to its own target cluster
codec.writePrimitive(true);
} else {
codec.writePrimitive(false);
}
synchronized (this) {
long pos = fos.size(); // if we had flushed, this should be equal to channel.position()
out.writeAll(fos);
endRecord(pos);
// fos.flushBuffer(); // flush later
return pos;
}
} catch (IOException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
}
}
@Override @Override
public long writeCommit(CommitUpdateCommand cmd, int flags) { public long writeCommit(CommitUpdateCommand cmd, int flags) {
LogCodec codec = new LogCodec(resolver); LogCodec codec = new LogCodec(resolver);

View File

@ -326,7 +326,7 @@ public class TransactionLog implements Closeable {
numRecords++; numRecords++;
} }
private void checkWriteHeader(LogCodec codec, SolrInputDocument optional) throws IOException { protected void checkWriteHeader(LogCodec codec, SolrInputDocument optional) throws IOException {
// Unsynchronized access. We can get away with an unsynchronized access here // Unsynchronized access. We can get away with an unsynchronized access here
// since we will never get a false non-zero when the position is in fact 0. // since we will never get a false non-zero when the position is in fact 0.

View File

@ -0,0 +1,29 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<schema name="minimal" version="1.1">
<types>
<fieldType name="string" class="solr.StrField"/>
<fieldType name="long" class="${solr.tests.LongFieldType}" docValues="${solr.tests.numeric.dv}" precisionStep="0" positionIncrementGap="0"/>
</types>
<fields>
<field name="id" type="string" indexed="true" stored="true"/>
<field name="_version_" type="long" indexed="true" stored="true"/>
<dynamicField name="*" type="string" indexed="true" stored="true"/>
</fields>
<uniqueKey>id</uniqueKey>
</schema>

View File

@ -0,0 +1,80 @@
<?xml version="1.0" ?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!-- This is a "kitchen sink" config file that tests can use.
When writting a new test, feel free to add *new* items (plugins,
config options, etc...) as long as they don't break any existing
tests. if you need to test something esoteric please add a new
"solrconfig-your-esoteric-purpose.xml" config file.
Note in particular that this test is used by MinimalSchemaTest so
Anything added to this file needs to work correctly even if there
is now uniqueKey or defaultSearch Field.
-->
<config>
<dataDir>${solr.data.dir:}</dataDir>
<directoryFactory name="DirectoryFactory"
class="${solr.directoryFactory:solr.NRTCachingDirectoryFactory}"/>
<luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
<updateRequestProcessorChain name="cdcr-processor-chain">
<processor class="solr.CdcrUpdateProcessorFactory"/>
<processor class="solr.RunUpdateProcessorFactory"/>
</updateRequestProcessorChain>
<requestHandler name="/cdcr" class="solr.CdcrRequestHandler">
<lst name="replica">
<str name="zkHost">${cdcr.cluster2.zkHost}</str>
<str name="source">cdcr-cluster1</str>
<str name="target">cdcr-cluster2</str>
</lst>
<lst name="replicator">
<str name="threadPoolSize">1</str>
<str name="schedule">1000</str>
<str name="batchSize">1000</str>
</lst>
<lst name="updateLogSynchronizer">
<str name="schedule">1000</str>
</lst>
</requestHandler>
<updateHandler class="solr.DirectUpdateHandler2">
<updateLog class="solr.CdcrUpdateLog">
<str name="dir">${solr.ulog.dir:}</str>
</updateLog>
</updateHandler>
<requestHandler name="/select" class="solr.SearchHandler" />
<initParams path="/update/**,/query,/select,/tvrh,/elevate,/spell,/browse">
<lst name="defaults">
<str name="df">_text_</str>
</lst>
</initParams>
<requestHandler name="/update" class="solr.UpdateRequestHandler">
<lst name="defaults">
<str name="update.chain">cdcr-processor-chain</str>
</lst>
</requestHandler>
</config>

View File

@ -0,0 +1,29 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<schema name="minimal" version="1.1">
<types>
<fieldType name="string" class="solr.StrField"/>
<fieldType name="long" class="${solr.tests.LongFieldType}" docValues="${solr.tests.numeric.dv}" precisionStep="0" positionIncrementGap="0"/>
</types>
<fields>
<field name="id" type="string" indexed="true" stored="true"/>
<field name="_version_" type="long" indexed="true" stored="true"/>
<dynamicField name="*" type="string" indexed="true" stored="true"/>
</fields>
<uniqueKey>id</uniqueKey>
</schema>

View File

@ -0,0 +1,80 @@
<?xml version="1.0" ?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!-- This is a "kitchen sink" config file that tests can use.
When writting a new test, feel free to add *new* items (plugins,
config options, etc...) as long as they don't break any existing
tests. if you need to test something esoteric please add a new
"solrconfig-your-esoteric-purpose.xml" config file.
Note in particular that this test is used by MinimalSchemaTest so
Anything added to this file needs to work correctly even if there
is now uniqueKey or defaultSearch Field.
-->
<config>
<dataDir>${solr.data.dir:}</dataDir>
<directoryFactory name="DirectoryFactory"
class="${solr.directoryFactory:solr.NRTCachingDirectoryFactory}"/>
<luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
<updateRequestProcessorChain name="cdcr-processor-chain">
<processor class="solr.CdcrUpdateProcessorFactory"/>
<processor class="solr.RunUpdateProcessorFactory"/>
</updateRequestProcessorChain>
<requestHandler name="/cdcr" class="solr.CdcrRequestHandler">
<lst name="replica">
<str name="zkHost">${cdcr.cluster1.zkHost}</str>
<str name="source">cdcr-cluster2</str>
<str name="target">cdcr-cluster1</str>
</lst>
<lst name="replicator">
<str name="threadPoolSize">1</str>
<str name="schedule">1000</str>
<str name="batchSize">1000</str>
</lst>
<lst name="updateLogSynchronizer">
<str name="schedule">1000</str>
</lst>
</requestHandler>
<updateHandler class="solr.DirectUpdateHandler2">
<updateLog class="solr.CdcrUpdateLog">
<str name="dir">${solr.ulog.dir:}</str>
</updateLog>
</updateHandler>
<requestHandler name="/select" class="solr.SearchHandler" />
<initParams path="/update/**,/query,/select,/tvrh,/elevate,/spell,/browse">
<lst name="defaults">
<str name="df">_text_</str>
</lst>
</initParams>
<requestHandler name="/update" class="solr.UpdateRequestHandler">
<lst name="defaults">
<str name="update.chain">cdcr-processor-chain</str>
</lst>
</requestHandler>
</config>

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.solr.cloud; package org.apache.solr.cloud.cdcr;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
@ -40,6 +40,10 @@ import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest; import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.QueryRequest; import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.response.CollectionAdminResponse; import org.apache.solr.client.solrj.response.CollectionAdminResponse;
import org.apache.solr.cloud.AbstractDistribZkTestBase;
import org.apache.solr.cloud.AbstractZkTestCase;
import org.apache.solr.cloud.ChaosMonkey;
import org.apache.solr.cloud.OverseerCollectionMessageHandler;
import org.apache.solr.common.SolrInputDocument; import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.DocCollection;

View File

@ -0,0 +1,234 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.cdcr;
import java.lang.invoke.MethodHandles;
import java.util.concurrent.TimeUnit;
import com.carrotsearch.ant.tasks.junit4.dependencies.com.google.common.collect.ImmutableMap;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.cloud.MiniSolrCloudCluster;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.handler.CdcrParams;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CdcrBidirectionalTest extends SolrTestCaseJ4 {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@Test
public void testBiDir() throws Exception {
MiniSolrCloudCluster cluster2 = new MiniSolrCloudCluster(1, createTempDir("cdcr-cluster2"), buildJettyConfig("/solr"));
cluster2.waitForAllNodes(30);
MiniSolrCloudCluster cluster1 = new MiniSolrCloudCluster(1, createTempDir("cdcr-cluster1"), buildJettyConfig("/solr"));
cluster1.waitForAllNodes(30);
try {
log.info("cluster2 zkHost = " + cluster2.getZkServer().getZkAddress());
System.setProperty("cdcr.cluster2.zkHost", cluster2.getZkServer().getZkAddress());
log.info("cluster1 zkHost = " + cluster1.getZkServer().getZkAddress());
System.setProperty("cdcr.cluster1.zkHost", cluster1.getZkServer().getZkAddress());
cluster1.uploadConfigSet(configset("cdcr-cluster1"), "cdcr-cluster1");
CollectionAdminRequest.createCollection("cdcr-cluster1", "cdcr-cluster1", 2, 1)
.withProperty("solr.directoryFactory", "solr.StandardDirectoryFactory")
.setMaxShardsPerNode(2)
.process(cluster1.getSolrClient());
CloudSolrClient cluster1SolrClient = cluster1.getSolrClient();
cluster1SolrClient.setDefaultCollection("cdcr-cluster1");
cluster2.uploadConfigSet(configset("cdcr-cluster2"), "cdcr-cluster2");
CollectionAdminRequest.createCollection("cdcr-cluster2", "cdcr-cluster2", 2, 1)
.withProperty("solr.directoryFactory", "solr.StandardDirectoryFactory")
.setMaxShardsPerNode(2)
.process(cluster2.getSolrClient());
CloudSolrClient cluster2SolrClient = cluster2.getSolrClient();
cluster2SolrClient.setDefaultCollection("cdcr-cluster2");
UpdateRequest req = null;
CdcrTestsUtil.cdcrStart(cluster1SolrClient);
Thread.sleep(2000);
// ADD operation on cluster 1
int docs = (TEST_NIGHTLY ? 100 : 10);
int numDocs_c1 = 0;
for (int k = 0; k < docs; k++) {
req = new UpdateRequest();
for (; numDocs_c1 < (k + 1) * 100; numDocs_c1++) {
SolrInputDocument doc = new SolrInputDocument();
doc.addField("id", "cluster1_" + numDocs_c1);
doc.addField("xyz", numDocs_c1);
req.add(doc);
}
req.setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true);
log.info("Adding " + docs + " docs with commit=true, numDocs=" + numDocs_c1);
req.process(cluster1SolrClient);
}
QueryResponse response = cluster1SolrClient.query(new SolrQuery("*:*"));
assertEquals("cluster 1 docs mismatch", numDocs_c1, response.getResults().getNumFound());
assertEquals("cluster 2 docs mismatch", numDocs_c1, CdcrTestsUtil.waitForClusterToSync(numDocs_c1, cluster2SolrClient));
CdcrTestsUtil.cdcrStart(cluster2SolrClient); // FULL BI-DIRECTIONAL CDCR FORWARDING ON
Thread.sleep(2000);
// ADD operation on cluster 2
int numDocs_c2 = 0;
for (int k = 0; k < docs; k++) {
req = new UpdateRequest();
for (; numDocs_c2 < (k + 1) * 100; numDocs_c2++) {
SolrInputDocument doc = new SolrInputDocument();
doc.addField("id", "cluster2_" + numDocs_c2);
doc.addField("xyz", numDocs_c2);
req.add(doc);
}
req.setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true);
log.info("Adding " + docs + " docs with commit=true, numDocs=" + numDocs_c2);
req.process(cluster2SolrClient);
}
int numDocs = numDocs_c1 + numDocs_c2;
response = cluster2SolrClient.query(new SolrQuery("*:*"));
assertEquals("cluster 2 docs mismatch", numDocs, response.getResults().getNumFound());
assertEquals("cluster 1 docs mismatch", numDocs, CdcrTestsUtil.waitForClusterToSync(numDocs, cluster1SolrClient));
// logging cdcr clusters queue response
response = CdcrTestsUtil.getCdcrQueue(cluster1SolrClient);
log.info("Cdcr cluster1 queue response: " + response.getResponse());
response = CdcrTestsUtil.getCdcrQueue(cluster2SolrClient);
log.info("Cdcr cluster2 queue response: " + response.getResponse());
// lets find and keep the maximum version assigned by cluster1 & cluster2 across all our updates
long maxVersion_c1 = Math.min((long)CdcrTestsUtil.getFingerPrintMaxVersion(cluster1SolrClient, "shard1", numDocs),
(long)CdcrTestsUtil.getFingerPrintMaxVersion(cluster1SolrClient, "shard2", numDocs));
long maxVersion_c2 = Math.min((long)CdcrTestsUtil.getFingerPrintMaxVersion(cluster2SolrClient, "shard1", numDocs),
(long)CdcrTestsUtil.getFingerPrintMaxVersion(cluster2SolrClient, "shard2", numDocs));
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(CommonParams.ACTION, CdcrParams.CdcrAction.COLLECTIONCHECKPOINT.toString());
params.set(CommonParams.QT, "/cdcr");
response = cluster2SolrClient.query(params);
Long checkpoint_2 = (Long) response.getResponse().get(CdcrParams.CHECKPOINT);
assertNotNull(checkpoint_2);
params = new ModifiableSolrParams();
params.set(CommonParams.ACTION, CdcrParams.CdcrAction.COLLECTIONCHECKPOINT.toString());
params.set(CommonParams.QT, "/cdcr");
response = cluster1SolrClient.query(params);
Long checkpoint_1 = (Long) response.getResponse().get(CdcrParams.CHECKPOINT);
assertNotNull(checkpoint_1);
log.info("v1: " + maxVersion_c1 + "\t" + "v2: " + maxVersion_c2 + "\t" +
"checkpoint1: " + checkpoint_1 + "\t" + "checkpoint2: " + checkpoint_2);
assertEquals("COLLECTIONCHECKPOINT from cluster2 should have returned the maximum " +
"version across all updates made to cluster1", maxVersion_c1, checkpoint_2.longValue());
assertEquals("COLLECTIONCHECKPOINT from cluster1 should have returned the maximum " +
"version across all updates made to cluster2", maxVersion_c2, checkpoint_1.longValue());
assertEquals("max versions of updates in both clusters should be same", maxVersion_c1, maxVersion_c2);
// DELETE BY QUERY
String deleteByQuery = "id:cluster1_" +String.valueOf(random().nextInt(numDocs_c1));
response = cluster1SolrClient.query(new SolrQuery(deleteByQuery));
assertEquals("should match exactly one doc", 1, response.getResults().getNumFound());
cluster1SolrClient.deleteByQuery(deleteByQuery);
cluster1SolrClient.commit();
numDocs--;
numDocs_c1--;
response = cluster1SolrClient.query(new SolrQuery("*:*"));
assertEquals("cluster 1 docs mismatch", numDocs, response.getResults().getNumFound());
assertEquals("cluster 2 docs mismatch", numDocs, CdcrTestsUtil.waitForClusterToSync(numDocs, cluster2SolrClient));
// DELETE BY ID
SolrInputDocument doc;
String delete_id_query = "cluster2_" + random().nextInt(numDocs_c2);
cluster2SolrClient.deleteById(delete_id_query);
cluster2SolrClient.commit();
numDocs--;
numDocs_c2--;
response = cluster2SolrClient.query(new SolrQuery("*:*"));
assertEquals("cluster 2 docs mismatch", numDocs, response.getResults().getNumFound());
assertEquals("cluster 1 docs mismatch", numDocs, CdcrTestsUtil.waitForClusterToSync(numDocs, cluster1SolrClient));
// ATOMIC UPDATES
req = new UpdateRequest();
doc = new SolrInputDocument();
ImmutableMap.of("", "");
String atomicUpdateId = "cluster2_" + random().nextInt(numDocs_c2);
doc.addField("id", atomicUpdateId);
doc.addField("xyz", ImmutableMap.of("delete", ""));
doc.addField("abc", ImmutableMap.of("set", "ABC"));
req.add(doc);
req.process(cluster2SolrClient);
cluster2SolrClient.commit();
String atomicQuery = "id:" + atomicUpdateId;
response = cluster2SolrClient.query(new SolrQuery(atomicQuery));
assertEquals("cluster 2 wrong doc", "ABC", response.getResults().get(0).get("abc"));
assertEquals("cluster 1 wrong doc", "ABC", getDocFieldValue(cluster1SolrClient, atomicQuery, "ABC"));
// logging cdcr clusters queue response
response = CdcrTestsUtil.getCdcrQueue(cluster1SolrClient);
log.info("Cdcr cluster1 queue response at end of testcase: " + response.getResponse());
response = CdcrTestsUtil.getCdcrQueue(cluster2SolrClient);
log.info("Cdcr cluster2 queue response at end of testcase: " + response.getResponse());
CdcrTestsUtil.cdcrStop(cluster1SolrClient);
CdcrTestsUtil.cdcrStop(cluster2SolrClient);
} finally {
if (cluster1 != null) {
cluster1.shutdown();
}
if (cluster2 != null) {
cluster2.shutdown();
}
}
}
private String getDocFieldValue(CloudSolrClient clusterSolrClient, String query, String match) throws Exception {
long start = System.nanoTime();
QueryResponse response = null;
while (System.nanoTime() - start <= TimeUnit.NANOSECONDS.convert(120, TimeUnit.SECONDS)) {
clusterSolrClient.commit();
response = clusterSolrClient.query(new SolrQuery(query));
if (match.equals(response.getResults().get(0).get("abc"))) {
break;
}
Thread.sleep(1000);
}
return response != null ? (String) response.getResults().get(0).get("abc") : "";
}
}

View File

@ -15,12 +15,11 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.solr.cloud; package org.apache.solr.cloud.cdcr;
import java.io.IOException; import java.io.IOException;
import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodHandles;
import java.util.List; import java.util.LinkedHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.store.FSDirectory; import org.apache.lucene.store.FSDirectory;
import org.apache.solr.SolrTestCaseJ4; import org.apache.solr.SolrTestCaseJ4;
@ -32,10 +31,11 @@ import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.CollectionAdminRequest; import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.UpdateRequest; import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.QueryResponse; import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.cloud.AbstractDistribZkTestBase;
import org.apache.solr.cloud.MiniSolrCloudCluster;
import org.apache.solr.common.SolrInputDocument; import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.CommonParams; import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.handler.CdcrParams; import org.apache.solr.handler.CdcrParams;
import org.junit.Test; import org.junit.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -89,11 +89,9 @@ public class CdcrBootstrapTest extends SolrTestCaseJ4 {
ModifiableSolrParams params = new ModifiableSolrParams(); ModifiableSolrParams params = new ModifiableSolrParams();
params.set(CommonParams.QT, "/get"); params.set(CommonParams.QT, "/get");
params.set("getVersions", numDocs); params.set("getVersions", numDocs);
params.set("fingerprint", true);
response = sourceSolrClient.query(params); response = sourceSolrClient.query(params);
List<Long> versions = (List<Long>) response.getResponse().get("versions"); maxVersion = (long)(((LinkedHashMap)response.getResponse().get("fingerprint")).get("maxVersionEncountered"));
for (Long version : versions) {
maxVersion = Math.max(maxVersion, version);
}
// upload the cdcr-enabled config and restart source cluster // upload the cdcr-enabled config and restart source cluster
source.uploadConfigSet(configset("cdcr-source"), "cdcr-source"); source.uploadConfigSet(configset("cdcr-source"), "cdcr-source");
@ -113,12 +111,12 @@ public class CdcrBootstrapTest extends SolrTestCaseJ4 {
targetSolrClient.setDefaultCollection("cdcr-target"); targetSolrClient.setDefaultCollection("cdcr-target");
Thread.sleep(1000); Thread.sleep(1000);
cdcrStart(targetSolrClient); CdcrTestsUtil.cdcrStart(targetSolrClient);
cdcrStart(sourceSolrClient); CdcrTestsUtil.cdcrStart(sourceSolrClient);
response = getCdcrQueue(sourceSolrClient); response = CdcrTestsUtil.getCdcrQueue(sourceSolrClient);
log.info("Cdcr queue response: " + response.getResponse()); log.info("Cdcr queue response: " + response.getResponse());
long foundDocs = waitForTargetToSync(numDocs, targetSolrClient); long foundDocs = CdcrTestsUtil.waitForClusterToSync(numDocs, targetSolrClient);
assertEquals("Document mismatch on target after sync", numDocs, foundDocs); assertEquals("Document mismatch on target after sync", numDocs, foundDocs);
params = new ModifiableSolrParams(); params = new ModifiableSolrParams();
@ -189,12 +187,12 @@ public class CdcrBootstrapTest extends SolrTestCaseJ4 {
CloudSolrClient targetSolrClient = target.getSolrClient(); CloudSolrClient targetSolrClient = target.getSolrClient();
targetSolrClient.setDefaultCollection("cdcr-target"); targetSolrClient.setDefaultCollection("cdcr-target");
cdcrStart(targetSolrClient); CdcrTestsUtil.cdcrStart(targetSolrClient);
cdcrStart(sourceSolrClient); CdcrTestsUtil.cdcrStart(sourceSolrClient);
response = getCdcrQueue(sourceSolrClient); response = CdcrTestsUtil.getCdcrQueue(sourceSolrClient);
log.info("Cdcr queue response: " + response.getResponse()); log.info("Cdcr queue response: " + response.getResponse());
long foundDocs = waitForTargetToSync(numDocs, targetSolrClient); long foundDocs = CdcrTestsUtil.waitForClusterToSync(numDocs, targetSolrClient);
assertEquals("Document mismatch on target after sync", numDocs, foundDocs); assertEquals("Document mismatch on target after sync", numDocs, foundDocs);
int total_tlogs_in_index = FSDirectory.open(target.getBaseDir().resolve("node1"). int total_tlogs_in_index = FSDirectory.open(target.getBaseDir().resolve("node1").
@ -203,8 +201,8 @@ public class CdcrBootstrapTest extends SolrTestCaseJ4 {
assertEquals("tlogs count should be ZERO",0, total_tlogs_in_index); assertEquals("tlogs count should be ZERO",0, total_tlogs_in_index);
cdcrStop(sourceSolrClient); CdcrTestsUtil.cdcrStop(sourceSolrClient);
cdcrDisableBuffer(sourceSolrClient); CdcrTestsUtil.cdcrDisableBuffer(sourceSolrClient);
int c = 0; int c = 0;
for (int k = 0; k < 10; k++) { for (int k = 0; k < 10; k++) {
@ -223,10 +221,10 @@ public class CdcrBootstrapTest extends SolrTestCaseJ4 {
response = sourceSolrClient.query(new SolrQuery("*:*")); response = sourceSolrClient.query(new SolrQuery("*:*"));
assertEquals("", numDocs, response.getResults().getNumFound()); assertEquals("", numDocs, response.getResults().getNumFound());
cdcrStart(sourceSolrClient); CdcrTestsUtil.cdcrStart(sourceSolrClient);
cdcrEnableBuffer(sourceSolrClient); CdcrTestsUtil.cdcrEnableBuffer(sourceSolrClient);
foundDocs = waitForTargetToSync(numDocs, targetSolrClient); foundDocs = CdcrTestsUtil.waitForClusterToSync(numDocs, targetSolrClient);
assertEquals("Document mismatch on target after sync", numDocs, foundDocs); assertEquals("Document mismatch on target after sync", numDocs, foundDocs);
} finally { } finally {
@ -269,8 +267,8 @@ public class CdcrBootstrapTest extends SolrTestCaseJ4 {
targetSolrClient.setDefaultCollection("cdcr-target"); targetSolrClient.setDefaultCollection("cdcr-target");
Thread.sleep(1000); Thread.sleep(1000);
cdcrStart(targetSolrClient); CdcrTestsUtil.cdcrStart(targetSolrClient);
cdcrStart(sourceSolrClient); CdcrTestsUtil.cdcrStart(sourceSolrClient);
int c = 0; int c = 0;
for (int k = 0; k < docs; k++) { for (int k = 0; k < docs; k++) {
UpdateRequest req = new UpdateRequest(); UpdateRequest req = new UpdateRequest();
@ -288,9 +286,9 @@ public class CdcrBootstrapTest extends SolrTestCaseJ4 {
response = sourceSolrClient.query(new SolrQuery("*:*")); response = sourceSolrClient.query(new SolrQuery("*:*"));
assertEquals("", numDocs, response.getResults().getNumFound()); assertEquals("", numDocs, response.getResults().getNumFound());
response = getCdcrQueue(sourceSolrClient); response = CdcrTestsUtil.getCdcrQueue(sourceSolrClient);
log.info("Cdcr queue response: " + response.getResponse()); log.info("Cdcr queue response: " + response.getResponse());
long foundDocs = waitForTargetToSync(numDocs, targetSolrClient); long foundDocs = CdcrTestsUtil.waitForClusterToSync(numDocs, targetSolrClient);
assertEquals("Document mismatch on target after sync", numDocs, foundDocs); assertEquals("Document mismatch on target after sync", numDocs, foundDocs);
} finally { } finally {
@ -301,56 +299,4 @@ public class CdcrBootstrapTest extends SolrTestCaseJ4 {
} }
} }
private long waitForTargetToSync(int numDocs, CloudSolrClient targetSolrClient) throws SolrServerException, IOException, InterruptedException {
long start = System.nanoTime();
QueryResponse response = null;
while (System.nanoTime() - start <= TimeUnit.NANOSECONDS.convert(120, TimeUnit.SECONDS)) {
try {
targetSolrClient.commit();
response = targetSolrClient.query(new SolrQuery("*:*"));
if (response.getResults().getNumFound() == numDocs) {
break;
}
} catch (Exception e) {
log.warn("Exception trying to commit on target. This is expected and safe to ignore.", e);
}
Thread.sleep(1000);
}
return response != null ? response.getResults().getNumFound() : 0;
}
private void cdcrStart(CloudSolrClient client) throws SolrServerException, IOException {
QueryResponse response = invokeCdcrAction(client, CdcrParams.CdcrAction.START);
assertEquals("started", ((NamedList) response.getResponse().get("status")).get("process"));
}
private void cdcrStop(CloudSolrClient client) throws SolrServerException, IOException {
QueryResponse response = invokeCdcrAction(client, CdcrParams.CdcrAction.STOP);
assertEquals("stopped", ((NamedList) response.getResponse().get("status")).get("process"));
}
private void cdcrEnableBuffer(CloudSolrClient client) throws IOException, SolrServerException {
QueryResponse response = invokeCdcrAction(client, CdcrParams.CdcrAction.ENABLEBUFFER);
assertEquals("enabled", ((NamedList) response.getResponse().get("status")).get("buffer"));
}
private void cdcrDisableBuffer(CloudSolrClient client) throws IOException, SolrServerException {
QueryResponse response = invokeCdcrAction(client, CdcrParams.CdcrAction.DISABLEBUFFER);
assertEquals("disabled", ((NamedList) response.getResponse().get("status")).get("buffer"));
}
private QueryResponse invokeCdcrAction(CloudSolrClient client, CdcrParams.CdcrAction action) throws IOException, SolrServerException {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(CommonParams.QT, "/cdcr");
params.set(CommonParams.ACTION, action.toLower());
return client.query(params);
}
private QueryResponse getCdcrQueue(CloudSolrClient client) throws SolrServerException, IOException {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(CommonParams.QT, "/cdcr");
params.set(CommonParams.ACTION, CdcrParams.QUEUES);
return client.query(params);
}
} }

View File

@ -14,7 +14,13 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.solr.cloud; package org.apache.solr.cloud.cdcr;
import java.io.File;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.lucene.util.LuceneTestCase.BadApple; import org.apache.lucene.util.LuceneTestCase.BadApple;
import org.apache.lucene.util.LuceneTestCase.Nightly; import org.apache.lucene.util.LuceneTestCase.Nightly;
@ -25,12 +31,6 @@ import org.junit.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.File;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@Nightly @Nightly
@BadApple(bugUrl = "https://issues.apache.org/jira/browse/SOLR-10107") @BadApple(bugUrl = "https://issues.apache.org/jira/browse/SOLR-10107")
public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest { public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest {

View File

@ -14,18 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.solr.cloud; package org.apache.solr.cloud.cdcr;
import org.apache.lucene.util.LuceneTestCase.Nightly;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
@ -39,6 +28,18 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.util.LuceneTestCase.Nightly;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.cloud.ChaosMonkey;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* This class is testing the cdcr extension to the {@link org.apache.solr.handler.ReplicationHandler} and * This class is testing the cdcr extension to the {@link org.apache.solr.handler.ReplicationHandler} and
* {@link org.apache.solr.handler.IndexFetcher}. * {@link org.apache.solr.handler.IndexFetcher}.

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.solr.cloud; package org.apache.solr.cloud.cdcr;
import org.apache.lucene.util.LuceneTestCase.Nightly; import org.apache.lucene.util.LuceneTestCase.Nightly;
import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.NamedList;

View File

@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.cdcr;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.LinkedHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.handler.CdcrParams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CdcrTestsUtil extends SolrTestCaseJ4{
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
protected static void cdcrStart(CloudSolrClient client) throws SolrServerException, IOException {
QueryResponse response = invokeCdcrAction(client, CdcrParams.CdcrAction.START);
assertEquals("started", ((NamedList) response.getResponse().get("status")).get("process"));
}
protected static void cdcrStop(CloudSolrClient client) throws SolrServerException, IOException {
QueryResponse response = invokeCdcrAction(client, CdcrParams.CdcrAction.STOP);
assertEquals("stopped", ((NamedList) response.getResponse().get("status")).get("process"));
}
protected static void cdcrEnableBuffer(CloudSolrClient client) throws IOException, SolrServerException {
QueryResponse response = invokeCdcrAction(client, CdcrParams.CdcrAction.ENABLEBUFFER);
assertEquals("enabled", ((NamedList) response.getResponse().get("status")).get("buffer"));
}
protected static void cdcrDisableBuffer(CloudSolrClient client) throws IOException, SolrServerException {
QueryResponse response = invokeCdcrAction(client, CdcrParams.CdcrAction.DISABLEBUFFER);
assertEquals("disabled", ((NamedList) response.getResponse().get("status")).get("buffer"));
}
protected static QueryResponse invokeCdcrAction(CloudSolrClient client, CdcrParams.CdcrAction action) throws IOException, SolrServerException {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(CommonParams.QT, "/cdcr");
params.set(CommonParams.ACTION, action.toLower());
return client.query(params);
}
protected static QueryResponse getCdcrQueue(CloudSolrClient client) throws SolrServerException, IOException {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(CommonParams.QT, "/cdcr");
params.set(CommonParams.ACTION, CdcrParams.QUEUES);
return client.query(params);
}
protected static Object getFingerPrintMaxVersion(CloudSolrClient client, String shardNames, int numDocs) throws SolrServerException, IOException, InterruptedException {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(CommonParams.QT, "/get");
params.set("fingerprint", true);
params.set("shards", shardNames);
params.set("getVersions", numDocs);
QueryResponse response = null;
long start = System.nanoTime();
while (System.nanoTime() - start <= TimeUnit.NANOSECONDS.convert(20, TimeUnit.SECONDS)) {
response = client.query(params);
if (response.getResponse() != null && response.getResponse().get("fingerprint") != null) {
return (long)((LinkedHashMap)response.getResponse().get("fingerprint")).get("maxVersionEncountered");
}
Thread.sleep(200);
}
log.error("maxVersionEncountered not found for client : " + client + "in 20 attempts");
return null;
}
protected static long waitForClusterToSync(int numDocs, CloudSolrClient clusterSolrClient) throws Exception {
return waitForClusterToSync(numDocs, clusterSolrClient, "*:*");
}
protected static long waitForClusterToSync(int numDocs, CloudSolrClient clusterSolrClient, String query) throws Exception {
long start = System.nanoTime();
QueryResponse response = null;
while (System.nanoTime() - start <= TimeUnit.NANOSECONDS.convert(120, TimeUnit.SECONDS)) {
clusterSolrClient.commit();
response = clusterSolrClient.query(new SolrQuery(query));
if (response.getResults().getNumFound() == numDocs) {
break;
}
Thread.sleep(1000);
}
return response != null ? response.getResults().getNumFound() : 0;
}
}

View File

@ -14,7 +14,12 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.solr.cloud; package org.apache.solr.cloud.cdcr;
import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.client.solrj.SolrServerException;
@ -30,11 +35,6 @@ import org.junit.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class CdcrVersionReplicationTest extends BaseCdcrDistributedZkTest { public class CdcrVersionReplicationTest extends BaseCdcrDistributedZkTest {