mirror of https://github.com/apache/lucene.git
parent
07652a5289
commit
12d9d67dab
|
@ -63,9 +63,6 @@ New Features
|
|||
* SOLR-11520: Implement autoscaling suggestions for cores count violations (noble)
|
||||
|
||||
|
||||
* SOLR-11003: Support bi-directional syncing of cdcr clusters. We still only support actively into one cluster cluster,
|
||||
but have the ability to switch indexing clusters and cdcr will replicate correctly. (Amrit Sarkar, Varun Thacker)
|
||||
|
||||
Bug Fixes
|
||||
----------------------
|
||||
|
||||
|
|
|
@ -77,10 +77,6 @@ public class CdcrReplicator implements Runnable {
|
|||
Object o = subReader.next();
|
||||
if (o == null) break; // we have reached the end of the update logs, we should close the batch
|
||||
|
||||
if (isTargetCluster(o)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (isDelete(o)) {
|
||||
|
||||
/*
|
||||
|
@ -144,30 +140,6 @@ public class CdcrReplicator implements Runnable {
|
|||
state.resetConsecutiveErrors();
|
||||
}
|
||||
|
||||
/** check whether the update read from TLog is received from source
|
||||
* or received via solr client
|
||||
*/
|
||||
private boolean isTargetCluster(Object o) {
|
||||
List entry = (List) o;
|
||||
int operationAndFlags = (Integer) entry.get(0);
|
||||
int oper = operationAndFlags & UpdateLog.OPERATION_MASK;
|
||||
Boolean isTarget = false;
|
||||
if (oper == UpdateLog.DELETE_BY_QUERY || oper == UpdateLog.DELETE) {
|
||||
if (entry.size() == 4) { //back-combat - skip for previous versions
|
||||
isTarget = (Boolean) entry.get(entry.size() - 1);
|
||||
}
|
||||
} else if (oper == UpdateLog.UPDATE_INPLACE) {
|
||||
if (entry.size() == 6) { //back-combat - skip for previous versions
|
||||
isTarget = (Boolean) entry.get(entry.size() - 2);
|
||||
}
|
||||
} else if (oper == UpdateLog.ADD) {
|
||||
if (entry.size() == 4) { //back-combat - skip for previous versions
|
||||
isTarget = (Boolean) entry.get(entry.size() - 2);
|
||||
}
|
||||
}
|
||||
return isTarget;
|
||||
}
|
||||
|
||||
private boolean isDelete(Object o) {
|
||||
List entry = (List) o;
|
||||
int operationAndFlags = (Integer) entry.get(0);
|
||||
|
|
|
@ -24,13 +24,10 @@ import java.nio.channels.Channels;
|
|||
import java.nio.file.Files;
|
||||
import java.util.Collection;
|
||||
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.SolrInputDocument;
|
||||
import org.apache.solr.common.util.FastOutputStream;
|
||||
import org.apache.solr.common.util.JavaBinCodec;
|
||||
import org.apache.solr.common.util.ObjectReleaseTracker;
|
||||
import org.apache.solr.update.processor.CdcrUpdateProcessor;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -107,147 +104,6 @@ public class CdcrTransactionLog extends TransactionLog {
|
|||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long write(AddUpdateCommand cmd, long prevPointer, int flags) {
|
||||
assert (-1 <= prevPointer && (cmd.isInPlaceUpdate() || (-1 == prevPointer)));
|
||||
|
||||
LogCodec codec = new LogCodec(resolver);
|
||||
SolrInputDocument sdoc = cmd.getSolrInputDocument();
|
||||
|
||||
try {
|
||||
checkWriteHeader(codec, sdoc);
|
||||
|
||||
// adaptive buffer sizing
|
||||
int bufSize = lastAddSize; // unsynchronized access of lastAddSize should be fine
|
||||
bufSize = Math.min(1024*1024, bufSize+(bufSize>>3)+256);
|
||||
|
||||
MemOutputStream out = new MemOutputStream(new byte[bufSize]);
|
||||
codec.init(out);
|
||||
if (cmd.isInPlaceUpdate()) {
|
||||
codec.writeTag(JavaBinCodec.ARR, 6);
|
||||
codec.writeInt(UpdateLog.UPDATE_INPLACE | flags); // should just take one byte
|
||||
codec.writeLong(cmd.getVersion());
|
||||
codec.writeLong(prevPointer);
|
||||
codec.writeLong(cmd.prevVersion);
|
||||
if (cmd.getReq().getParamString().contains(CdcrUpdateProcessor.CDCR_UPDATE)) {
|
||||
// if the update is received via cdcr source; add boolean entry
|
||||
// CdcrReplicator.isTargetCluster() checks that particular boolean to accept or discard the update
|
||||
// to forward to its own target cluster
|
||||
codec.writePrimitive(true);
|
||||
} else {
|
||||
codec.writePrimitive(false);
|
||||
}
|
||||
codec.writeSolrInputDocument(cmd.getSolrInputDocument());
|
||||
|
||||
} else {
|
||||
codec.writeTag(JavaBinCodec.ARR, 4);
|
||||
codec.writeInt(UpdateLog.ADD | flags); // should just take one byte
|
||||
codec.writeLong(cmd.getVersion());
|
||||
if (cmd.getReq().getParamString().contains(CdcrUpdateProcessor.CDCR_UPDATE)) {
|
||||
// if the update is received via cdcr source; add extra boolean entry
|
||||
// CdcrReplicator.isTargetCluster() checks that particular boolean to accept or discard the update
|
||||
// to forward to its own target cluster
|
||||
codec.writePrimitive(true);
|
||||
} else {
|
||||
codec.writePrimitive(false);
|
||||
}
|
||||
codec.writeSolrInputDocument(cmd.getSolrInputDocument());
|
||||
}
|
||||
lastAddSize = (int)out.size();
|
||||
|
||||
synchronized (this) {
|
||||
long pos = fos.size(); // if we had flushed, this should be equal to channel.position()
|
||||
assert pos != 0;
|
||||
|
||||
/***
|
||||
System.out.println("###writing at " + pos + " fos.size()=" + fos.size() + " raf.length()=" + raf.length());
|
||||
if (pos != fos.size()) {
|
||||
throw new RuntimeException("ERROR" + "###writing at " + pos + " fos.size()=" + fos.size() + " raf.length()=" + raf.length());
|
||||
}
|
||||
***/
|
||||
|
||||
out.writeAll(fos);
|
||||
endRecord(pos);
|
||||
// fos.flushBuffer(); // flush later
|
||||
return pos;
|
||||
}
|
||||
|
||||
} catch (IOException e) {
|
||||
// TODO: reset our file pointer back to "pos", the start of this record.
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error logging add", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public long writeDelete(DeleteUpdateCommand cmd, int flags) {
|
||||
LogCodec codec = new LogCodec(resolver);
|
||||
|
||||
try {
|
||||
checkWriteHeader(codec, null);
|
||||
|
||||
BytesRef br = cmd.getIndexedId();
|
||||
|
||||
MemOutputStream out = new MemOutputStream(new byte[20 + br.length]);
|
||||
codec.init(out);
|
||||
codec.writeTag(JavaBinCodec.ARR, 4);
|
||||
codec.writeInt(UpdateLog.DELETE | flags); // should just take one byte
|
||||
codec.writeLong(cmd.getVersion());
|
||||
codec.writeByteArray(br.bytes, br.offset, br.length);
|
||||
if (cmd.getReq().getParamString().contains(CdcrUpdateProcessor.CDCR_UPDATE)) {
|
||||
// if the update is received via cdcr source; add extra boolean entry
|
||||
// CdcrReplicator.isTargetCluster() checks that particular boolean to accept or discard the update
|
||||
// to forward to its own target cluster
|
||||
codec.writePrimitive(true);
|
||||
} else {
|
||||
codec.writePrimitive(false);
|
||||
}
|
||||
|
||||
synchronized (this) {
|
||||
long pos = fos.size(); // if we had flushed, this should be equal to channel.position()
|
||||
assert pos != 0;
|
||||
out.writeAll(fos);
|
||||
endRecord(pos);
|
||||
// fos.flushBuffer(); // flush later
|
||||
return pos;
|
||||
}
|
||||
|
||||
} catch (IOException e) {
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public long writeDeleteByQuery(DeleteUpdateCommand cmd, int flags) {
|
||||
LogCodec codec = new LogCodec(resolver);
|
||||
try {
|
||||
checkWriteHeader(codec, null);
|
||||
|
||||
MemOutputStream out = new MemOutputStream(new byte[20 + (cmd.query.length())]);
|
||||
codec.init(out);
|
||||
codec.writeTag(JavaBinCodec.ARR, 4);
|
||||
codec.writeInt(UpdateLog.DELETE_BY_QUERY | flags); // should just take one byte
|
||||
codec.writeLong(cmd.getVersion());
|
||||
codec.writeStr(cmd.query);
|
||||
if (cmd.getReq().getParamString().contains(CdcrUpdateProcessor.CDCR_UPDATE)) {
|
||||
// if the update is received via cdcr source; add extra boolean entry
|
||||
// CdcrReplicator.isTargetCluster() checks that particular boolean to accept or discard the update
|
||||
// to forward to its own target cluster
|
||||
codec.writePrimitive(true);
|
||||
} else {
|
||||
codec.writePrimitive(false);
|
||||
}
|
||||
synchronized (this) {
|
||||
long pos = fos.size(); // if we had flushed, this should be equal to channel.position()
|
||||
out.writeAll(fos);
|
||||
endRecord(pos);
|
||||
// fos.flushBuffer(); // flush later
|
||||
return pos;
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public long writeCommit(CommitUpdateCommand cmd, int flags) {
|
||||
LogCodec codec = new LogCodec(resolver);
|
||||
|
|
|
@ -326,7 +326,7 @@ public class TransactionLog implements Closeable {
|
|||
numRecords++;
|
||||
}
|
||||
|
||||
protected void checkWriteHeader(LogCodec codec, SolrInputDocument optional) throws IOException {
|
||||
private void checkWriteHeader(LogCodec codec, SolrInputDocument optional) throws IOException {
|
||||
|
||||
// Unsynchronized access. We can get away with an unsynchronized access here
|
||||
// since we will never get a false non-zero when the position is in fact 0.
|
||||
|
|
|
@ -1,32 +0,0 @@
|
|||
<?xml version="1.0" encoding="UTF-8" ?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<schema name="minimal" version="1.1">
|
||||
<types>
|
||||
<fieldType name="string" class="solr.StrField"/>
|
||||
<fieldType name="text" class="solr.TextField"/>
|
||||
<fieldType name="long" class="${solr.tests.LongFieldType}" docValues="${solr.tests.numeric.dv}" precisionStep="0" positionIncrementGap="0"/>
|
||||
</types>
|
||||
<fields>
|
||||
<field name="id" type="string" indexed="true" stored="true"/>
|
||||
<field name="_text_" type="text" indexed="true" stored="true" multiValued="true" />
|
||||
<field name="_version_" type="long" indexed="true" stored="true"/>
|
||||
<dynamicField name="*" type="text" indexed="true" stored="true"/>
|
||||
</fields>
|
||||
<copyField source="*" dest="_text_"/>
|
||||
<uniqueKey>id</uniqueKey>
|
||||
</schema>
|
|
@ -1,80 +0,0 @@
|
|||
<?xml version="1.0" ?>
|
||||
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
|
||||
<!-- This is a "kitchen sink" config file that tests can use.
|
||||
When writting a new test, feel free to add *new* items (plugins,
|
||||
config options, etc...) as long as they don't break any existing
|
||||
tests. if you need to test something esoteric please add a new
|
||||
"solrconfig-your-esoteric-purpose.xml" config file.
|
||||
|
||||
Note in particular that this test is used by MinimalSchemaTest so
|
||||
Anything added to this file needs to work correctly even if there
|
||||
is now uniqueKey or defaultSearch Field.
|
||||
-->
|
||||
|
||||
<config>
|
||||
|
||||
<dataDir>${solr.data.dir:}</dataDir>
|
||||
|
||||
<directoryFactory name="DirectoryFactory"
|
||||
class="${solr.directoryFactory:solr.NRTCachingDirectoryFactory}"/>
|
||||
|
||||
<luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
|
||||
|
||||
<updateRequestProcessorChain name="cdcr-processor-chain">
|
||||
<processor class="solr.CdcrUpdateProcessorFactory"/>
|
||||
<processor class="solr.RunUpdateProcessorFactory"/>
|
||||
</updateRequestProcessorChain>
|
||||
|
||||
<requestHandler name="/cdcr" class="solr.CdcrRequestHandler">
|
||||
<lst name="replica">
|
||||
<str name="zkHost">${cdcr.cluster2.zkHost}</str>
|
||||
<str name="source">cdcr-cluster1</str>
|
||||
<str name="target">cdcr-cluster2</str>
|
||||
</lst>
|
||||
<lst name="replicator">
|
||||
<str name="threadPoolSize">1</str>
|
||||
<str name="schedule">1000</str>
|
||||
<str name="batchSize">1000</str>
|
||||
</lst>
|
||||
<lst name="updateLogSynchronizer">
|
||||
<str name="schedule">1000</str>
|
||||
</lst>
|
||||
</requestHandler>
|
||||
|
||||
<updateHandler class="solr.DirectUpdateHandler2">
|
||||
<updateLog class="solr.CdcrUpdateLog">
|
||||
<str name="dir">${solr.ulog.dir:}</str>
|
||||
</updateLog>
|
||||
</updateHandler>
|
||||
|
||||
<requestHandler name="/select" class="solr.SearchHandler" />
|
||||
|
||||
<initParams path="/update/**,/query,/select,/tvrh,/elevate,/spell,/browse">
|
||||
<lst name="defaults">
|
||||
<str name="df">_text_</str>
|
||||
</lst>
|
||||
</initParams>
|
||||
|
||||
<requestHandler name="/update" class="solr.UpdateRequestHandler">
|
||||
<lst name="defaults">
|
||||
<str name="update.chain">cdcr-processor-chain</str>
|
||||
</lst>
|
||||
</requestHandler>
|
||||
</config>
|
|
@ -1,32 +0,0 @@
|
|||
<?xml version="1.0" encoding="UTF-8" ?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<schema name="minimal" version="1.1">
|
||||
<types>
|
||||
<fieldType name="string" class="solr.StrField"/>
|
||||
<fieldType name="text" class="solr.TextField"/>
|
||||
<fieldType name="long" class="${solr.tests.LongFieldType}" docValues="${solr.tests.numeric.dv}" precisionStep="0" positionIncrementGap="0"/>
|
||||
</types>
|
||||
<fields>
|
||||
<field name="id" type="string" indexed="true" stored="true"/>
|
||||
<field name="_text_" type="text" indexed="true" stored="true" multiValued="true"/>
|
||||
<field name="_version_" type="long" indexed="true" stored="true"/>
|
||||
<dynamicField name="*" type="text" indexed="true" stored="true"/>
|
||||
</fields>
|
||||
<copyField source="*" dest="_text_"/>
|
||||
<uniqueKey>id</uniqueKey>
|
||||
</schema>
|
|
@ -1,80 +0,0 @@
|
|||
<?xml version="1.0" ?>
|
||||
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
|
||||
<!-- This is a "kitchen sink" config file that tests can use.
|
||||
When writting a new test, feel free to add *new* items (plugins,
|
||||
config options, etc...) as long as they don't break any existing
|
||||
tests. if you need to test something esoteric please add a new
|
||||
"solrconfig-your-esoteric-purpose.xml" config file.
|
||||
|
||||
Note in particular that this test is used by MinimalSchemaTest so
|
||||
Anything added to this file needs to work correctly even if there
|
||||
is now uniqueKey or defaultSearch Field.
|
||||
-->
|
||||
|
||||
<config>
|
||||
|
||||
<dataDir>${solr.data.dir:}</dataDir>
|
||||
|
||||
<directoryFactory name="DirectoryFactory"
|
||||
class="${solr.directoryFactory:solr.NRTCachingDirectoryFactory}"/>
|
||||
|
||||
<luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
|
||||
|
||||
<updateRequestProcessorChain name="cdcr-processor-chain">
|
||||
<processor class="solr.CdcrUpdateProcessorFactory"/>
|
||||
<processor class="solr.RunUpdateProcessorFactory"/>
|
||||
</updateRequestProcessorChain>
|
||||
|
||||
<requestHandler name="/cdcr" class="solr.CdcrRequestHandler">
|
||||
<lst name="replica">
|
||||
<str name="zkHost">${cdcr.cluster1.zkHost}</str>
|
||||
<str name="source">cdcr-cluster2</str>
|
||||
<str name="target">cdcr-cluster1</str>
|
||||
</lst>
|
||||
<lst name="replicator">
|
||||
<str name="threadPoolSize">1</str>
|
||||
<str name="schedule">1000</str>
|
||||
<str name="batchSize">1000</str>
|
||||
</lst>
|
||||
<lst name="updateLogSynchronizer">
|
||||
<str name="schedule">1000</str>
|
||||
</lst>
|
||||
</requestHandler>
|
||||
|
||||
<updateHandler class="solr.DirectUpdateHandler2">
|
||||
<updateLog class="solr.CdcrUpdateLog">
|
||||
<str name="dir">${solr.ulog.dir:}</str>
|
||||
</updateLog>
|
||||
</updateHandler>
|
||||
|
||||
<requestHandler name="/select" class="solr.SearchHandler" />
|
||||
|
||||
<initParams path="/update/**,/query,/select,/tvrh,/elevate,/spell,/browse">
|
||||
<lst name="defaults">
|
||||
<str name="df">_text_</str>
|
||||
</lst>
|
||||
</initParams>
|
||||
|
||||
<requestHandler name="/update" class="solr.UpdateRequestHandler">
|
||||
<lst name="defaults">
|
||||
<str name="update.chain">cdcr-processor-chain</str>
|
||||
</lst>
|
||||
</requestHandler>
|
||||
</config>
|
|
@ -14,7 +14,7 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.solr.cloud.cdcr;
|
||||
package org.apache.solr.cloud;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
@ -40,10 +40,6 @@ import org.apache.solr.client.solrj.impl.HttpSolrClient;
|
|||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||
import org.apache.solr.client.solrj.request.QueryRequest;
|
||||
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
|
||||
import org.apache.solr.cloud.AbstractDistribZkTestBase;
|
||||
import org.apache.solr.cloud.AbstractZkTestCase;
|
||||
import org.apache.solr.cloud.ChaosMonkey;
|
||||
import org.apache.solr.cloud.OverseerCollectionMessageHandler;
|
||||
import org.apache.solr.common.SolrInputDocument;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
|
@ -15,11 +15,12 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.cloud.cdcr;
|
||||
package org.apache.solr.cloud;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.lucene.store.FSDirectory;
|
||||
import org.apache.solr.SolrTestCaseJ4;
|
||||
|
@ -31,11 +32,10 @@ import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
|
|||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||
import org.apache.solr.client.solrj.request.UpdateRequest;
|
||||
import org.apache.solr.client.solrj.response.QueryResponse;
|
||||
import org.apache.solr.cloud.AbstractDistribZkTestBase;
|
||||
import org.apache.solr.cloud.MiniSolrCloudCluster;
|
||||
import org.apache.solr.common.SolrInputDocument;
|
||||
import org.apache.solr.common.params.CommonParams;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.handler.CdcrParams;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -89,9 +89,11 @@ public class CdcrBootstrapTest extends SolrTestCaseJ4 {
|
|||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
params.set(CommonParams.QT, "/get");
|
||||
params.set("getVersions", numDocs);
|
||||
params.set("fingerprint", true);
|
||||
response = sourceSolrClient.query(params);
|
||||
maxVersion = (long)(((LinkedHashMap)response.getResponse().get("fingerprint")).get("maxVersionEncountered"));
|
||||
List<Long> versions = (List<Long>) response.getResponse().get("versions");
|
||||
for (Long version : versions) {
|
||||
maxVersion = Math.max(maxVersion, version);
|
||||
}
|
||||
|
||||
// upload the cdcr-enabled config and restart source cluster
|
||||
source.uploadConfigSet(configset("cdcr-source"), "cdcr-source");
|
||||
|
@ -111,12 +113,12 @@ public class CdcrBootstrapTest extends SolrTestCaseJ4 {
|
|||
targetSolrClient.setDefaultCollection("cdcr-target");
|
||||
Thread.sleep(1000);
|
||||
|
||||
CdcrTestsUtil.cdcrStart(targetSolrClient);
|
||||
CdcrTestsUtil.cdcrStart(sourceSolrClient);
|
||||
cdcrStart(targetSolrClient);
|
||||
cdcrStart(sourceSolrClient);
|
||||
|
||||
response = CdcrTestsUtil.getCdcrQueue(sourceSolrClient);
|
||||
response = getCdcrQueue(sourceSolrClient);
|
||||
log.info("Cdcr queue response: " + response.getResponse());
|
||||
long foundDocs = CdcrTestsUtil.waitForClusterToSync(numDocs, targetSolrClient);
|
||||
long foundDocs = waitForTargetToSync(numDocs, targetSolrClient);
|
||||
assertEquals("Document mismatch on target after sync", numDocs, foundDocs);
|
||||
|
||||
params = new ModifiableSolrParams();
|
||||
|
@ -187,12 +189,12 @@ public class CdcrBootstrapTest extends SolrTestCaseJ4 {
|
|||
CloudSolrClient targetSolrClient = target.getSolrClient();
|
||||
targetSolrClient.setDefaultCollection("cdcr-target");
|
||||
|
||||
CdcrTestsUtil.cdcrStart(targetSolrClient);
|
||||
CdcrTestsUtil.cdcrStart(sourceSolrClient);
|
||||
cdcrStart(targetSolrClient);
|
||||
cdcrStart(sourceSolrClient);
|
||||
|
||||
response = CdcrTestsUtil.getCdcrQueue(sourceSolrClient);
|
||||
response = getCdcrQueue(sourceSolrClient);
|
||||
log.info("Cdcr queue response: " + response.getResponse());
|
||||
long foundDocs = CdcrTestsUtil.waitForClusterToSync(numDocs, targetSolrClient);
|
||||
long foundDocs = waitForTargetToSync(numDocs, targetSolrClient);
|
||||
assertEquals("Document mismatch on target after sync", numDocs, foundDocs);
|
||||
|
||||
int total_tlogs_in_index = FSDirectory.open(target.getBaseDir().resolve("node1").
|
||||
|
@ -201,8 +203,8 @@ public class CdcrBootstrapTest extends SolrTestCaseJ4 {
|
|||
|
||||
assertEquals("tlogs count should be ZERO",0, total_tlogs_in_index);
|
||||
|
||||
CdcrTestsUtil.cdcrStop(sourceSolrClient);
|
||||
CdcrTestsUtil.cdcrDisableBuffer(sourceSolrClient);
|
||||
cdcrStop(sourceSolrClient);
|
||||
cdcrDisableBuffer(sourceSolrClient);
|
||||
|
||||
int c = 0;
|
||||
for (int k = 0; k < 10; k++) {
|
||||
|
@ -221,10 +223,10 @@ public class CdcrBootstrapTest extends SolrTestCaseJ4 {
|
|||
response = sourceSolrClient.query(new SolrQuery("*:*"));
|
||||
assertEquals("", numDocs, response.getResults().getNumFound());
|
||||
|
||||
CdcrTestsUtil.cdcrStart(sourceSolrClient);
|
||||
CdcrTestsUtil.cdcrEnableBuffer(sourceSolrClient);
|
||||
cdcrStart(sourceSolrClient);
|
||||
cdcrEnableBuffer(sourceSolrClient);
|
||||
|
||||
foundDocs = CdcrTestsUtil.waitForClusterToSync(numDocs, targetSolrClient);
|
||||
foundDocs = waitForTargetToSync(numDocs, targetSolrClient);
|
||||
assertEquals("Document mismatch on target after sync", numDocs, foundDocs);
|
||||
|
||||
} finally {
|
||||
|
@ -267,8 +269,8 @@ public class CdcrBootstrapTest extends SolrTestCaseJ4 {
|
|||
targetSolrClient.setDefaultCollection("cdcr-target");
|
||||
Thread.sleep(1000);
|
||||
|
||||
CdcrTestsUtil.cdcrStart(targetSolrClient);
|
||||
CdcrTestsUtil.cdcrStart(sourceSolrClient);
|
||||
cdcrStart(targetSolrClient);
|
||||
cdcrStart(sourceSolrClient);
|
||||
int c = 0;
|
||||
for (int k = 0; k < docs; k++) {
|
||||
UpdateRequest req = new UpdateRequest();
|
||||
|
@ -286,9 +288,9 @@ public class CdcrBootstrapTest extends SolrTestCaseJ4 {
|
|||
response = sourceSolrClient.query(new SolrQuery("*:*"));
|
||||
assertEquals("", numDocs, response.getResults().getNumFound());
|
||||
|
||||
response = CdcrTestsUtil.getCdcrQueue(sourceSolrClient);
|
||||
response = getCdcrQueue(sourceSolrClient);
|
||||
log.info("Cdcr queue response: " + response.getResponse());
|
||||
long foundDocs = CdcrTestsUtil.waitForClusterToSync(numDocs, targetSolrClient);
|
||||
long foundDocs = waitForTargetToSync(numDocs, targetSolrClient);
|
||||
assertEquals("Document mismatch on target after sync", numDocs, foundDocs);
|
||||
|
||||
} finally {
|
||||
|
@ -299,4 +301,56 @@ public class CdcrBootstrapTest extends SolrTestCaseJ4 {
|
|||
}
|
||||
}
|
||||
|
||||
private long waitForTargetToSync(int numDocs, CloudSolrClient targetSolrClient) throws SolrServerException, IOException, InterruptedException {
|
||||
long start = System.nanoTime();
|
||||
QueryResponse response = null;
|
||||
while (System.nanoTime() - start <= TimeUnit.NANOSECONDS.convert(120, TimeUnit.SECONDS)) {
|
||||
try {
|
||||
targetSolrClient.commit();
|
||||
response = targetSolrClient.query(new SolrQuery("*:*"));
|
||||
if (response.getResults().getNumFound() == numDocs) {
|
||||
break;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.warn("Exception trying to commit on target. This is expected and safe to ignore.", e);
|
||||
}
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
return response != null ? response.getResults().getNumFound() : 0;
|
||||
}
|
||||
|
||||
|
||||
private void cdcrStart(CloudSolrClient client) throws SolrServerException, IOException {
|
||||
QueryResponse response = invokeCdcrAction(client, CdcrParams.CdcrAction.START);
|
||||
assertEquals("started", ((NamedList) response.getResponse().get("status")).get("process"));
|
||||
}
|
||||
|
||||
private void cdcrStop(CloudSolrClient client) throws SolrServerException, IOException {
|
||||
QueryResponse response = invokeCdcrAction(client, CdcrParams.CdcrAction.STOP);
|
||||
assertEquals("stopped", ((NamedList) response.getResponse().get("status")).get("process"));
|
||||
}
|
||||
|
||||
private void cdcrEnableBuffer(CloudSolrClient client) throws IOException, SolrServerException {
|
||||
QueryResponse response = invokeCdcrAction(client, CdcrParams.CdcrAction.ENABLEBUFFER);
|
||||
assertEquals("enabled", ((NamedList) response.getResponse().get("status")).get("buffer"));
|
||||
}
|
||||
|
||||
private void cdcrDisableBuffer(CloudSolrClient client) throws IOException, SolrServerException {
|
||||
QueryResponse response = invokeCdcrAction(client, CdcrParams.CdcrAction.DISABLEBUFFER);
|
||||
assertEquals("disabled", ((NamedList) response.getResponse().get("status")).get("buffer"));
|
||||
}
|
||||
|
||||
private QueryResponse invokeCdcrAction(CloudSolrClient client, CdcrParams.CdcrAction action) throws IOException, SolrServerException {
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
params.set(CommonParams.QT, "/cdcr");
|
||||
params.set(CommonParams.ACTION, action.toLower());
|
||||
return client.query(params);
|
||||
}
|
||||
|
||||
private QueryResponse getCdcrQueue(CloudSolrClient client) throws SolrServerException, IOException {
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
params.set(CommonParams.QT, "/cdcr");
|
||||
params.set(CommonParams.ACTION, CdcrParams.QUEUES);
|
||||
return client.query(params);
|
||||
}
|
||||
}
|
|
@ -14,13 +14,7 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.solr.cloud.cdcr;
|
||||
|
||||
import java.io.File;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
package org.apache.solr.cloud;
|
||||
|
||||
import org.apache.lucene.util.LuceneTestCase.BadApple;
|
||||
import org.apache.lucene.util.LuceneTestCase.Nightly;
|
||||
|
@ -31,6 +25,12 @@ import org.junit.Test;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.File;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
@Nightly
|
||||
@BadApple(bugUrl = "https://issues.apache.org/jira/browse/SOLR-10107")
|
||||
public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest {
|
|
@ -14,7 +14,18 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.solr.cloud.cdcr;
|
||||
package org.apache.solr.cloud;
|
||||
|
||||
import org.apache.lucene.util.LuceneTestCase.Nightly;
|
||||
import org.apache.solr.client.solrj.SolrClient;
|
||||
import org.apache.solr.client.solrj.SolrQuery;
|
||||
import org.apache.solr.client.solrj.SolrServerException;
|
||||
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||
import org.apache.solr.common.SolrInputDocument;
|
||||
import org.apache.solr.util.DefaultSolrThreadFactory;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
@ -28,18 +39,6 @@ import java.util.concurrent.ScheduledExecutorService;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.lucene.util.LuceneTestCase.Nightly;
|
||||
import org.apache.solr.client.solrj.SolrClient;
|
||||
import org.apache.solr.client.solrj.SolrQuery;
|
||||
import org.apache.solr.client.solrj.SolrServerException;
|
||||
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||
import org.apache.solr.cloud.ChaosMonkey;
|
||||
import org.apache.solr.common.SolrInputDocument;
|
||||
import org.apache.solr.util.DefaultSolrThreadFactory;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* This class is testing the cdcr extension to the {@link org.apache.solr.handler.ReplicationHandler} and
|
||||
* {@link org.apache.solr.handler.IndexFetcher}.
|
|
@ -14,7 +14,7 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.solr.cloud.cdcr;
|
||||
package org.apache.solr.cloud;
|
||||
|
||||
import org.apache.lucene.util.LuceneTestCase.Nightly;
|
||||
import org.apache.solr.common.util.NamedList;
|
|
@ -14,12 +14,7 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.solr.cloud.cdcr;
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
package org.apache.solr.cloud;
|
||||
|
||||
import org.apache.solr.client.solrj.SolrClient;
|
||||
import org.apache.solr.client.solrj.SolrServerException;
|
||||
|
@ -35,6 +30,11 @@ import org.junit.Test;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
|
||||
public class CdcrVersionReplicationTest extends BaseCdcrDistributedZkTest {
|
||||
|
|
@ -1,238 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.cloud.cdcr;
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
|
||||
import com.carrotsearch.ant.tasks.junit4.dependencies.com.google.common.collect.ImmutableMap;
|
||||
import org.apache.solr.SolrTestCaseJ4;
|
||||
import org.apache.solr.client.solrj.SolrQuery;
|
||||
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
|
||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||
import org.apache.solr.client.solrj.request.UpdateRequest;
|
||||
import org.apache.solr.client.solrj.response.QueryResponse;
|
||||
import org.apache.solr.cloud.MiniSolrCloudCluster;
|
||||
import org.apache.solr.common.SolrInputDocument;
|
||||
import org.apache.solr.common.params.CommonParams;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.handler.CdcrParams;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class CdcrBidirectionalTest extends SolrTestCaseJ4 {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
@Test
|
||||
public void testBiDir() throws Exception {
|
||||
MiniSolrCloudCluster cluster2 = new MiniSolrCloudCluster(1, createTempDir("cdcr-cluster2"), buildJettyConfig("/solr"));
|
||||
cluster2.waitForAllNodes(30);
|
||||
MiniSolrCloudCluster cluster1 = new MiniSolrCloudCluster(1, createTempDir("cdcr-cluster1"), buildJettyConfig("/solr"));
|
||||
cluster1.waitForAllNodes(30);
|
||||
try {
|
||||
log.info("cluster2 zkHost = " + cluster2.getZkServer().getZkAddress());
|
||||
System.setProperty("cdcr.cluster2.zkHost", cluster2.getZkServer().getZkAddress());
|
||||
|
||||
log.info("cluster1 zkHost = " + cluster1.getZkServer().getZkAddress());
|
||||
System.setProperty("cdcr.cluster1.zkHost", cluster1.getZkServer().getZkAddress());
|
||||
|
||||
|
||||
cluster1.uploadConfigSet(configset("cdcr-cluster1"), "cdcr-cluster1");
|
||||
CollectionAdminRequest.createCollection("cdcr-cluster1", "cdcr-cluster1", 1, 1)
|
||||
.withProperty("solr.directoryFactory", "solr.StandardDirectoryFactory")
|
||||
.process(cluster1.getSolrClient());
|
||||
CloudSolrClient cluster1SolrClient = cluster1.getSolrClient();
|
||||
cluster1SolrClient.setDefaultCollection("cdcr-cluster1");
|
||||
|
||||
cluster2.uploadConfigSet(configset("cdcr-cluster2"), "cdcr-cluster2");
|
||||
CollectionAdminRequest.createCollection("cdcr-cluster2", "cdcr-cluster2", 1, 1)
|
||||
.process(cluster2.getSolrClient());
|
||||
CloudSolrClient cluster2SolrClient = cluster2.getSolrClient();
|
||||
cluster2SolrClient.setDefaultCollection("cdcr-cluster2");
|
||||
|
||||
UpdateRequest req;
|
||||
|
||||
CdcrTestsUtil.cdcrStart(cluster1SolrClient);
|
||||
|
||||
// ADD operation on cluster 1
|
||||
int docs = (TEST_NIGHTLY ? 100 : 10);
|
||||
int numDocs_c1 = 0;
|
||||
for (int k = 0; k < docs; k++) {
|
||||
req = new UpdateRequest();
|
||||
for (; numDocs_c1 < (k + 1) * 100; numDocs_c1++) {
|
||||
SolrInputDocument doc = new SolrInputDocument();
|
||||
doc.addField("id", "cluster1_" + numDocs_c1);
|
||||
doc.addField("xyz", numDocs_c1);
|
||||
req.add(doc);
|
||||
}
|
||||
req.setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true);
|
||||
log.info("Adding " + docs + " docs with commit=true, numDocs=" + numDocs_c1);
|
||||
req.process(cluster1SolrClient);
|
||||
}
|
||||
|
||||
QueryResponse response = cluster1SolrClient.query(new SolrQuery("*:*"));
|
||||
assertEquals("cluster 1 docs mismatch", numDocs_c1, response.getResults().getNumFound());
|
||||
|
||||
assertEquals("cluster 2 docs mismatch", numDocs_c1, CdcrTestsUtil.waitForClusterToSync(numDocs_c1, cluster2SolrClient));
|
||||
|
||||
CdcrTestsUtil.cdcrStart(cluster2SolrClient); // FULL BI-DIRECTIONAL CDCR FORWARDING ON
|
||||
|
||||
// ADD operation on cluster 2
|
||||
int numDocs_c2 = 0;
|
||||
for (int k = 0; k < docs; k++) {
|
||||
req = new UpdateRequest();
|
||||
for (; numDocs_c2 < (k + 1) * 100; numDocs_c2++) {
|
||||
SolrInputDocument doc = new SolrInputDocument();
|
||||
doc.addField("id", "cluster2_" + numDocs_c2);
|
||||
doc.addField("xyz", numDocs_c2);
|
||||
req.add(doc);
|
||||
}
|
||||
req.setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true);
|
||||
log.info("Adding " + docs + " docs with commit=true, numDocs=" + numDocs_c2);
|
||||
req.process(cluster2SolrClient);
|
||||
}
|
||||
|
||||
int numDocs = numDocs_c1 + numDocs_c2;
|
||||
|
||||
response = cluster2SolrClient.query(new SolrQuery("*:*"));
|
||||
assertEquals("cluster 2 docs mismatch", numDocs, response.getResults().getNumFound());
|
||||
|
||||
assertEquals("cluster 1 docs mismatch", numDocs, CdcrTestsUtil.waitForClusterToSync(numDocs, cluster1SolrClient));
|
||||
|
||||
response = CdcrTestsUtil.getCdcrQueue(cluster1SolrClient);
|
||||
log.info("Cdcr cluster1 queue response: " + response.getResponse());
|
||||
response = CdcrTestsUtil.getCdcrQueue(cluster2SolrClient);
|
||||
log.info("Cdcr cluster2 queue response: " + response.getResponse());
|
||||
|
||||
// lets find and keep the maximum version assigned by cluster1 & cluster2 across all our updates
|
||||
long maxVersion_c1;
|
||||
long maxVersion_c2;
|
||||
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
params.set(CommonParams.QT, "/get");
|
||||
params.set("getVersions", numDocs);
|
||||
params.set("fingerprint", true);
|
||||
response = cluster1SolrClient.query(params);
|
||||
maxVersion_c1 = (long)(((LinkedHashMap)response.getResponse().get("fingerprint")).get("maxVersionEncountered"));
|
||||
response = cluster2SolrClient.query(params);
|
||||
maxVersion_c2 = (long)(((LinkedHashMap)response.getResponse().get("fingerprint")).get("maxVersionEncountered"));
|
||||
|
||||
params = new ModifiableSolrParams();
|
||||
params.set(CommonParams.ACTION, CdcrParams.CdcrAction.COLLECTIONCHECKPOINT.toString());
|
||||
params.set(CommonParams.QT, "/cdcr");
|
||||
response = cluster2SolrClient.query(params);
|
||||
Long checkpoint_2 = (Long) response.getResponse().get(CdcrParams.CHECKPOINT);
|
||||
assertNotNull(checkpoint_2);
|
||||
|
||||
params = new ModifiableSolrParams();
|
||||
params.set(CommonParams.ACTION, CdcrParams.CdcrAction.COLLECTIONCHECKPOINT.toString());
|
||||
params.set(CommonParams.QT, "/cdcr");
|
||||
response = cluster1SolrClient.query(params);
|
||||
Long checkpoint_1 = (Long) response.getResponse().get(CdcrParams.CHECKPOINT);
|
||||
assertNotNull(checkpoint_1);
|
||||
|
||||
assertEquals("COLLECTIONCHECKPOINT from cluster2 should have returned the maximum " +
|
||||
"version across all updates made to cluster1", maxVersion_c1, checkpoint_2.longValue());
|
||||
assertEquals("COLLECTIONCHECKPOINT from cluster1 should have returned the maximum " +
|
||||
"version across all updates made to cluster2", maxVersion_c2, checkpoint_1.longValue());
|
||||
assertEquals("max versions of updates in both clusters should be same", maxVersion_c1, maxVersion_c2);
|
||||
|
||||
// DELETE BY QUERY
|
||||
String randomQuery = String.valueOf(ThreadLocalRandom.current().nextInt(docs * 100));
|
||||
response = cluster1SolrClient.query(new SolrQuery(randomQuery));
|
||||
assertEquals("cluster 1 docs mismatch", 2, response.getResults().getNumFound());
|
||||
cluster1SolrClient.deleteByQuery(randomQuery);
|
||||
|
||||
req = new UpdateRequest();
|
||||
req.setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true);
|
||||
req.process(cluster1SolrClient);
|
||||
response = cluster1SolrClient.query(new SolrQuery(randomQuery));
|
||||
assertEquals("cluster 1 docs mismatch", 0, response.getResults().getNumFound());
|
||||
assertEquals("cluster 2 docs mismatch", 0, CdcrTestsUtil.waitForClusterToSync((int)response.getResults().getNumFound(), cluster2SolrClient, randomQuery));
|
||||
|
||||
// ADD the deleted query-doc again.
|
||||
req = new UpdateRequest();
|
||||
SolrInputDocument doc = new SolrInputDocument();
|
||||
doc.addField("id", "cluster2_" + (docs * 100 + 1));
|
||||
doc.addField("xyz", randomQuery);
|
||||
req.add(doc);
|
||||
req.process(cluster2SolrClient);
|
||||
req = new UpdateRequest();
|
||||
req.setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true);
|
||||
req.process(cluster2SolrClient);
|
||||
response = cluster2SolrClient.query(new SolrQuery(randomQuery));
|
||||
assertEquals("cluster 2 docs mismatch", 1, response.getResults().getNumFound());
|
||||
assertEquals("cluster 1 docs mismatch", 1, CdcrTestsUtil.waitForClusterToSync((int)response.getResults().getNumFound(), cluster1SolrClient, randomQuery));
|
||||
|
||||
// DELETE BY ID
|
||||
String delete_id_query = "cluster2_" + (docs * 100 + 1);
|
||||
cluster1SolrClient.deleteById(delete_id_query);
|
||||
req.process(cluster1SolrClient);
|
||||
response = cluster1SolrClient.query(new SolrQuery(delete_id_query));
|
||||
assertEquals("cluster 1 docs mismatch", 0, response.getResults().getNumFound());
|
||||
assertEquals("cluster 2 docs mismatch", 0, CdcrTestsUtil.waitForClusterToSync((int)response.getResults().getNumFound(), cluster2SolrClient, delete_id_query));
|
||||
|
||||
// ATOMIC UPDATES
|
||||
req = new UpdateRequest();
|
||||
doc = new SolrInputDocument();
|
||||
ImmutableMap.of("", "");
|
||||
doc.addField("id", "cluster2_" + (docs * 100 + 1));
|
||||
doc.addField("xyz", ImmutableMap.of("delete", ""));
|
||||
doc.addField("abc", ImmutableMap.of("set", "ABC"));
|
||||
req.add(doc);
|
||||
req.process(cluster2SolrClient);
|
||||
req = new UpdateRequest();
|
||||
req.setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true);
|
||||
req.process(cluster2SolrClient);
|
||||
|
||||
String atomic_query_1 = "xyz:" + randomQuery + " AND id:cluster2_" + (docs * 100 + 1);
|
||||
String atomic_query_2 = "abc:ABC AND id:cluster2_" + (docs * 100 + 1);
|
||||
|
||||
response = cluster2SolrClient.query(new SolrQuery(atomic_query_1));
|
||||
assertEquals("cluster 2 docs mismatch", 0, response.getResults().getNumFound());
|
||||
response = cluster2SolrClient.query(new SolrQuery(atomic_query_2));
|
||||
assertEquals("cluster 2 docs mismatch", 1, response.getResults().getNumFound());
|
||||
assertEquals("cluster 1 docs mismatch", 0, CdcrTestsUtil.waitForClusterToSync((int)response.getResults().getNumFound(), cluster1SolrClient, atomic_query_1));
|
||||
assertEquals("cluster 1 docs mismatch", 1, CdcrTestsUtil.waitForClusterToSync((int)response.getResults().getNumFound(), cluster1SolrClient, atomic_query_2));
|
||||
|
||||
response = CdcrTestsUtil.getCdcrQueue(cluster1SolrClient);
|
||||
log.info("Cdcr cluster1 queue response at end of testcase: " + response.getResponse());
|
||||
response = CdcrTestsUtil.getCdcrQueue(cluster2SolrClient);
|
||||
log.info("Cdcr cluster2 queue response at end of testcase: " + response.getResponse());
|
||||
|
||||
CdcrTestsUtil.cdcrStop(cluster1SolrClient);
|
||||
CdcrTestsUtil.cdcrStop(cluster2SolrClient);
|
||||
|
||||
cluster1SolrClient.close();
|
||||
cluster2SolrClient.close();
|
||||
|
||||
} finally {
|
||||
if (cluster1 != null) {
|
||||
cluster1.shutdown();
|
||||
}
|
||||
if (cluster2 != null) {
|
||||
cluster2.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -1,94 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.cloud.cdcr;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.solr.SolrTestCaseJ4;
|
||||
import org.apache.solr.client.solrj.SolrQuery;
|
||||
import org.apache.solr.client.solrj.SolrServerException;
|
||||
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||
import org.apache.solr.client.solrj.response.QueryResponse;
|
||||
import org.apache.solr.common.params.CommonParams;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.handler.CdcrParams;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class CdcrTestsUtil extends SolrTestCaseJ4{
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
public static void cdcrStart(CloudSolrClient client) throws SolrServerException, IOException {
|
||||
QueryResponse response = invokeCdcrAction(client, CdcrParams.CdcrAction.START);
|
||||
assertEquals("started", ((NamedList) response.getResponse().get("status")).get("process"));
|
||||
}
|
||||
|
||||
public static void cdcrStop(CloudSolrClient client) throws SolrServerException, IOException {
|
||||
QueryResponse response = invokeCdcrAction(client, CdcrParams.CdcrAction.STOP);
|
||||
assertEquals("stopped", ((NamedList) response.getResponse().get("status")).get("process"));
|
||||
}
|
||||
|
||||
public static void cdcrEnableBuffer(CloudSolrClient client) throws IOException, SolrServerException {
|
||||
QueryResponse response = invokeCdcrAction(client, CdcrParams.CdcrAction.ENABLEBUFFER);
|
||||
assertEquals("enabled", ((NamedList) response.getResponse().get("status")).get("buffer"));
|
||||
}
|
||||
|
||||
public static void cdcrDisableBuffer(CloudSolrClient client) throws IOException, SolrServerException {
|
||||
QueryResponse response = invokeCdcrAction(client, CdcrParams.CdcrAction.DISABLEBUFFER);
|
||||
assertEquals("disabled", ((NamedList) response.getResponse().get("status")).get("buffer"));
|
||||
}
|
||||
|
||||
public static QueryResponse invokeCdcrAction(CloudSolrClient client, CdcrParams.CdcrAction action) throws IOException, SolrServerException {
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
params.set(CommonParams.QT, "/cdcr");
|
||||
params.set(CommonParams.ACTION, action.toLower());
|
||||
return client.query(params);
|
||||
}
|
||||
|
||||
public static QueryResponse getCdcrQueue(CloudSolrClient client) throws SolrServerException, IOException {
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
params.set(CommonParams.QT, "/cdcr");
|
||||
params.set(CommonParams.ACTION, CdcrParams.QUEUES);
|
||||
return client.query(params);
|
||||
}
|
||||
|
||||
public static long waitForClusterToSync(int numDocs, CloudSolrClient clusterSolrClient) throws SolrServerException, IOException, InterruptedException {
|
||||
return waitForClusterToSync(numDocs, clusterSolrClient, "*:*");
|
||||
}
|
||||
|
||||
public static long waitForClusterToSync(int numDocs, CloudSolrClient clusterSolrClient, String query) throws SolrServerException, IOException, InterruptedException {
|
||||
long start = System.nanoTime();
|
||||
QueryResponse response = null;
|
||||
while (System.nanoTime() - start <= TimeUnit.NANOSECONDS.convert(120, TimeUnit.SECONDS)) {
|
||||
try {
|
||||
clusterSolrClient.commit();
|
||||
response = clusterSolrClient.query(new SolrQuery(query));
|
||||
if (response.getResults().getNumFound() == numDocs) {
|
||||
break;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.warn("Exception trying to commit on cluster. This is expected and safe to ignore.", e);
|
||||
}
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
return response != null ? response.getResults().getNumFound() : 0;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue