mirror of https://github.com/apache/lucene.git

SOLR-6465: CDCR: fall back to whole-index replication when tlogs are insufficient

parent bfee2292a3
commit 153c270045

@ -116,6 +116,9 @@ New Features

* SOLR-9252: Feature selection and logistic regression on text (Cao Manh Dat, Joel Bernstein)

* SOLR-6465: CDCR: fall back to whole-index replication when tlogs are insufficient.
  (Noble Paul, Renaud Delbru, shalin)

Bug Fixes
----------------------
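The BOOTSTRAP, BOOTSTRAP_STATUS and CANCEL_BOOTSTRAP actions introduced in this commit are ordinary /cdcr handler actions, so they can also be driven from SolrJ. Below is a minimal sketch (not part of this commit) of polling a target shard leader for its bootstrap status; the core URL is a placeholder, and the handler replies with lowercase status strings such as "running" or "completed", which the manager code further down parses case-insensitively.

import java.io.IOException;

import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.handler.CdcrParams;

import static org.apache.solr.handler.admin.CoreAdminHandler.RESPONSE_STATUS;

public class CdcrBootstrapStatusCheck {
  public static void main(String[] args) throws SolrServerException, IOException {
    // Placeholder core URL of the target shard leader; adjust for a real deployment.
    String leaderCoreUrl = "http://localhost:8983/solr/cdcr-target_shard1_replica1";
    try (HttpSolrClient client = new HttpSolrClient.Builder(leaderCoreUrl).build()) {
      ModifiableSolrParams params = new ModifiableSolrParams();
      params.set(CommonParams.QT, "/cdcr");
      params.set(CommonParams.ACTION, CdcrParams.CdcrAction.BOOTSTRAP_STATUS.toString());
      NamedList<?> response = client.request(new QueryRequest(params));
      // One of: submitted, running, completed, failed, notfound, cancelled
      System.out.println("CDCR bootstrap status: " + response.get(RESPONSE_STATUS));
    }
  }
}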
@ -121,6 +121,11 @@ public class CdcrParams {
|
|||
*/
|
||||
public final static String COUNTER_DELETES = "deletes";
|
||||
|
||||
/**
|
||||
* Counter for Bootstrap operations *
|
||||
*/
|
||||
public final static String COUNTER_BOOTSTRAP = "bootstraps";
|
||||
|
||||
/**
|
||||
* A list of errors per target collection *
|
||||
*/
|
||||
|
@ -165,7 +170,10 @@ public class CdcrParams {
|
|||
LASTPROCESSEDVERSION,
|
||||
QUEUES,
|
||||
OPS,
|
||||
ERRORS;
|
||||
ERRORS,
|
||||
BOOTSTRAP,
|
||||
BOOTSTRAP_STATUS,
|
||||
CANCEL_BOOTSTRAP;
|
||||
|
||||
public static CdcrAction get(String p) {
|
||||
if (p != null) {
|
||||
|
|
|
@ -119,7 +119,7 @@ public class CdcrReplicator implements Runnable {
|
|||
// we might have read a single commit operation and reached the end of the update logs
|
||||
logReader.forwardSeek(subReader);
|
||||
|
||||
log.debug("Forwarded {} updates to target {}", counter, state.getTargetCollection());
|
||||
log.info("Forwarded {} updates to target {}", counter, state.getTargetCollection());
|
||||
} catch (Exception e) {
|
||||
// report error and update error stats
|
||||
this.handleException(e);
|
||||
|
@ -150,13 +150,13 @@ public class CdcrReplicator implements Runnable {
|
|||
if (e instanceof CdcrReplicatorException) {
|
||||
UpdateRequest req = ((CdcrReplicatorException) e).req;
|
||||
UpdateResponse rsp = ((CdcrReplicatorException) e).rsp;
|
||||
log.warn("Failed to forward update request {}. Got response {}", req, rsp);
|
||||
log.warn("Failed to forward update request {} to target: {}. Got response {}", req, state.getTargetCollection(), rsp);
|
||||
state.reportError(CdcrReplicatorState.ErrorType.BAD_REQUEST);
|
||||
} else if (e instanceof CloudSolrClient.RouteException) {
|
||||
log.warn("Failed to forward update request", e);
|
||||
log.warn("Failed to forward update request to target: " + state.getTargetCollection(), e);
|
||||
state.reportError(CdcrReplicatorState.ErrorType.BAD_REQUEST);
|
||||
} else {
|
||||
log.warn("Failed to forward update request", e);
|
||||
log.warn("Failed to forward update request to target: " + state.getTargetCollection(), e);
|
||||
state.reportError(CdcrReplicatorState.ErrorType.INTERNAL);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,29 +16,49 @@
 */
package org.apache.solr.handler;

import java.io.Closeable;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.http.client.HttpClient;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.CloudSolrClient.Builder;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SolrjNamedThreadFactory;
import org.apache.solr.core.SolrCore;
import org.apache.solr.update.CdcrUpdateLog;
import org.apache.solr.util.TimeOut;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.solr.handler.admin.CoreAdminHandler.RESPONSE_STATUS;

class CdcrReplicatorManager implements CdcrStateManager.CdcrStateObserver {

  private static final int MAX_BOOTSTRAP_ATTEMPTS = 5;
  private static final int BOOTSTRAP_RETRY_DELAY_MS = 2000;
  // 6 hours is hopefully long enough for most indexes
  private static final long BOOTSTRAP_TIMEOUT_SECONDS = 6L * 3600L;

  private List<CdcrReplicatorState> replicatorStates;

  private final CdcrReplicatorScheduler scheduler;

@ -48,6 +68,9 @@ class CdcrReplicatorManager implements CdcrStateManager.CdcrStateObserver {
  private SolrCore core;
  private String path;

  private ExecutorService bootstrapExecutor;
  private volatile BootstrapStatusRunnable bootstrapStatusRunnable;

  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  CdcrReplicatorManager(final SolrCore core, String path,
@ -104,12 +127,20 @@ class CdcrReplicatorManager implements CdcrStateManager.CdcrStateObserver {
  @Override
  public synchronized void stateUpdate() {
    if (leaderStateManager.amILeader() && processStateManager.getState().equals(CdcrParams.ProcessState.STARTED)) {
      if (replicatorStates.size() > 0) {
        this.bootstrapExecutor = ExecutorUtil.newMDCAwareFixedThreadPool(replicatorStates.size(),
            new SolrjNamedThreadFactory("cdcr-bootstrap-status"));
      }
      this.initLogReaders();
      this.scheduler.start();
      return;
    }

    this.scheduler.shutdown();
    if (bootstrapExecutor != null) {
      IOUtils.closeQuietly(bootstrapStatusRunnable);
      ExecutorUtil.shutdownAndAwaitTermination(bootstrapExecutor);
    }
    this.closeLogReaders();
  }

@ -117,7 +148,7 @@ class CdcrReplicatorManager implements CdcrStateManager.CdcrStateObserver {
    return replicatorStates;
  }

  void initLogReaders() {
  private void initLogReaders() {
    String collectionName = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
    String shard = core.getCoreDescriptor().getCloudDescriptor().getShardId();
    CdcrUpdateLog ulog = (CdcrUpdateLog) core.getUpdateHandler().getUpdateLog();

@ -129,8 +160,23 @@ class CdcrReplicatorManager implements CdcrStateManager.CdcrStateObserver {
        log.info("Create new update log reader for target {} with checkpoint {} @ {}:{}", state.getTargetCollection(),
            checkpoint, collectionName, shard);
        CdcrUpdateLog.CdcrLogReader reader = ulog.newLogReader();
        reader.seek(checkpoint);
        boolean seek = reader.seek(checkpoint);
        state.init(reader);
        if (!seek) {
          // targetVersion is lower than the oldest known entry.
          // In this scenario, it probably means that there is a gap in the updates log.
          // the best we can do here is to bootstrap the target leader by replicating the full index
          final String targetCollection = state.getTargetCollection();
          state.setBootstrapInProgress(true);
          log.info("Attempting to bootstrap target collection: {}, shard: {}", targetCollection, shard);
          bootstrapStatusRunnable = new BootstrapStatusRunnable(core, state);
          log.info("Submitting bootstrap task to executor");
          try {
            bootstrapExecutor.submit(bootstrapStatusRunnable);
          } catch (Exception e) {
            log.error("Unable to submit bootstrap call to executor", e);
          }
        }
      } catch (IOException | SolrServerException | SolrException e) {
        log.warn("Unable to instantiate the log reader for target collection " + state.getTargetCollection(), e);
      } catch (InterruptedException e) {
@ -164,11 +210,203 @@ class CdcrReplicatorManager implements CdcrStateManager.CdcrStateObserver {
   */
  void shutdown() {
    this.scheduler.shutdown();
    if (bootstrapExecutor != null) {
      IOUtils.closeQuietly(bootstrapStatusRunnable);
      ExecutorUtil.shutdownAndAwaitTermination(bootstrapExecutor);
    }
    for (CdcrReplicatorState state : replicatorStates) {
      state.shutdown();
    }
    replicatorStates.clear();
  }

  private class BootstrapStatusRunnable implements Runnable, Closeable {
    private final CdcrReplicatorState state;
    private final String targetCollection;
    private final String shard;
    private final String collectionName;
    private final CdcrUpdateLog ulog;
    private final String myCoreUrl;

    private volatile boolean closed = false;

    BootstrapStatusRunnable(SolrCore core, CdcrReplicatorState state) {
      this.collectionName = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
      this.shard = core.getCoreDescriptor().getCloudDescriptor().getShardId();
      this.ulog = (CdcrUpdateLog) core.getUpdateHandler().getUpdateLog();
      this.state = state;
      this.targetCollection = state.getTargetCollection();
      String baseUrl = core.getCoreDescriptor().getCoreContainer().getZkController().getBaseUrl();
      this.myCoreUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, core.getName());
    }

    @Override
    public void close() throws IOException {
      closed = true;
      try {
        Replica leader = state.getClient().getZkStateReader().getLeaderRetry(targetCollection, shard, 30000); // assume same shard exists on target
        String leaderCoreUrl = leader.getCoreUrl();
        HttpClient httpClient = state.getClient().getLbClient().getHttpClient();
        try (HttpSolrClient client = new HttpSolrClient.Builder(leaderCoreUrl).withHttpClient(httpClient).build()) {
          sendCdcrCommand(client, CdcrParams.CdcrAction.CANCEL_BOOTSTRAP);
        } catch (SolrServerException e) {
          log.error("Error sending cancel bootstrap message to target collection: {} shard: {} leader: {}",
              targetCollection, shard, leaderCoreUrl);
        }
      } catch (InterruptedException e) {
        log.error("Interrupted while closing BootstrapStatusRunnable", e);
        Thread.currentThread().interrupt();
      }
    }

    @Override
    public void run() {
      int retries = 1;
      boolean success = false;
      try {
        while (!closed && sendBootstrapCommand() != BootstrapStatus.SUBMITTED) {
          Thread.sleep(BOOTSTRAP_RETRY_DELAY_MS);
        }
        TimeOut timeOut = new TimeOut(BOOTSTRAP_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        while (!timeOut.hasTimedOut()) {
          if (closed) {
            log.warn("Cancelling waiting for bootstrap on target: {} shard: {} to complete", targetCollection, shard);
            state.setBootstrapInProgress(false);
            break;
          }
          BootstrapStatus status = getBootstrapStatus();
          if (status == BootstrapStatus.RUNNING) {
            try {
              log.info("CDCR bootstrap running for {} seconds, sleeping for {} ms",
                  BOOTSTRAP_TIMEOUT_SECONDS - timeOut.timeLeft(TimeUnit.SECONDS), BOOTSTRAP_RETRY_DELAY_MS);
              Thread.sleep(BOOTSTRAP_RETRY_DELAY_MS);
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
            }
          } else if (status == BootstrapStatus.COMPLETED) {
            log.info("CDCR bootstrap successful in {} seconds", BOOTSTRAP_TIMEOUT_SECONDS - timeOut.timeLeft(TimeUnit.SECONDS));
            long checkpoint = CdcrReplicatorManager.this.getCheckpoint(state);
            log.info("Create new update log reader for target {} with checkpoint {} @ {}:{}", state.getTargetCollection(),
                checkpoint, collectionName, shard);
            CdcrUpdateLog.CdcrLogReader reader1 = ulog.newLogReader();
            reader1.seek(checkpoint);
            success = true;
            break;
          } else if (status == BootstrapStatus.FAILED) {
            log.warn("CDCR bootstrap failed in {} seconds", BOOTSTRAP_TIMEOUT_SECONDS - timeOut.timeLeft(TimeUnit.SECONDS));
            // let's retry a fixed number of times before giving up
            if (retries >= MAX_BOOTSTRAP_ATTEMPTS) {
              log.error("Unable to bootstrap the target collection: {}, shard: {} even after {} retries", targetCollection, shard, retries);
              break;
            } else {
              log.info("Retry: {} - Attempting to bootstrap target collection: {} shard: {}", retries, targetCollection, shard);
              while (!closed && sendBootstrapCommand() != BootstrapStatus.SUBMITTED) {
                Thread.sleep(BOOTSTRAP_RETRY_DELAY_MS);
              }
              timeOut = new TimeOut(BOOTSTRAP_TIMEOUT_SECONDS, TimeUnit.SECONDS); // reset the timer
              retries++;
            }
          } else if (status == BootstrapStatus.NOTFOUND) {
            // the leader of the target shard may have changed and therefore there is no record of the
            // bootstrap process so we must retry the operation
            while (!closed && sendBootstrapCommand() != BootstrapStatus.SUBMITTED) {
              Thread.sleep(BOOTSTRAP_RETRY_DELAY_MS);
            }
            retries = 1;
            timeOut = new TimeOut(BOOTSTRAP_TIMEOUT_SECONDS, TimeUnit.SECONDS); // reset the timer
          } else if (status == BootstrapStatus.UNKNOWN) {
            // we were not able to query the status on the remote end
            // so just sleep for a bit and try again
            Thread.sleep(BOOTSTRAP_RETRY_DELAY_MS);
          }
        }
      } catch (InterruptedException e) {
        log.info("Bootstrap thread interrupted");
        state.reportError(CdcrReplicatorState.ErrorType.INTERNAL);
        Thread.currentThread().interrupt();
      } catch (IOException | SolrServerException | SolrException e) {
        log.error("Unable to bootstrap the target collection " + targetCollection + " shard: " + shard, e);
        state.reportError(CdcrReplicatorState.ErrorType.BAD_REQUEST);
      } finally {
        if (success) {
          log.info("Bootstrap successful, giving the go-ahead to replicator");
          state.setBootstrapInProgress(false);
        }
      }
    }

    private BootstrapStatus sendBootstrapCommand() throws InterruptedException {
      Replica leader = state.getClient().getZkStateReader().getLeaderRetry(targetCollection, shard, 30000); // assume same shard exists on target
      String leaderCoreUrl = leader.getCoreUrl();
      HttpClient httpClient = state.getClient().getLbClient().getHttpClient();
      try (HttpSolrClient client = new HttpSolrClient.Builder(leaderCoreUrl).withHttpClient(httpClient).build()) {
        log.info("Attempting to bootstrap target collection: {} shard: {} leader: {}", targetCollection, shard, leaderCoreUrl);
        try {
          NamedList response = sendCdcrCommand(client, CdcrParams.CdcrAction.BOOTSTRAP, ReplicationHandler.MASTER_URL, myCoreUrl);
          log.debug("CDCR Bootstrap response: {}", response);
          String status = response.get(RESPONSE_STATUS).toString();
          return BootstrapStatus.valueOf(status.toUpperCase(Locale.ROOT));
        } catch (Exception e) {
          log.error("Exception submitting bootstrap request", e);
          return BootstrapStatus.UNKNOWN;
        }
      } catch (IOException e) {
        log.error("There shouldn't be an IOException while closing but there was!", e);
      }
      return BootstrapStatus.UNKNOWN;
    }

    private BootstrapStatus getBootstrapStatus() throws InterruptedException {
      try {
        Replica leader = state.getClient().getZkStateReader().getLeaderRetry(targetCollection, shard, 30000); // assume same shard exists on target
        String leaderCoreUrl = leader.getCoreUrl();
        HttpClient httpClient = state.getClient().getLbClient().getHttpClient();
        try (HttpSolrClient client = new HttpSolrClient.Builder(leaderCoreUrl).withHttpClient(httpClient).build()) {
          NamedList response = sendCdcrCommand(client, CdcrParams.CdcrAction.BOOTSTRAP_STATUS);
          String status = (String) response.get(RESPONSE_STATUS);
          BootstrapStatus bootstrapStatus = BootstrapStatus.valueOf(status.toUpperCase(Locale.ROOT));
          if (bootstrapStatus == BootstrapStatus.RUNNING) {
            return BootstrapStatus.RUNNING;
          } else if (bootstrapStatus == BootstrapStatus.COMPLETED) {
            return BootstrapStatus.COMPLETED;
          } else if (bootstrapStatus == BootstrapStatus.FAILED) {
            return BootstrapStatus.FAILED;
          } else if (bootstrapStatus == BootstrapStatus.NOTFOUND) {
            log.warn("Bootstrap process was not found on target collection: {} shard: {}, leader: {}", targetCollection, shard, leaderCoreUrl);
            return BootstrapStatus.NOTFOUND;
          } else if (bootstrapStatus == BootstrapStatus.CANCELLED) {
            return BootstrapStatus.CANCELLED;
          } else {
            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
                "Unknown status: " + status + " returned by BOOTSTRAP_STATUS command");
          }
        }
      } catch (Exception e) {
        log.error("Exception during bootstrap status request", e);
        return BootstrapStatus.UNKNOWN;
      }
    }
  }

  private NamedList sendCdcrCommand(SolrClient client, CdcrParams.CdcrAction action, String... params) throws SolrServerException, IOException {
    ModifiableSolrParams solrParams = new ModifiableSolrParams();
    solrParams.set(CommonParams.QT, "/cdcr");
    solrParams.set(CommonParams.ACTION, action.toString());
    for (int i = 0; i < params.length - 1; i += 2) {
      solrParams.set(params[i], params[i + 1]);
    }
    SolrRequest request = new QueryRequest(solrParams);
    return client.request(request);
  }

  private enum BootstrapStatus {
    SUBMITTED,
    RUNNING,
    COMPLETED,
    FAILED,
    NOTFOUND,
    CANCELLED,
    UNKNOWN
  }
}
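The polling loop above does its time bookkeeping with org.apache.solr.util.TimeOut; only the constructor, hasTimedOut() and timeLeft(TimeUnit) used above are assumed in this small standalone sketch.

import java.util.concurrent.TimeUnit;

import org.apache.solr.util.TimeOut;

public class TimeOutExample {
  public static void main(String[] args) throws InterruptedException {
    long budgetSeconds = 10;
    TimeOut timeOut = new TimeOut(budgetSeconds, TimeUnit.SECONDS);
    while (!timeOut.hasTimedOut()) {
      // elapsed = total budget minus what is still left, mirroring the log messages above
      long elapsed = budgetSeconds - timeOut.timeLeft(TimeUnit.SECONDS);
      System.out.println("waited " + elapsed + "s so far");
      Thread.sleep(2000);
    }
    System.out.println("budget exhausted, giving up");
  }
}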
@ -77,7 +77,11 @@ class CdcrReplicatorScheduler {
          CdcrReplicatorState state = statesQueue.poll();
          assert state != null; // Should never happen
          try {
            new CdcrReplicator(state, batchSize).run();
            if (!state.isBootstrapInProgress()) {
              new CdcrReplicator(state, batchSize).run();
            } else {
              log.debug("Replicator state is bootstrapping, skipping replication for target collection {}", state.getTargetCollection());
            }
          } finally {
            statesQueue.offer(state);
          }
@ -27,6 +27,8 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.update.CdcrUpdateLog;

@ -53,6 +55,9 @@ class CdcrReplicatorState {

  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  private final AtomicBoolean bootstrapInProgress = new AtomicBoolean(false);
  private final AtomicInteger numBootstraps = new AtomicInteger();

  CdcrReplicatorState(final String targetCollection, final String zkHost, final CloudSolrClient targetClient) {
    this.targetCollection = targetCollection;
    this.targetClient = targetClient;

@ -164,6 +169,24 @@ class CdcrReplicatorState {
    return this.benchmarkTimer;
  }

  /**
   * @return true if a bootstrap operation is in progress, false otherwise
   */
  boolean isBootstrapInProgress() {
    return bootstrapInProgress.get();
  }

  void setBootstrapInProgress(boolean inProgress) {
    if (bootstrapInProgress.compareAndSet(true, false)) {
      numBootstraps.incrementAndGet();
    }
    bootstrapInProgress.set(inProgress);
  }

  public int getNumBootstraps() {
    return numBootstraps.get();
  }

  enum ErrorType {
    INTERNAL,
    BAD_REQUEST;
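setBootstrapInProgress() above counts a completed bootstrap whenever the flag drops from true to false, relying on AtomicBoolean.compareAndSet so only one caller records the transition. A tiny standalone illustration of that idiom (class and method names are hypothetical):

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class TransitionCounter {
  private final AtomicBoolean inProgress = new AtomicBoolean(false);
  private final AtomicInteger completed = new AtomicInteger();

  void setInProgress(boolean value) {
    // compareAndSet(true, false) succeeds for exactly one caller per true -> false transition,
    // so 'completed' is bumped once per finished run even under concurrent updates
    if (inProgress.compareAndSet(true, false)) {
      completed.incrementAndGet();
    }
    inProgress.set(value);
  }

  int completedRuns() {
    return completed.get();
  }
}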
@ -16,6 +16,7 @@
 */
package org.apache.solr.handler;

import java.io.Closeable;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;

@ -24,14 +25,20 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;

import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;

@ -41,21 +48,33 @@ import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.CloseHook;
import org.apache.solr.core.PluginBag;
import org.apache.solr.core.SolrCore;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrRequestHandler;
import org.apache.solr.request.SolrRequestInfo;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.update.CdcrUpdateLog;
import org.apache.solr.update.UpdateLog;
import org.apache.solr.update.VersionInfo;
import org.apache.solr.update.processor.DistributedUpdateProcessor;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.apache.solr.util.plugin.SolrCoreAware;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.solr.handler.admin.CoreAdminHandler.COMPLETED;
import static org.apache.solr.handler.admin.CoreAdminHandler.FAILED;
import static org.apache.solr.handler.admin.CoreAdminHandler.RESPONSE;
import static org.apache.solr.handler.admin.CoreAdminHandler.RESPONSE_MESSAGE;
import static org.apache.solr.handler.admin.CoreAdminHandler.RESPONSE_STATUS;
import static org.apache.solr.handler.admin.CoreAdminHandler.RUNNING;

/**
 * <p>
 * This request handler implements the CDCR API and is responsible for the execution of the
@ -199,6 +218,18 @@ public class CdcrRequestHandler extends RequestHandlerBase implements SolrCoreAw
        this.handleErrorsAction(req, rsp);
        break;
      }
      case BOOTSTRAP: {
        this.handleBootstrapAction(req, rsp);
        break;
      }
      case BOOTSTRAP_STATUS: {
        this.handleBootstrapStatus(req, rsp);
        break;
      }
      case CANCEL_BOOTSTRAP: {
        this.handleCancelBootstrap(req, rsp);
        break;
      }
      default: {
        throw new RuntimeException("Unknown action: " + action);
      }
@ -409,10 +440,20 @@ public class CdcrRequestHandler extends RequestHandlerBase implements SolrCoreAw
    }

    UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
    VersionInfo versionInfo = ulog.getVersionInfo();
    try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) {
      List<Long> versions = recentUpdates.getVersions(1);
      long lastVersion = versions.isEmpty() ? -1 : Math.abs(versions.get(0));
      rsp.add(CdcrParams.CHECKPOINT, lastVersion);
      long maxVersionFromRecent = recentUpdates.getMaxRecentVersion();
      long maxVersionFromIndex = versionInfo.getMaxVersionFromIndex(req.getSearcher());
      log.info("Found maxVersionFromRecent {} maxVersionFromIndex {}", maxVersionFromRecent, maxVersionFromIndex);
      // there is no race with ongoing bootstrap because we don't expect any updates to come from the source
      long maxVersion = Math.max(maxVersionFromIndex, maxVersionFromRecent);
      if (maxVersion == 0L) {
        maxVersion = -1;
      }
      rsp.add(CdcrParams.CHECKPOINT, maxVersion);
    } catch (IOException e) {
      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Action '" + CdcrParams.CdcrAction.SHARDCHECKPOINT +
          "' could not read max version");
    }
  }

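The SHARDCHECKPOINT change above stops trusting the transaction log alone: after a whole-index bootstrap the target's tlogs are empty, so the handler now reports the maximum of the highest version in the index and the highest recent tlog version, keeping -1 as the "no updates yet" sentinel. A standalone restatement of just that arithmetic (the helper name is illustrative):

public final class CheckpointMath {
  static long shardCheckpoint(long maxVersionFromIndex, long maxVersionFromRecent) {
    long maxVersion = Math.max(maxVersionFromIndex, maxVersionFromRecent);
    // -1 is the "no updates yet" sentinel used elsewhere in the CDCR code
    return maxVersion == 0L ? -1 : maxVersion;
  }

  public static void main(String[] args) {
    System.out.println(shardCheckpoint(0L, 0L));        // -1: freshly created core
    System.out.println(shardCheckpoint(1537L, 0L));     // 1537: bootstrapped core, empty tlog
    System.out.println(shardCheckpoint(1537L, 1620L));  // 1620: tlog is ahead of the index
  }
}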
@ -574,6 +615,192 @@ public class CdcrRequestHandler extends RequestHandlerBase implements SolrCoreAw
    rsp.add(CdcrParams.ERRORS, hosts);
  }

  private AtomicBoolean running = new AtomicBoolean();
  private volatile Future<Boolean> bootstrapFuture;
  private volatile BootstrapCallable bootstrapCallable;

  private void handleBootstrapAction(SolrQueryRequest req, SolrQueryResponse rsp) throws IOException, SolrServerException {
    String collectionName = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
    String shard = core.getCoreDescriptor().getCloudDescriptor().getShardId();
    if (!leaderStateManager.amILeader()) {
      log.warn("Action {} sent to non-leader replica @ {}:{}", CdcrParams.CdcrAction.BOOTSTRAP, collectionName, shard);
      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Action " + CdcrParams.CdcrAction.BOOTSTRAP +
          " sent to non-leader replica");
    }

    Runnable runnable = () -> {
      Lock recoveryLock = req.getCore().getSolrCoreState().getRecoveryLock();
      boolean locked = recoveryLock.tryLock();
      try {
        if (!locked) {
          handleCancelBootstrap(req, rsp);
        } else if (leaderStateManager.amILeader()) {
          running.set(true);
          String masterUrl = req.getParams().get(ReplicationHandler.MASTER_URL);
          bootstrapCallable = new BootstrapCallable(masterUrl, core);
          bootstrapFuture = core.getCoreDescriptor().getCoreContainer().getUpdateShardHandler().getRecoveryExecutor().submit(bootstrapCallable);
          try {
            bootstrapFuture.get();
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.warn("Bootstrap was interrupted", e);
          } catch (ExecutionException e) {
            log.error("Bootstrap operation failed", e);
          }
        } else {
          log.error("Action {} sent to non-leader replica @ {}:{}. Aborting bootstrap.", CdcrParams.CdcrAction.BOOTSTRAP, collectionName, shard);
        }
      } finally {
        if (locked) {
          running.set(false);
          recoveryLock.unlock();
        }
      }
    };

    try {
      core.getCoreDescriptor().getCoreContainer().getUpdateShardHandler().getUpdateExecutor().submit(runnable);
      rsp.add(RESPONSE_STATUS, "submitted");
    } catch (RejectedExecutionException ree) {
      // no problem, we're probably shutting down
      rsp.add(RESPONSE_STATUS, "failed");
    }
  }

  private void handleCancelBootstrap(SolrQueryRequest req, SolrQueryResponse rsp) {
    BootstrapCallable callable = this.bootstrapCallable;
    IOUtils.closeQuietly(callable);
    rsp.add(RESPONSE_STATUS, "cancelled");
  }

  private void handleBootstrapStatus(SolrQueryRequest req, SolrQueryResponse rsp) throws IOException, SolrServerException {
    if (running.get()) {
      rsp.add(RESPONSE_STATUS, RUNNING);
      return;
    }

    Future<Boolean> future = bootstrapFuture;
    BootstrapCallable callable = this.bootstrapCallable;
    if (future == null) {
      rsp.add(RESPONSE_STATUS, "notfound");
      rsp.add(RESPONSE_MESSAGE, "No bootstrap found in running, completed or failed states");
    } else if (future.isCancelled() || callable.isClosed()) {
      rsp.add(RESPONSE_STATUS, "cancelled");
    } else if (future.isDone()) {
      // could be a normal termination or an exception
      try {
        Boolean result = future.get();
        if (result) {
          rsp.add(RESPONSE_STATUS, COMPLETED);
        } else {
          rsp.add(RESPONSE_STATUS, FAILED);
        }
      } catch (InterruptedException e) {
        // should not happen?
      } catch (ExecutionException e) {
        rsp.add(RESPONSE_STATUS, FAILED);
        rsp.add(RESPONSE, e);
      } catch (CancellationException ce) {
        rsp.add(RESPONSE_STATUS, FAILED);
        rsp.add(RESPONSE_MESSAGE, "Bootstrap was cancelled");
      }
    } else {
      rsp.add(RESPONSE_STATUS, RUNNING);
    }
  }

  private static class BootstrapCallable implements Callable<Boolean>, Closeable {
    private final String masterUrl;
    private final SolrCore core;
    private volatile boolean closed = false;

    BootstrapCallable(String masterUrl, SolrCore core) {
      this.masterUrl = masterUrl;
      this.core = core;
    }

    @Override
    public void close() throws IOException {
      closed = true;
      SolrRequestHandler handler = core.getRequestHandler(ReplicationHandler.PATH);
      ReplicationHandler replicationHandler = (ReplicationHandler) handler;
      replicationHandler.abortFetch();
    }

    public boolean isClosed() {
      return closed;
    }

    @Override
    public Boolean call() throws Exception {
      boolean success = false;
      UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
      // we start buffering updates as a safeguard however we do not expect
      // to receive any updates from the source during bootstrap
      ulog.bufferUpdates();
      try {
        commitOnLeader(masterUrl);
        // use rep handler directly, so we can do this sync rather than async
        SolrRequestHandler handler = core.getRequestHandler(ReplicationHandler.PATH);
        ReplicationHandler replicationHandler = (ReplicationHandler) handler;

        if (replicationHandler == null) {
          throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
              "Skipping recovery, no " + ReplicationHandler.PATH + " handler found");
        }

        ModifiableSolrParams solrParams = new ModifiableSolrParams();
        solrParams.set(ReplicationHandler.MASTER_URL, masterUrl);
        // we do not want the raw tlog files from the source
        solrParams.set(ReplicationHandler.TLOG_FILES, false);

        success = replicationHandler.doFetch(solrParams, false);

        // this is required because this callable can race with HttpSolrCall#destroy
        // which clears the request info.
        // Applying buffered updates fails without the following line because LogReplayer
        // also tries to set request info and fails with AssertionError
        SolrRequestInfo.clearRequestInfo();

        Future<UpdateLog.RecoveryInfo> future = ulog.applyBufferedUpdates();
        if (future == null) {
          // no replay needed
          log.info("No replay needed.");
        } else {
          log.info("Replaying buffered documents.");
          // wait for replay
          UpdateLog.RecoveryInfo report = future.get();
          if (report.failed) {
            SolrException.log(log, "Replay failed");
            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Replay failed");
          }
        }
        return success;
      } finally {
        if (closed || !success) {
          // we cannot apply the buffer in this case because it will introduce newer versions in the
          // update log and then the source cluster will get those versions via collectioncheckpoint
          // causing the versions in between to be completely missed
          boolean dropped = ulog.dropBufferedUpdates();
          assert dropped;
        }
      }
    }

    private void commitOnLeader(String leaderUrl) throws SolrServerException,
        IOException {
      try (HttpSolrClient client = new HttpSolrClient.Builder(leaderUrl).build()) {
        client.setConnectionTimeout(30000);
        UpdateRequest ureq = new UpdateRequest();
        ureq.setParams(new ModifiableSolrParams());
        ureq.getParams().set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
        ureq.getParams().set(UpdateParams.OPEN_SEARCHER, false);
        ureq.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, true).process(
            client);
      }
    }
  }

  @Override
  public String getDescription() {
    return "Manage Cross Data Center Replication";
@ -750,14 +750,14 @@ public class IndexFetcher {
  }

  private void openNewSearcherAndUpdateCommitPoint() throws IOException {
    SolrQueryRequest req = new LocalSolrQueryRequest(solrCore,
        new ModifiableSolrParams());

    RefCounted<SolrIndexSearcher> searcher = null;
    IndexCommit commitPoint;
    // must get the latest solrCore object because the one we have might be closed because of a reload
    // todo stop keeping solrCore around
    SolrCore core = solrCore.getCoreDescriptor().getCoreContainer().getCore(solrCore.getName());
    try {
      Future[] waitSearcher = new Future[1];
      searcher = solrCore.getSearcher(true, true, waitSearcher, true);
      searcher = core.getSearcher(true, true, waitSearcher, true);
      if (waitSearcher[0] != null) {
        try {
          waitSearcher[0].get();

@ -767,10 +767,10 @@ public class IndexFetcher {
      }
      commitPoint = searcher.get().getIndexReader().getIndexCommit();
    } finally {
      req.close();
      if (searcher != null) {
        searcher.decref();
      }
      core.close();
    }

    // update the commit point in replication handler
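The IndexFetcher change above re-resolves the SolrCore through its CoreContainer because the instance the fetcher holds may already have been closed by a reload; the freshly acquired reference must then be closed in the same method. A minimal sketch of that borrow-and-release pattern, assuming only the getCore()/close() calls used above:

import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrCore;

public class FreshCoreExample {
  // 'container' and 'coreName' are assumed inputs; the point is the try/finally pairing.
  static void withFreshCore(CoreContainer container, String coreName) {
    SolrCore core = container.getCore(coreName); // increments the core's reference count
    try {
      // use the freshly resolved core here (e.g. open a searcher), not a possibly stale reference
    } finally {
      core.close(); // releases the reference taken by getCore()
    }
  }
}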
@ -300,9 +300,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
        rsp.add("message","No slave configured");
      }
    } else if (command.equalsIgnoreCase(CMD_ABORT_FETCH)) {
      IndexFetcher fetcher = currentIndexFetcher;
      if (fetcher != null){
        fetcher.abortFetch();
      if (abortFetch()){
        rsp.add(STATUS, OK_STATUS);
      } else {
        rsp.add(STATUS,ERR_STATUS);

@ -321,6 +319,16 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
    }
  }

  public boolean abortFetch() {
    IndexFetcher fetcher = currentIndexFetcher;
    if (fetcher != null){
      fetcher.abortFetch();
      return true;
    } else {
      return false;
    }
  }

  private void deleteSnapshot(ModifiableSolrParams params) {
    String name = params.get(NAME);
    if(name == null) {

@ -658,7 +666,8 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
      rsp.add(CMD_GET_FILE_LIST, result);

      // fetch list of tlog files only if cdcr is activated
      if (core.getUpdateHandler().getUpdateLog() != null && core.getUpdateHandler().getUpdateLog() instanceof CdcrUpdateLog) {
      if (solrParams.getBool(TLOG_FILES, true) && core.getUpdateHandler().getUpdateLog() != null
          && core.getUpdateHandler().getUpdateLog() instanceof CdcrUpdateLog) {
        try {
          List<Map<String, Object>> tlogfiles = getTlogFileList(commit);
          LOG.info("Adding tlog files to list: " + tlogfiles);
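With the getFileList change above, tlog files are only listed when the tlogFiles parameter is true (the default), so a caller that only wants a plain index copy can opt out. This is how the CDCR bootstrap path drives the ReplicationHandler; a hedged sketch mirroring that call (the handler and master URL are assumed inputs, and the package declaration matches the callers in this commit):

package org.apache.solr.handler;

import org.apache.solr.common.params.ModifiableSolrParams;

public class FullCopyFetch {
  // mirrors BootstrapCallable above: fetch only index segments, no raw tlog files
  static boolean fetchIndexOnly(ReplicationHandler replicationHandler, String masterUrl) throws Exception {
    ModifiableSolrParams solrParams = new ModifiableSolrParams();
    solrParams.set(ReplicationHandler.MASTER_URL, masterUrl);
    // skip raw tlog files; only the index is pulled from the source
    solrParams.set(ReplicationHandler.TLOG_FILES, false);
    return replicationHandler.doFetch(solrParams, false);
  }
}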
@ -151,7 +151,12 @@ public class CdcrUpdateLog extends UpdateLog {
    if (id != -1) return id;
    if (tlogFiles.length == 0) return -1;
    String last = tlogFiles[tlogFiles.length - 1];
    return Long.parseLong(last.substring(TLOG_NAME.length() + 1, last.lastIndexOf('.')));
    if (TLOG_NAME.length() + 1 > last.lastIndexOf('.')) {
      // old tlog created by default UpdateLog impl
      return Long.parseLong(last.substring(TLOG_NAME.length() + 1));
    } else {
      return Long.parseLong(last.substring(TLOG_NAME.length() + 1, last.lastIndexOf('.')));
    }
  }

  @Override
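The getId() fix above has to accept two tlog naming shapes once a directory written by the default UpdateLog is reopened as a CdcrUpdateLog: old names carry a single numeric suffix, while CDCR names append a further dot-separated segment. A standalone illustration of the two parses, using made-up file names of those shapes:

public class TlogNameParse {
  private static final String TLOG_NAME = "tlog"; // mirrors UpdateLog.TLOG_NAME

  // Same branch structure as the fixed getId(): old-style names have no second dot.
  static long idOf(String last) {
    if (TLOG_NAME.length() + 1 > last.lastIndexOf('.')) {
      // old tlog created by the default UpdateLog impl: tlog.<id>
      return Long.parseLong(last.substring(TLOG_NAME.length() + 1));
    } else {
      // CDCR tlog: tlog.<id>.<suffix>
      return Long.parseLong(last.substring(TLOG_NAME.length() + 1, last.lastIndexOf('.')));
    }
  }

  public static void main(String[] args) {
    System.out.println(idOf("tlog.0000000000000000042"));            // 42
    System.out.println(idOf("tlog.0000000000000000042.1463072110")); // 42
  }
}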
@ -396,4 +396,8 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
    this.lastReplicationSuccess = success;
  }

  @Override
  public Lock getRecoveryLock() {
    return recoveryLock;
  }
}

@ -163,4 +163,6 @@ public abstract class SolrCoreState {
      super(s);
    }
  }

  public abstract Lock getRecoveryLock();
}
@ -96,11 +96,11 @@ public class CdcrUpdateProcessor extends DistributedUpdateProcessor {
    ModifiableSolrParams result = super.filterParams(params);
    if (params.get(CDCR_UPDATE) != null) {
      result.set(CDCR_UPDATE, "");
      if (params.get(DistributedUpdateProcessor.VERSION_FIELD) == null) {
        log.warn("+++ cdcr.update but no version field, params are: " + params);
      } else {
        log.info("+++ cdcr.update version present, params are: " + params);
      }
      // if (params.get(DistributedUpdateProcessor.VERSION_FIELD) == null) {
      //   log.warn("+++ cdcr.update but no version field, params are: " + params);
      // } else {
      //   log.info("+++ cdcr.update version present, params are: " + params);
      // }
      result.set(DistributedUpdateProcessor.VERSION_FIELD, params.get(DistributedUpdateProcessor.VERSION_FIELD));
    }

@ -295,7 +295,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {

    // this should always be used - see filterParams
    DistributedUpdateProcessorFactory.addParamToDistributedRequestWhitelist
        (this.req, UpdateParams.UPDATE_CHAIN, TEST_DISTRIB_SKIP_SERVERS);
        (this.req, UpdateParams.UPDATE_CHAIN, TEST_DISTRIB_SKIP_SERVERS, VERSION_FIELD);

    CoreDescriptor coreDesc = req.getCore().getCoreDescriptor();

@ -0,0 +1,29 @@
|
|||
<?xml version="1.0" encoding="UTF-8" ?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<schema name="minimal" version="1.1">
|
||||
<types>
|
||||
<fieldType name="string" class="solr.StrField"/>
|
||||
<fieldType name="long" class="solr.TrieLongField" precisionStep="0" positionIncrementGap="0"/>
|
||||
</types>
|
||||
<fields>
|
||||
<field name="id" type="string" indexed="true" stored="true"/>
|
||||
<field name="_version_" type="long" indexed="true" stored="true"/>
|
||||
<dynamicField name="*" type="string" indexed="true" stored="true"/>
|
||||
</fields>
|
||||
<uniqueKey>id</uniqueKey>
|
||||
</schema>
|
|
@ -0,0 +1,60 @@
|
|||
<?xml version="1.0" ?>
|
||||
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
|
||||
<!-- This is a "kitchen sink" config file that tests can use.
|
||||
When writing a new test, feel free to add *new* items (plugins,
|
||||
config options, etc...) as long as they don't break any existing
|
||||
tests. if you need to test something esoteric please add a new
|
||||
"solrconfig-your-esoteric-purpose.xml" config file.
|
||||
|
||||
Note in particular that this test is used by MinimalSchemaTest so
|
||||
Anything added to this file needs to work correctly even if there
|
||||
is no uniqueKey or defaultSearchField.
|
||||
-->
|
||||
|
||||
<config>
|
||||
|
||||
<dataDir>${solr.data.dir:}</dataDir>
|
||||
|
||||
<directoryFactory name="DirectoryFactory"
|
||||
class="${solr.directoryFactory:solr.NRTCachingDirectoryFactory}"/>
|
||||
<schemaFactory class="ClassicIndexSchemaFactory"/>
|
||||
|
||||
<luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
|
||||
|
||||
<updateHandler class="solr.DirectUpdateHandler2">
|
||||
<commitWithin>
|
||||
<softCommit>${solr.commitwithin.softcommit:true}</softCommit>
|
||||
</commitWithin>
|
||||
|
||||
<updateLog>
|
||||
<str name="dir">${solr.ulog.dir:}</str>
|
||||
</updateLog>
|
||||
|
||||
</updateHandler>
|
||||
<requestHandler name="/select" class="solr.SearchHandler">
|
||||
<lst name="defaults">
|
||||
<str name="echoParams">explicit</str>
|
||||
<str name="indent">true</str>
|
||||
<str name="df">text</str>
|
||||
</lst>
|
||||
|
||||
</requestHandler>
|
||||
</config>
|
||||
|
|
@ -0,0 +1,29 @@
|
|||
<?xml version="1.0" encoding="UTF-8" ?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<schema name="minimal" version="1.1">
|
||||
<types>
|
||||
<fieldType name="string" class="solr.StrField"/>
|
||||
<fieldType name="long" class="solr.TrieLongField" precisionStep="0" positionIncrementGap="0"/>
|
||||
</types>
|
||||
<fields>
|
||||
<field name="id" type="string" indexed="true" stored="true"/>
|
||||
<field name="_version_" type="long" indexed="true" stored="true"/>
|
||||
<dynamicField name="*" type="string" indexed="true" stored="true"/>
|
||||
</fields>
|
||||
<uniqueKey>id</uniqueKey>
|
||||
</schema>
|
|
@ -0,0 +1,76 @@
|
|||
<?xml version="1.0" ?>
|
||||
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
|
||||
<!-- This is a "kitchen sink" config file that tests can use.
|
||||
When writing a new test, feel free to add *new* items (plugins,
|
||||
config options, etc...) as long as they don't break any existing
|
||||
tests. if you need to test something esoteric please add a new
|
||||
"solrconfig-your-esoteric-purpose.xml" config file.
|
||||
|
||||
Note in particular that this test is used by MinimalSchemaTest so
|
||||
Anything added to this file needs to work correctly even if there
|
||||
is no uniqueKey or defaultSearchField.
|
||||
-->
|
||||
|
||||
<config>
|
||||
|
||||
<dataDir>${solr.data.dir:}</dataDir>
|
||||
|
||||
<directoryFactory name="DirectoryFactory"
|
||||
class="${solr.directoryFactory:solr.NRTCachingDirectoryFactory}"/>
|
||||
|
||||
<luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
|
||||
|
||||
<updateRequestProcessorChain name="cdcr-processor-chain">
|
||||
<processor class="solr.CdcrUpdateProcessorFactory"/>
|
||||
<processor class="solr.RunUpdateProcessorFactory"/>
|
||||
</updateRequestProcessorChain>
|
||||
|
||||
<requestHandler name="/cdcr" class="solr.CdcrRequestHandler">
|
||||
<lst name="replica">
|
||||
<str name="zkHost">${cdcr.target.zkHost}</str>
|
||||
<str name="source">cdcr-source</str>
|
||||
<str name="target">cdcr-target</str>
|
||||
</lst>
|
||||
<lst name="replicator">
|
||||
<str name="threadPoolSize">1</str>
|
||||
<str name="schedule">1000</str>
|
||||
<str name="batchSize">1000</str>
|
||||
</lst>
|
||||
<lst name="updateLogSynchronizer">
|
||||
<str name="schedule">1000</str>
|
||||
</lst>
|
||||
</requestHandler>
|
||||
|
||||
<updateHandler class="solr.DirectUpdateHandler2">
|
||||
<updateLog class="solr.CdcrUpdateLog">
|
||||
<str name="dir">${solr.ulog.dir:}</str>
|
||||
</updateLog>
|
||||
</updateHandler>
|
||||
|
||||
<requestHandler name="standard" class="solr.StandardRequestHandler">
|
||||
</requestHandler>
|
||||
|
||||
<requestHandler name="/update" class="solr.UpdateRequestHandler">
|
||||
<lst name="defaults">
|
||||
<str name="update.chain">cdcr-processor-chain</str>
|
||||
</lst>
|
||||
</requestHandler>
|
||||
</config>
|
||||
|
|
@ -0,0 +1,29 @@
|
|||
<?xml version="1.0" encoding="UTF-8" ?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<schema name="minimal" version="1.1">
|
||||
<types>
|
||||
<fieldType name="string" class="solr.StrField"/>
|
||||
<fieldType name="long" class="solr.TrieLongField" precisionStep="0" positionIncrementGap="0"/>
|
||||
</types>
|
||||
<fields>
|
||||
<field name="id" type="string" indexed="true" stored="true"/>
|
||||
<field name="_version_" type="long" indexed="true" stored="true"/>
|
||||
<dynamicField name="*" type="string" indexed="true" stored="true"/>
|
||||
</fields>
|
||||
<uniqueKey>id</uniqueKey>
|
||||
</schema>
|
|
@ -0,0 +1,63 @@
|
|||
<?xml version="1.0" ?>
|
||||
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
|
||||
<!-- This is a "kitchen sink" config file that tests can use.
|
||||
When writing a new test, feel free to add *new* items (plugins,
|
||||
config options, etc...) as long as they don't break any existing
|
||||
tests. if you need to test something esoteric please add a new
|
||||
"solrconfig-your-esoteric-purpose.xml" config file.
|
||||
|
||||
Note in particular that this test is used by MinimalSchemaTest so
|
||||
Anything added to this file needs to work correctly even if there
|
||||
is no uniqueKey or defaultSearchField.
|
||||
-->
|
||||
|
||||
<config>
|
||||
|
||||
<dataDir>${solr.data.dir:}</dataDir>
|
||||
|
||||
<directoryFactory name="DirectoryFactory"
|
||||
class="${solr.directoryFactory:solr.NRTCachingDirectoryFactory}"/>
|
||||
|
||||
<luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
|
||||
|
||||
<updateRequestProcessorChain name="cdcr-processor-chain">
|
||||
<processor class="solr.CdcrUpdateProcessorFactory"/>
|
||||
<processor class="solr.RunUpdateProcessorFactory"/>
|
||||
</updateRequestProcessorChain>
|
||||
|
||||
<requestHandler name="/cdcr" class="solr.CdcrRequestHandler">
|
||||
</requestHandler>
|
||||
|
||||
<updateHandler class="solr.DirectUpdateHandler2">
|
||||
<updateLog class="solr.CdcrUpdateLog">
|
||||
<str name="dir">${solr.ulog.dir:}</str>
|
||||
</updateLog>
|
||||
</updateHandler>
|
||||
|
||||
<requestHandler name="standard" class="solr.StandardRequestHandler">
|
||||
</requestHandler>
|
||||
|
||||
<requestHandler name="/update" class="solr.UpdateRequestHandler">
|
||||
<lst name="defaults">
|
||||
<str name="update.chain">cdcr-processor-chain</str>
|
||||
</lst>
|
||||
</requestHandler>
|
||||
</config>
|
||||
|
|
@ -28,6 +28,7 @@ import java.util.Locale;
|
|||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.solr.client.solrj.SolrClient;
|
||||
import org.apache.solr.client.solrj.SolrQuery;
|
||||
|
@ -56,6 +57,7 @@ import org.apache.solr.common.util.Utils;
|
|||
import org.apache.solr.core.CoreDescriptor;
|
||||
import org.apache.solr.core.SolrCore;
|
||||
import org.apache.solr.handler.CdcrParams;
|
||||
import org.apache.solr.util.TimeOut;
|
||||
import org.apache.zookeeper.CreateMode;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
|
@ -69,6 +71,8 @@ import static org.apache.solr.cloud.OverseerCollectionMessageHandler.NUM_SLICES;
|
|||
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.SHARDS_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
|
||||
import static org.apache.solr.handler.admin.CoreAdminHandler.COMPLETED;
|
||||
import static org.apache.solr.handler.admin.CoreAdminHandler.RESPONSE_STATUS;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
|
@ -763,6 +767,18 @@ public class BaseCdcrDistributedZkTest extends AbstractDistribZkTestBase {
|
|||
}
|
||||
}
|
||||
|
||||
protected void waitForBootstrapToComplete(String collectionName, String shardId) throws Exception {
|
||||
NamedList rsp;// we need to wait until bootstrap is complete otherwise the replicator thread will never start
|
||||
TimeOut timeOut = new TimeOut(60, TimeUnit.SECONDS);
|
||||
while (!timeOut.hasTimedOut()) {
|
||||
rsp = invokeCdcrAction(shardToLeaderJetty.get(collectionName).get(shardId), CdcrParams.CdcrAction.BOOTSTRAP_STATUS);
|
||||
if (rsp.get(RESPONSE_STATUS).toString().equals(COMPLETED)) {
|
||||
break;
|
||||
}
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
}
|
||||
|
||||
protected void waitForReplicationToComplete(String collectionName, String shardId) throws Exception {
|
||||
int cnt = 15;
|
||||
while (cnt > 0) {
|
||||
|
|
|
@ -0,0 +1,396 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.cloud;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.solr.SolrTestCaseJ4;
|
||||
import org.apache.solr.client.solrj.SolrQuery;
|
||||
import org.apache.solr.client.solrj.SolrServerException;
|
||||
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
||||
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
|
||||
import org.apache.solr.client.solrj.request.UpdateRequest;
|
||||
import org.apache.solr.client.solrj.response.QueryResponse;
|
||||
import org.apache.solr.common.SolrInputDocument;
|
||||
import org.apache.solr.common.params.CommonParams;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.handler.CdcrParams;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class CdcrBootstrapTest extends SolrTestCaseJ4 {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
/**
|
||||
* Starts a source cluster with no CDCR configuration, indexes enough documents such that
|
||||
* at least one old tlog is closed and thrown away, so that the source cluster does not have
|
||||
* all updates available in tlogs only.
|
||||
* <p>
|
||||
* Then we start a target cluster with CDCR configuration and we change the source cluster configuration
|
||||
* to use CDCR (i.e. CdcrUpdateLog, CdcrRequestHandler and CdcrUpdateProcessor) and restart it.
|
||||
* <p>
|
||||
* We test that all updates eventually make it to the target cluster and that the collectioncheckpoint
|
||||
* call returns the same version as the last update indexed on the source.
|
||||
*/
|
||||
  @Test
  public void testConvertClusterToCdcrAndBootstrap() throws Exception {
    // start the target first so that we know its zkhost
    MiniSolrCloudCluster target = new MiniSolrCloudCluster(1, createTempDir("cdcr-target"), buildJettyConfig("/solr"));
    try {
      target.waitForAllNodes(30);
      System.out.println("Target zkHost = " + target.getZkServer().getZkAddress());
      System.setProperty("cdcr.target.zkHost", target.getZkServer().getZkAddress());

      // start a cluster with no cdcr
      MiniSolrCloudCluster source = new MiniSolrCloudCluster(1, createTempDir("cdcr-source"), buildJettyConfig("/solr"));
      try {
        source.waitForAllNodes(30);
        final File configDir = getFile("solr").toPath().resolve("configsets/cdcr-source-disabled").toFile();
        System.out.println("config dir absolute path = " + configDir.getAbsolutePath());
        source.uploadConfigDir(configDir, "cdcr-source");

        // create a collection with the cdcr-source-disabled configset
        Map<String, String> collectionProperties = new HashMap<>();
        // todo investigate why this is necessary??? because by default it selects a ram directory which deletes the tlogs on reloads?
        collectionProperties.putIfAbsent("solr.directoryFactory", "solr.StandardDirectoryFactory");
        source.createCollection("cdcr-source", 1, 1, "cdcr-source", collectionProperties);
        source.getSolrClient().getZkStateReader().forceUpdateCollection("cdcr-source");
        AbstractDistribZkTestBase.waitForRecoveriesToFinish("cdcr-source", source.getSolrClient().getZkStateReader(), true, true, 330);

        // index 10000 docs with a hard commit every 100 documents
        CloudSolrClient sourceSolrClient = source.getSolrClient();
        sourceSolrClient.setDefaultCollection("cdcr-source");
        int numDocs = 0;
        for (int k = 0; k < 100; k++) {
          UpdateRequest req = new UpdateRequest();
          for (; numDocs < (k + 1) * 100; numDocs++) {
            SolrInputDocument doc = new SolrInputDocument();
            doc.addField("id", "source_" + numDocs);
            doc.addField("xyz", numDocs);
            req.add(doc);
          }
          req.setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true);
          System.out.println("Adding 100 docs with commit=true, numDocs=" + numDocs);
          req.process(sourceSolrClient);
        }

        QueryResponse response = sourceSolrClient.query(new SolrQuery("*:*"));
        assertEquals("", numDocs, response.getResults().getNumFound());

        // let's find and keep the maximum version assigned by the source cluster across all our updates
        long maxVersion = Long.MIN_VALUE;
        ModifiableSolrParams params = new ModifiableSolrParams();
        params.set(CommonParams.QT, "/get");
        params.set("getVersions", numDocs);
        response = sourceSolrClient.query(params);
        List<Long> versions = (List<Long>) response.getResponse().get("versions");
        for (Long version : versions) {
          maxVersion = Math.max(maxVersion, version);
        }

        // upload the cdcr-enabled config and restart the source cluster
        final File cdcrEnabledSourceConfigDir = getFile("solr").toPath().resolve("configsets/cdcr-source").toFile();
        source.uploadConfigDir(cdcrEnabledSourceConfigDir, "cdcr-source");
        JettySolrRunner runner = source.stopJettySolrRunner(0);
        source.startJettySolrRunner(runner);
        assertTrue(runner.isRunning());
        AbstractDistribZkTestBase.waitForRecoveriesToFinish("cdcr-source", source.getSolrClient().getZkStateReader(), true, true, 330);

        response = sourceSolrClient.query(new SolrQuery("*:*"));
        assertEquals("Document mismatch on source after restart", numDocs, response.getResults().getNumFound());

        // setup the target cluster
        final File targetConfigDir = getFile("solr").toPath().resolve("configsets/cdcr-target").toFile();
        target.uploadConfigDir(targetConfigDir, "cdcr-target");
        target.createCollection("cdcr-target", 1, 1, "cdcr-target", Collections.emptyMap());
        target.getSolrClient().getZkStateReader().forceUpdateCollection("cdcr-target");
        AbstractDistribZkTestBase.waitForRecoveriesToFinish("cdcr-target", target.getSolrClient().getZkStateReader(), true, true, 330);
        CloudSolrClient targetSolrClient = target.getSolrClient();
        targetSolrClient.setDefaultCollection("cdcr-target");
        Thread.sleep(1000);

        cdcrStart(targetSolrClient);
        cdcrStart(sourceSolrClient);

        response = getCdcrQueue(sourceSolrClient);
        System.out.println("Cdcr queue response: " + response.getResponse());
        long foundDocs = waitForTargetToSync(numDocs, targetSolrClient);
        assertEquals("Document mismatch on target after sync", numDocs, foundDocs);

        params = new ModifiableSolrParams();
        params.set(CommonParams.ACTION, CdcrParams.CdcrAction.COLLECTIONCHECKPOINT.toString());
        params.set(CommonParams.QT, "/cdcr");
        response = targetSolrClient.query(params);
        Long checkpoint = (Long) response.getResponse().get(CdcrParams.CHECKPOINT);
        assertNotNull(checkpoint);
        assertEquals("COLLECTIONCHECKPOINT from target cluster should have returned the maximum " +
            "version across all updates made to source", maxVersion, checkpoint.longValue());
      } finally {
        source.shutdown();
      }
    } finally {
      target.shutdown();
    }
  }

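  // For reference, the COLLECTIONCHECKPOINT check in the test above corresponds to a plain HTTP request
  // against the target collection's /cdcr handler, along the lines of (host, port and wt below are
  // illustrative, not taken from this test):
  //
  //   curl "http://localhost:8983/solr/cdcr-target/cdcr?action=COLLECTIONCHECKPOINT&wt=json"
  //
  // The response carries a CdcrParams.CHECKPOINT entry holding the highest version replicated to the
  // target, which is what the assertion compares against maxVersion.
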
  /**
   * Starts a CDCR-enabled source cluster, adds data, starts the target cluster and verifies replication.
   * It then stops CDCR replication and buffering, adds more data, re-enables CDCR and verifies that
   * replication catches up again.
   */
  public void testBootstrapWithSourceCluster() throws Exception {
    // start the target first so that we know its zkhost
    MiniSolrCloudCluster target = new MiniSolrCloudCluster(1, createTempDir("cdcr-target"), buildJettyConfig("/solr"));
    try {
      target.waitForAllNodes(30);
      System.out.println("Target zkHost = " + target.getZkServer().getZkAddress());
      System.setProperty("cdcr.target.zkHost", target.getZkServer().getZkAddress());

      MiniSolrCloudCluster source = new MiniSolrCloudCluster(1, createTempDir("cdcr-source"), buildJettyConfig("/solr"));
      try {
        source.waitForAllNodes(30);
        final File configDir = getFile("solr").toPath().resolve("configsets/cdcr-source").toFile();
        System.out.println("config dir absolute path = " + configDir.getAbsolutePath());
        source.uploadConfigDir(configDir, "cdcr-source");

        Map<String, String> collectionProperties = new HashMap<>();
        // todo investigate why this is necessary???
        collectionProperties.putIfAbsent("solr.directoryFactory", "solr.StandardDirectoryFactory");
        source.createCollection("cdcr-source", 1, 1, "cdcr-source", collectionProperties);
        source.getSolrClient().getZkStateReader().forceUpdateCollection("cdcr-source");
        AbstractDistribZkTestBase.waitForRecoveriesToFinish("cdcr-source", source.getSolrClient().getZkStateReader(), true, true, 330);

        // index 10000 docs with a hard commit every 100 documents
        CloudSolrClient sourceSolrClient = source.getSolrClient();
        sourceSolrClient.setDefaultCollection("cdcr-source");
        int numDocs = 0;
        for (int k = 0; k < 100; k++) {
          UpdateRequest req = new UpdateRequest();
          for (; numDocs < (k + 1) * 100; numDocs++) {
            SolrInputDocument doc = new SolrInputDocument();
            doc.addField("id", "source_" + numDocs);
            doc.addField("xyz", numDocs);
            req.add(doc);
          }
          req.setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true);
          System.out.println("Adding 100 docs with commit=true, numDocs=" + numDocs);
          req.process(sourceSolrClient);
        }

        QueryResponse response = sourceSolrClient.query(new SolrQuery("*:*"));
        assertEquals("", numDocs, response.getResults().getNumFound());

        // setup the target cluster
        final File targetConfigDir = getFile("solr").toPath().resolve("configsets/cdcr-target").toFile();
        target.uploadConfigDir(targetConfigDir, "cdcr-target");
        target.createCollection("cdcr-target", 1, 1, "cdcr-target", Collections.emptyMap());
        target.getSolrClient().getZkStateReader().forceUpdateCollection("cdcr-target");
        AbstractDistribZkTestBase.waitForRecoveriesToFinish("cdcr-target", target.getSolrClient().getZkStateReader(), true, true, 330);
        CloudSolrClient targetSolrClient = target.getSolrClient();
        targetSolrClient.setDefaultCollection("cdcr-target");

        cdcrStart(targetSolrClient);
        cdcrStart(sourceSolrClient);

        response = getCdcrQueue(sourceSolrClient);
        System.out.println("Cdcr queue response: " + response.getResponse());
        long foundDocs = waitForTargetToSync(numDocs, targetSolrClient);
        assertEquals("Document mismatch on target after sync", numDocs, foundDocs);

        cdcrStop(sourceSolrClient);
        cdcrDisableBuffer(sourceSolrClient);

        int c = 0;
        for (int k = 0; k < 100; k++) {
          UpdateRequest req = new UpdateRequest();
          for (; c < (k + 1) * 100; c++, numDocs++) {
            SolrInputDocument doc = new SolrInputDocument();
            doc.addField("id", "source_" + numDocs);
            doc.addField("xyz", numDocs);
            req.add(doc);
          }
          req.setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true);
          System.out.println("Adding 100 docs with commit=true, numDocs=" + numDocs);
          req.process(sourceSolrClient);
        }

        response = sourceSolrClient.query(new SolrQuery("*:*"));
        assertEquals("", numDocs, response.getResults().getNumFound());

        cdcrStart(sourceSolrClient);
        cdcrEnableBuffer(sourceSolrClient);

        foundDocs = waitForTargetToSync(numDocs, targetSolrClient);
        assertEquals("Document mismatch on target after sync", numDocs, foundDocs);

      } finally {
        source.shutdown();
      }
    } finally {
      target.shutdown();
    }
  }

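  /**
   * Same flow as {@link #testBootstrapWithSourceCluster()}, except that CDCR is started while the
   * source is still being indexed: another batch of documents is added after cdcrStart, so the
   * bootstrap and the regular tlog replication together have to bring the target in sync.
   */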
  public void testBootstrapWithContinousIndexingOnSourceCluster() throws Exception {
    // start the target first so that we know its zkhost
    MiniSolrCloudCluster target = new MiniSolrCloudCluster(1, createTempDir("cdcr-target"), buildJettyConfig("/solr"));
    target.waitForAllNodes(30);
    try {
      System.out.println("Target zkHost = " + target.getZkServer().getZkAddress());
      System.setProperty("cdcr.target.zkHost", target.getZkServer().getZkAddress());

      MiniSolrCloudCluster source = new MiniSolrCloudCluster(1, createTempDir("cdcr-source"), buildJettyConfig("/solr"));
      try {
        source.waitForAllNodes(30);
        final File configDir = getFile("solr").toPath().resolve("configsets/cdcr-source").toFile();
        System.out.println("config dir absolute path = " + configDir.getAbsolutePath());
        source.uploadConfigDir(configDir, "cdcr-source");

        Map<String, String> collectionProperties = new HashMap<>();
        // todo investigate why this is necessary???
        collectionProperties.putIfAbsent("solr.directoryFactory", "solr.StandardDirectoryFactory");
        source.createCollection("cdcr-source", 1, 1, "cdcr-source", collectionProperties);
        source.getSolrClient().getZkStateReader().forceUpdateCollection("cdcr-source");
        AbstractDistribZkTestBase.waitForRecoveriesToFinish("cdcr-source", source.getSolrClient().getZkStateReader(), true, true, 330);

        // index 10000 docs with a hard commit every 100 documents
        CloudSolrClient sourceSolrClient = source.getSolrClient();
        sourceSolrClient.setDefaultCollection("cdcr-source");
        int numDocs = 0;
        for (int k = 0; k < 100; k++) {
          UpdateRequest req = new UpdateRequest();
          for (; numDocs < (k + 1) * 100; numDocs++) {
            SolrInputDocument doc = new SolrInputDocument();
            doc.addField("id", "source_" + numDocs);
            doc.addField("xyz", numDocs);
            req.add(doc);
          }
          req.setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true);
          System.out.println("Adding 100 docs with commit=true, numDocs=" + numDocs);
          req.process(sourceSolrClient);
        }

        QueryResponse response = sourceSolrClient.query(new SolrQuery("*:*"));
        assertEquals("", numDocs, response.getResults().getNumFound());

        // setup the target cluster
        final File targetConfigDir = getFile("solr").toPath().resolve("configsets/cdcr-target").toFile();
        target.uploadConfigDir(targetConfigDir, "cdcr-target");
        target.createCollection("cdcr-target", 1, 1, "cdcr-target", Collections.emptyMap());
        target.getSolrClient().getZkStateReader().forceUpdateCollection("cdcr-target");
        AbstractDistribZkTestBase.waitForRecoveriesToFinish("cdcr-target", target.getSolrClient().getZkStateReader(), true, true, 330);
        CloudSolrClient targetSolrClient = target.getSolrClient();
        targetSolrClient.setDefaultCollection("cdcr-target");
        Thread.sleep(1000);

        cdcrStart(targetSolrClient);
        cdcrStart(sourceSolrClient);

        int c = 0;
        for (int k = 0; k < 100; k++) {
          UpdateRequest req = new UpdateRequest();
          for (; c < (k + 1) * 100; c++, numDocs++) {
            SolrInputDocument doc = new SolrInputDocument();
            doc.addField("id", "source_" + numDocs);
            doc.addField("xyz", numDocs);
            req.add(doc);
          }
          req.setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true);
          System.out.println("Adding 100 docs with commit=true, numDocs=" + numDocs);
          req.process(sourceSolrClient);
        }

        response = sourceSolrClient.query(new SolrQuery("*:*"));
        assertEquals("", numDocs, response.getResults().getNumFound());

        response = getCdcrQueue(sourceSolrClient);
        System.out.println("Cdcr queue response: " + response.getResponse());
        long foundDocs = waitForTargetToSync(numDocs, targetSolrClient);
        assertEquals("Document mismatch on target after sync", numDocs, foundDocs);

      } finally {
        source.shutdown();
      }
    } finally {
      target.shutdown();
    }
  }

  private long waitForTargetToSync(int numDocs, CloudSolrClient targetSolrClient) throws SolrServerException, IOException, InterruptedException {
    long start = System.nanoTime();
    QueryResponse response = null;
    while (System.nanoTime() - start <= TimeUnit.NANOSECONDS.convert(120, TimeUnit.SECONDS)) {
      try {
        targetSolrClient.commit();
        response = targetSolrClient.query(new SolrQuery("*:*"));
        if (response.getResults().getNumFound() == numDocs) {
          break;
        }
      } catch (Exception e) {
        log.warn("Exception trying to commit on target. This is expected and safe to ignore.", e);
      }
      Thread.sleep(1000);
    }
    return response != null ? response.getResults().getNumFound() : 0;
  }

  private void cdcrStart(CloudSolrClient client) throws SolrServerException, IOException {
    QueryResponse response = invokeCdcrAction(client, CdcrParams.CdcrAction.START);
    assertEquals("started", ((NamedList) response.getResponse().get("status")).get("process"));
  }

  private void cdcrStop(CloudSolrClient client) throws SolrServerException, IOException {
    QueryResponse response = invokeCdcrAction(client, CdcrParams.CdcrAction.STOP);
    assertEquals("stopped", ((NamedList) response.getResponse().get("status")).get("process"));
  }

  private void cdcrEnableBuffer(CloudSolrClient client) throws IOException, SolrServerException {
    QueryResponse response = invokeCdcrAction(client, CdcrParams.CdcrAction.ENABLEBUFFER);
    assertEquals("enabled", ((NamedList) response.getResponse().get("status")).get("buffer"));
  }

  private void cdcrDisableBuffer(CloudSolrClient client) throws IOException, SolrServerException {
    QueryResponse response = invokeCdcrAction(client, CdcrParams.CdcrAction.DISABLEBUFFER);
    assertEquals("disabled", ((NamedList) response.getResponse().get("status")).get("buffer"));
  }

  private QueryResponse invokeCdcrAction(CloudSolrClient client, CdcrParams.CdcrAction action) throws IOException, SolrServerException {
    ModifiableSolrParams params = new ModifiableSolrParams();
    params.set(CommonParams.QT, "/cdcr");
    params.set(CommonParams.ACTION, action.toLower());
    return client.query(params);
  }

  private QueryResponse getCdcrQueue(CloudSolrClient client) throws SolrServerException, IOException {
    ModifiableSolrParams params = new ModifiableSolrParams();
    params.set(CommonParams.QT, "/cdcr");
    params.set(CommonParams.ACTION, CdcrParams.QUEUES);
    return client.query(params);
  }

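  // Sketch only, not part of the original patch: this change also adds bootstrap-related actions
  // (BOOTSTRAP, BOOTSTRAP_STATUS, CANCEL_BOOTSTRAP) to the /cdcr handler, and a status probe could
  // reuse invokeCdcrAction above. The exact shape of the status response is not shown in this test,
  // so the helper below simply hands back the raw response for the caller to inspect.
  private NamedList<Object> getBootstrapStatus(CloudSolrClient client) throws IOException, SolrServerException {
    QueryResponse response = invokeCdcrAction(client, CdcrParams.CdcrAction.BOOTSTRAP_STATUS);
    return response.getResponse();
  }
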
}

@@ -103,6 +103,11 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
    // check status
    this.assertState(SOURCE_COLLECTION, CdcrParams.ProcessState.STARTED, CdcrParams.BufferState.ENABLED);

    this.waitForBootstrapToComplete(TARGET_COLLECTION, SHARD2);

    // sleep for a bit to ensure that replicator threads are started
    Thread.sleep(3000);

    // Kill all the servers of the target
    this.deleteCollection(TARGET_COLLECTION);

@@ -156,6 +161,9 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
    this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
    this.waitForCdcrStateReplication(SOURCE_COLLECTION);

    this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD1);
    this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD2);

    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);

@@ -182,6 +190,9 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
    this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
    this.waitForCdcrStateReplication(SOURCE_COLLECTION);

    this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD1);
    this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD2);

    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);

@@ -203,6 +214,9 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
    this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
    this.waitForCdcrStateReplication(SOURCE_COLLECTION);

    this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD1);
    this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD2);

    log.info("Indexing 10 documents");

    int start = 0;

@@ -244,6 +258,9 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest

    log.info("Waiting for replication");

    this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD1);
    this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD2);

    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);

@@ -267,6 +284,9 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
    this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
    this.waitForCdcrStateReplication(SOURCE_COLLECTION);

    this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD1);
    this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD2);

    log.info("Indexing 10 documents");

    int start = 0;

@@ -349,6 +369,9 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
      index(SOURCE_COLLECTION, getDoc(id, Integer.toString(i)));
    }

    this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD1);
    this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD2);

    // wait a bit for the replication to complete
    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);

@@ -495,6 +518,8 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
    // Start CDCR
    this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
    this.waitForCdcrStateReplication(SOURCE_COLLECTION);
    this.waitForBootstrapToComplete(TARGET_COLLECTION, SHARD1);
    this.waitForBootstrapToComplete(TARGET_COLLECTION, SHARD2);

    // wait a bit for the replication to complete
    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);

@@ -526,6 +551,9 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest

    assertNumDocs(128, SOURCE_COLLECTION);

    this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD1);
    this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD2);

    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);

@@ -553,6 +581,9 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
    this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
    this.waitForCdcrStateReplication(SOURCE_COLLECTION);

    this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD1);
    this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD2);

    // wait a bit for the replication to complete
    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);