SOLR-6465: CDCR: fall back to whole-index replication when tlogs are insufficient

(cherry picked from commit 153c270045)
This commit is contained in:
Shalin Shekhar Mangar 2016-08-08 11:22:50 +05:30
parent 9c37aaabe4
commit cc3f3e8a8b
23 changed files with 1280 additions and 28 deletions

View File

@ -79,6 +79,9 @@ New Features
* SOLR-9252: Feature selection and logistic regression on text (Cao Manh Dat, Joel Bernstein)
* SOLR-6465: CDCR: fall back to whole-index replication when tlogs are insufficient.
(Noble Paul, Renaud Delbru, shalin)
Bug Fixes
----------------------

View File

@ -121,6 +121,11 @@ public class CdcrParams {
*/
public final static String COUNTER_DELETES = "deletes";
/**
* Counter for Bootstrap operations *
*/
public final static String COUNTER_BOOTSTRAP = "bootstraps";
/**
* A list of errors per target collection *
*/
@ -165,7 +170,10 @@ public class CdcrParams {
LASTPROCESSEDVERSION,
QUEUES,
OPS,
ERRORS;
ERRORS,
BOOTSTRAP,
BOOTSTRAP_STATUS,
CANCEL_BOOTSTRAP;
public static CdcrAction get(String p) {
if (p != null) {

View File

@ -119,7 +119,7 @@ public class CdcrReplicator implements Runnable {
// we might have read a single commit operation and reached the end of the update logs
logReader.forwardSeek(subReader);
log.debug("Forwarded {} updates to target {}", counter, state.getTargetCollection());
log.info("Forwarded {} updates to target {}", counter, state.getTargetCollection());
} catch (Exception e) {
// report error and update error stats
this.handleException(e);
@ -150,13 +150,13 @@ public class CdcrReplicator implements Runnable {
if (e instanceof CdcrReplicatorException) {
UpdateRequest req = ((CdcrReplicatorException) e).req;
UpdateResponse rsp = ((CdcrReplicatorException) e).rsp;
log.warn("Failed to forward update request {}. Got response {}", req, rsp);
log.warn("Failed to forward update request {} to target: {}. Got response {}", req, state.getTargetCollection(), rsp);
state.reportError(CdcrReplicatorState.ErrorType.BAD_REQUEST);
} else if (e instanceof CloudSolrClient.RouteException) {
log.warn("Failed to forward update request", e);
log.warn("Failed to forward update request to target: " + state.getTargetCollection(), e);
state.reportError(CdcrReplicatorState.ErrorType.BAD_REQUEST);
} else {
log.warn("Failed to forward update request", e);
log.warn("Failed to forward update request to target: " + state.getTargetCollection(), e);
state.reportError(CdcrReplicatorState.ErrorType.INTERNAL);
}
}

View File

@ -16,29 +16,49 @@
*/
package org.apache.solr.handler;
import java.io.Closeable;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.http.client.HttpClient;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.CloudSolrClient.Builder;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SolrjNamedThreadFactory;
import org.apache.solr.core.SolrCore;
import org.apache.solr.update.CdcrUpdateLog;
import org.apache.solr.util.TimeOut;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.handler.admin.CoreAdminHandler.RESPONSE_STATUS;
class CdcrReplicatorManager implements CdcrStateManager.CdcrStateObserver {
private static final int MAX_BOOTSTRAP_ATTEMPTS = 5;
private static final int BOOTSTRAP_RETRY_DELAY_MS = 2000;
// 6 hours is hopefully long enough for most indexes
private static final long BOOTSTRAP_TIMEOUT_SECONDS = 6L * 3600L * 3600L;
private List<CdcrReplicatorState> replicatorStates;
private final CdcrReplicatorScheduler scheduler;
@ -48,6 +68,9 @@ class CdcrReplicatorManager implements CdcrStateManager.CdcrStateObserver {
private SolrCore core;
private String path;
private ExecutorService bootstrapExecutor;
private volatile BootstrapStatusRunnable bootstrapStatusRunnable;
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
CdcrReplicatorManager(final SolrCore core, String path,
@ -104,12 +127,20 @@ class CdcrReplicatorManager implements CdcrStateManager.CdcrStateObserver {
@Override
public synchronized void stateUpdate() {
if (leaderStateManager.amILeader() && processStateManager.getState().equals(CdcrParams.ProcessState.STARTED)) {
if (replicatorStates.size() > 0) {
this.bootstrapExecutor = ExecutorUtil.newMDCAwareFixedThreadPool(replicatorStates.size(),
new SolrjNamedThreadFactory("cdcr-bootstrap-status"));
}
this.initLogReaders();
this.scheduler.start();
return;
}
this.scheduler.shutdown();
if (bootstrapExecutor != null) {
IOUtils.closeQuietly(bootstrapStatusRunnable);
ExecutorUtil.shutdownAndAwaitTermination(bootstrapExecutor);
}
this.closeLogReaders();
}
@ -117,7 +148,7 @@ class CdcrReplicatorManager implements CdcrStateManager.CdcrStateObserver {
return replicatorStates;
}
void initLogReaders() {
private void initLogReaders() {
String collectionName = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
String shard = core.getCoreDescriptor().getCloudDescriptor().getShardId();
CdcrUpdateLog ulog = (CdcrUpdateLog) core.getUpdateHandler().getUpdateLog();
@ -129,8 +160,23 @@ class CdcrReplicatorManager implements CdcrStateManager.CdcrStateObserver {
log.info("Create new update log reader for target {} with checkpoint {} @ {}:{}", state.getTargetCollection(),
checkpoint, collectionName, shard);
CdcrUpdateLog.CdcrLogReader reader = ulog.newLogReader();
reader.seek(checkpoint);
boolean seek = reader.seek(checkpoint);
state.init(reader);
if (!seek) {
// targetVersion is lower than the oldest known entry.
// In this scenario, it probably means that there is a gap in the updates log.
// the best we can do here is to bootstrap the target leader by replicating the full index
final String targetCollection = state.getTargetCollection();
state.setBootstrapInProgress(true);
log.info("Attempting to bootstrap target collection: {}, shard: {}", targetCollection, shard);
bootstrapStatusRunnable = new BootstrapStatusRunnable(core, state);
log.info("Submitting bootstrap task to executor");
try {
bootstrapExecutor.submit(bootstrapStatusRunnable);
} catch (Exception e) {
log.error("Unable to submit bootstrap call to executor", e);
}
}
} catch (IOException | SolrServerException | SolrException e) {
log.warn("Unable to instantiate the log reader for target collection " + state.getTargetCollection(), e);
} catch (InterruptedException e) {
@ -164,11 +210,203 @@ class CdcrReplicatorManager implements CdcrStateManager.CdcrStateObserver {
*/
void shutdown() {
this.scheduler.shutdown();
if (bootstrapExecutor != null) {
IOUtils.closeQuietly(bootstrapStatusRunnable);
ExecutorUtil.shutdownAndAwaitTermination(bootstrapExecutor);
}
for (CdcrReplicatorState state : replicatorStates) {
state.shutdown();
}
replicatorStates.clear();
}
private class BootstrapStatusRunnable implements Runnable, Closeable {
private final CdcrReplicatorState state;
private final String targetCollection;
private final String shard;
private final String collectionName;
private final CdcrUpdateLog ulog;
private final String myCoreUrl;
private volatile boolean closed = false;
BootstrapStatusRunnable(SolrCore core, CdcrReplicatorState state) {
this.collectionName = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
this.shard = core.getCoreDescriptor().getCloudDescriptor().getShardId();
this.ulog = (CdcrUpdateLog) core.getUpdateHandler().getUpdateLog();
this.state = state;
this.targetCollection = state.getTargetCollection();
String baseUrl = core.getCoreDescriptor().getCoreContainer().getZkController().getBaseUrl();
this.myCoreUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, core.getName());
}
@Override
public void close() throws IOException {
closed = true;
try {
Replica leader = state.getClient().getZkStateReader().getLeaderRetry(targetCollection, shard, 30000); // assume same shard exists on target
String leaderCoreUrl = leader.getCoreUrl();
HttpClient httpClient = state.getClient().getLbClient().getHttpClient();
try (HttpSolrClient client = new HttpSolrClient.Builder(leaderCoreUrl).withHttpClient(httpClient).build()) {
sendCdcrCommand(client, CdcrParams.CdcrAction.CANCEL_BOOTSTRAP);
} catch (SolrServerException e) {
log.error("Error sending cancel bootstrap message to target collection: {} shard: {} leader: {}",
targetCollection, shard, leaderCoreUrl);
}
} catch (InterruptedException e) {
log.error("Interrupted while closing BootstrapStatusRunnable", e);
Thread.currentThread().interrupt();
}
}
@Override
public void run() {
int retries = 1;
boolean success = false;
try {
while (!closed && sendBootstrapCommand() != BootstrapStatus.SUBMITTED) {
Thread.sleep(BOOTSTRAP_RETRY_DELAY_MS);
}
TimeOut timeOut = new TimeOut(BOOTSTRAP_TIMEOUT_SECONDS, TimeUnit.SECONDS);
while (!timeOut.hasTimedOut()) {
if (closed) {
log.warn("Cancelling waiting for bootstrap on target: {} shard: {} to complete", targetCollection, shard);
state.setBootstrapInProgress(false);
break;
}
BootstrapStatus status = getBoostrapStatus();
if (status == BootstrapStatus.RUNNING) {
try {
log.info("CDCR bootstrap running for {} seconds, sleeping for {} ms",
BOOTSTRAP_TIMEOUT_SECONDS - timeOut.timeLeft(TimeUnit.SECONDS), BOOTSTRAP_RETRY_DELAY_MS);
Thread.sleep(BOOTSTRAP_RETRY_DELAY_MS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
} else if (status == BootstrapStatus.COMPLETED) {
log.info("CDCR bootstrap successful in {} seconds", BOOTSTRAP_TIMEOUT_SECONDS - timeOut.timeLeft(TimeUnit.SECONDS));
long checkpoint = CdcrReplicatorManager.this.getCheckpoint(state);
log.info("Create new update log reader for target {} with checkpoint {} @ {}:{}", state.getTargetCollection(),
checkpoint, collectionName, shard);
CdcrUpdateLog.CdcrLogReader reader1 = ulog.newLogReader();
reader1.seek(checkpoint);
success = true;
break;
} else if (status == BootstrapStatus.FAILED) {
log.warn("CDCR bootstrap failed in {} seconds", BOOTSTRAP_TIMEOUT_SECONDS - timeOut.timeLeft(TimeUnit.SECONDS));
// let's retry a fixed number of times before giving up
if (retries >= MAX_BOOTSTRAP_ATTEMPTS) {
log.error("Unable to bootstrap the target collection: {}, shard: {} even after {} retries", targetCollection, shard, retries);
break;
} else {
log.info("Retry: {} - Attempting to bootstrap target collection: {} shard: {}", retries, targetCollection, shard);
while (!closed && sendBootstrapCommand() != BootstrapStatus.SUBMITTED) {
Thread.sleep(BOOTSTRAP_RETRY_DELAY_MS);
}
timeOut = new TimeOut(BOOTSTRAP_TIMEOUT_SECONDS, TimeUnit.SECONDS); // reset the timer
retries++;
}
} else if (status == BootstrapStatus.NOTFOUND) {
// the leader of the target shard may have changed and therefore there is no record of the
// bootstrap process so we must retry the operation
while (!closed && sendBootstrapCommand() != BootstrapStatus.SUBMITTED) {
Thread.sleep(BOOTSTRAP_RETRY_DELAY_MS);
}
retries = 1;
timeOut = new TimeOut(6L * 3600L * 3600L, TimeUnit.SECONDS); // reset the timer
} else if (status == BootstrapStatus.UNKNOWN) {
// we were not able to query the status on the remote end
// so just sleep for a bit and try again
Thread.sleep(BOOTSTRAP_RETRY_DELAY_MS);
}
}
} catch (InterruptedException e) {
log.info("Bootstrap thread interrupted");
state.reportError(CdcrReplicatorState.ErrorType.INTERNAL);
Thread.currentThread().interrupt();
} catch (IOException | SolrServerException | SolrException e) {
log.error("Unable to bootstrap the target collection " + targetCollection + " shard: " + shard, e);
state.reportError(CdcrReplicatorState.ErrorType.BAD_REQUEST);
} finally {
if (success) {
log.info("Bootstrap successful, giving the go-ahead to replicator");
state.setBootstrapInProgress(false);
}
}
}
private BootstrapStatus sendBootstrapCommand() throws InterruptedException {
Replica leader = state.getClient().getZkStateReader().getLeaderRetry(targetCollection, shard, 30000); // assume same shard exists on target
String leaderCoreUrl = leader.getCoreUrl();
HttpClient httpClient = state.getClient().getLbClient().getHttpClient();
try (HttpSolrClient client = new HttpSolrClient.Builder(leaderCoreUrl).withHttpClient(httpClient).build()) {
log.info("Attempting to bootstrap target collection: {} shard: {} leader: {}", targetCollection, shard, leaderCoreUrl);
try {
NamedList response = sendCdcrCommand(client, CdcrParams.CdcrAction.BOOTSTRAP, ReplicationHandler.MASTER_URL, myCoreUrl);
log.debug("CDCR Bootstrap response: {}", response);
String status = response.get(RESPONSE_STATUS).toString();
return BootstrapStatus.valueOf(status.toUpperCase(Locale.ROOT));
} catch (Exception e) {
log.error("Exception submitting bootstrap request", e);
return BootstrapStatus.UNKNOWN;
}
} catch (IOException e) {
log.error("There shouldn't be an IOException while closing but there was!", e);
}
return BootstrapStatus.UNKNOWN;
}
private BootstrapStatus getBoostrapStatus() throws InterruptedException {
try {
Replica leader = state.getClient().getZkStateReader().getLeaderRetry(targetCollection, shard, 30000); // assume same shard exists on target
String leaderCoreUrl = leader.getCoreUrl();
HttpClient httpClient = state.getClient().getLbClient().getHttpClient();
try (HttpSolrClient client = new HttpSolrClient.Builder(leaderCoreUrl).withHttpClient(httpClient).build()) {
NamedList response = sendCdcrCommand(client, CdcrParams.CdcrAction.BOOTSTRAP_STATUS);
String status = (String) response.get(RESPONSE_STATUS);
BootstrapStatus bootstrapStatus = BootstrapStatus.valueOf(status.toUpperCase(Locale.ROOT));
if (bootstrapStatus == BootstrapStatus.RUNNING) {
return BootstrapStatus.RUNNING;
} else if (bootstrapStatus == BootstrapStatus.COMPLETED) {
return BootstrapStatus.COMPLETED;
} else if (bootstrapStatus == BootstrapStatus.FAILED) {
return BootstrapStatus.FAILED;
} else if (bootstrapStatus == BootstrapStatus.NOTFOUND) {
log.warn("Bootstrap process was not found on target collection: {} shard: {}, leader: {}", targetCollection, shard, leaderCoreUrl);
return BootstrapStatus.NOTFOUND;
} else if (bootstrapStatus == BootstrapStatus.CANCELLED) {
return BootstrapStatus.CANCELLED;
} else {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Unknown status: " + status + " returned by BOOTSTRAP_STATUS command");
}
}
} catch (Exception e) {
log.error("Exception during bootstrap status request", e);
return BootstrapStatus.UNKNOWN;
}
}
}
private NamedList sendCdcrCommand(SolrClient client, CdcrParams.CdcrAction action, String... params) throws SolrServerException, IOException {
ModifiableSolrParams solrParams = new ModifiableSolrParams();
solrParams.set(CommonParams.QT, "/cdcr");
solrParams.set(CommonParams.ACTION, action.toString());
for (int i = 0; i < params.length - 1; i+=2) {
solrParams.set(params[i], params[i + 1]);
}
SolrRequest request = new QueryRequest(solrParams);
return client.request(request);
}
private enum BootstrapStatus {
SUBMITTED,
RUNNING,
COMPLETED,
FAILED,
NOTFOUND,
CANCELLED,
UNKNOWN
}
}

View File

@ -77,7 +77,11 @@ class CdcrReplicatorScheduler {
CdcrReplicatorState state = statesQueue.poll();
assert state != null; // Should never happen
try {
new CdcrReplicator(state, batchSize).run();
if (!state.isBootstrapInProgress()) {
new CdcrReplicator(state, batchSize).run();
} else {
log.debug("Replicator state is bootstrapping, skipping replication for target collection {}", state.getTargetCollection());
}
} finally {
statesQueue.offer(state);
}

View File

@ -27,6 +27,8 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.update.CdcrUpdateLog;
@ -53,6 +55,9 @@ class CdcrReplicatorState {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final AtomicBoolean bootstrapInProgress = new AtomicBoolean(false);
private final AtomicInteger numBootstraps = new AtomicInteger();
CdcrReplicatorState(final String targetCollection, final String zkHost, final CloudSolrClient targetClient) {
this.targetCollection = targetCollection;
this.targetClient = targetClient;
@ -164,6 +169,24 @@ class CdcrReplicatorState {
return this.benchmarkTimer;
}
/**
* @return true if a bootstrap operation is in progress, false otherwise
*/
boolean isBootstrapInProgress() {
return bootstrapInProgress.get();
}
void setBootstrapInProgress(boolean inProgress) {
if (bootstrapInProgress.compareAndSet(true, false)) {
numBootstraps.incrementAndGet();
}
bootstrapInProgress.set(inProgress);
}
public int getNumBootstraps() {
return numBootstraps.get();
}
enum ErrorType {
INTERNAL,
BAD_REQUEST;

View File

@ -16,6 +16,7 @@
*/
package org.apache.solr.handler;
import java.io.Closeable;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
@ -24,14 +25,20 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
@ -41,21 +48,33 @@ import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.CloseHook;
import org.apache.solr.core.PluginBag;
import org.apache.solr.core.SolrCore;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrRequestHandler;
import org.apache.solr.request.SolrRequestInfo;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.update.CdcrUpdateLog;
import org.apache.solr.update.UpdateLog;
import org.apache.solr.update.VersionInfo;
import org.apache.solr.update.processor.DistributedUpdateProcessor;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.apache.solr.util.plugin.SolrCoreAware;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.handler.admin.CoreAdminHandler.COMPLETED;
import static org.apache.solr.handler.admin.CoreAdminHandler.FAILED;
import static org.apache.solr.handler.admin.CoreAdminHandler.RESPONSE;
import static org.apache.solr.handler.admin.CoreAdminHandler.RESPONSE_MESSAGE;
import static org.apache.solr.handler.admin.CoreAdminHandler.RESPONSE_STATUS;
import static org.apache.solr.handler.admin.CoreAdminHandler.RUNNING;
/**
* <p>
* This request handler implements the CDCR API and is responsible of the execution of the
@ -199,6 +218,18 @@ public class CdcrRequestHandler extends RequestHandlerBase implements SolrCoreAw
this.handleErrorsAction(req, rsp);
break;
}
case BOOTSTRAP: {
this.handleBootstrapAction(req, rsp);
break;
}
case BOOTSTRAP_STATUS: {
this.handleBootstrapStatus(req, rsp);
break;
}
case CANCEL_BOOTSTRAP: {
this.handleCancelBootstrap(req, rsp);
break;
}
default: {
throw new RuntimeException("Unknown action: " + action);
}
@ -409,10 +440,20 @@ public class CdcrRequestHandler extends RequestHandlerBase implements SolrCoreAw
}
UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
VersionInfo versionInfo = ulog.getVersionInfo();
try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) {
List<Long> versions = recentUpdates.getVersions(1);
long lastVersion = versions.isEmpty() ? -1 : Math.abs(versions.get(0));
rsp.add(CdcrParams.CHECKPOINT, lastVersion);
long maxVersionFromRecent = recentUpdates.getMaxRecentVersion();
long maxVersionFromIndex = versionInfo.getMaxVersionFromIndex(req.getSearcher());
log.info("Found maxVersionFromRecent {} maxVersionFromIndex {}", maxVersionFromRecent, maxVersionFromIndex);
// there is no race with ongoing bootstrap because we don't expect any updates to come from the source
long maxVersion = Math.max(maxVersionFromIndex, maxVersionFromRecent);
if (maxVersion == 0L) {
maxVersion = -1;
}
rsp.add(CdcrParams.CHECKPOINT, maxVersion);
} catch (IOException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Action '" + CdcrParams.CdcrAction.SHARDCHECKPOINT +
"' could not read max version");
}
}
@ -574,6 +615,192 @@ public class CdcrRequestHandler extends RequestHandlerBase implements SolrCoreAw
rsp.add(CdcrParams.ERRORS, hosts);
}
private AtomicBoolean running = new AtomicBoolean();
private volatile Future<Boolean> bootstrapFuture;
private volatile BootstrapCallable bootstrapCallable;
private void handleBootstrapAction(SolrQueryRequest req, SolrQueryResponse rsp) throws IOException, SolrServerException {
String collectionName = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
String shard = core.getCoreDescriptor().getCloudDescriptor().getShardId();
if (!leaderStateManager.amILeader()) {
log.warn("Action {} sent to non-leader replica @ {}:{}", CdcrParams.CdcrAction.BOOTSTRAP, collectionName, shard);
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Action " + CdcrParams.CdcrAction.BOOTSTRAP +
" sent to non-leader replica");
}
Runnable runnable = () -> {
Lock recoveryLock = req.getCore().getSolrCoreState().getRecoveryLock();
boolean locked = recoveryLock.tryLock();
try {
if (!locked) {
handleCancelBootstrap(req, rsp);
} else if (leaderStateManager.amILeader()) {
running.set(true);
String masterUrl = req.getParams().get(ReplicationHandler.MASTER_URL);
bootstrapCallable = new BootstrapCallable(masterUrl, core);
bootstrapFuture = core.getCoreDescriptor().getCoreContainer().getUpdateShardHandler().getRecoveryExecutor().submit(bootstrapCallable);
try {
bootstrapFuture.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("Bootstrap was interrupted", e);
} catch (ExecutionException e) {
log.error("Bootstrap operation failed", e);
}
} else {
log.error("Action {} sent to non-leader replica @ {}:{}. Aborting bootstrap.", CdcrParams.CdcrAction.BOOTSTRAP, collectionName, shard);
}
} finally {
if (locked) {
running.set(false);
recoveryLock.unlock();
}
}
};
try {
core.getCoreDescriptor().getCoreContainer().getUpdateShardHandler().getUpdateExecutor().submit(runnable);
rsp.add(RESPONSE_STATUS, "submitted");
} catch (RejectedExecutionException ree) {
// no problem, we're probably shutting down
rsp.add(RESPONSE_STATUS, "failed");
}
}
private void handleCancelBootstrap(SolrQueryRequest req, SolrQueryResponse rsp) {
BootstrapCallable callable = this.bootstrapCallable;
IOUtils.closeQuietly(callable);
rsp.add(RESPONSE_STATUS, "cancelled");
}
private void handleBootstrapStatus(SolrQueryRequest req, SolrQueryResponse rsp) throws IOException, SolrServerException {
if (running.get()) {
rsp.add(RESPONSE_STATUS, RUNNING);
return;
}
Future<Boolean> future = bootstrapFuture;
BootstrapCallable callable = this.bootstrapCallable;
if (future == null) {
rsp.add(RESPONSE_STATUS, "notfound");
rsp.add(RESPONSE_MESSAGE, "No bootstrap found in running, completed or failed states");
} else if (future.isCancelled() || callable.isClosed()) {
rsp.add(RESPONSE_STATUS, "cancelled");
} else if (future.isDone()) {
// could be a normal termination or an exception
try {
Boolean result = future.get();
if (result) {
rsp.add(RESPONSE_STATUS, COMPLETED);
} else {
rsp.add(RESPONSE_STATUS, FAILED);
}
} catch (InterruptedException e) {
// should not happen?
} catch (ExecutionException e) {
rsp.add(RESPONSE_STATUS, FAILED);
rsp.add(RESPONSE, e);
} catch (CancellationException ce) {
rsp.add(RESPONSE_STATUS, FAILED);
rsp.add(RESPONSE_MESSAGE, "Bootstrap was cancelled");
}
} else {
rsp.add(RESPONSE_STATUS, RUNNING);
}
}
private static class BootstrapCallable implements Callable<Boolean>, Closeable {
private final String masterUrl;
private final SolrCore core;
private volatile boolean closed = false;
BootstrapCallable(String masterUrl, SolrCore core) {
this.masterUrl = masterUrl;
this.core = core;
}
@Override
public void close() throws IOException {
closed = true;
SolrRequestHandler handler = core.getRequestHandler(ReplicationHandler.PATH);
ReplicationHandler replicationHandler = (ReplicationHandler) handler;
replicationHandler.abortFetch();
}
public boolean isClosed() {
return closed;
}
@Override
public Boolean call() throws Exception {
boolean success = false;
UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
// we start buffering updates as a safeguard however we do not expect
// to receive any updates from the source during bootstrap
ulog.bufferUpdates();
try {
commitOnLeader(masterUrl);
// use rep handler directly, so we can do this sync rather than async
SolrRequestHandler handler = core.getRequestHandler(ReplicationHandler.PATH);
ReplicationHandler replicationHandler = (ReplicationHandler) handler;
if (replicationHandler == null) {
throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
"Skipping recovery, no " + ReplicationHandler.PATH + " handler found");
}
ModifiableSolrParams solrParams = new ModifiableSolrParams();
solrParams.set(ReplicationHandler.MASTER_URL, masterUrl);
// we do not want the raw tlog files from the source
solrParams.set(ReplicationHandler.TLOG_FILES, false);
success = replicationHandler.doFetch(solrParams, false);
// this is required because this callable can race with HttpSolrCall#destroy
// which clears the request info.
// Applying buffered updates fails without the following line because LogReplayer
// also tries to set request info and fails with AssertionError
SolrRequestInfo.clearRequestInfo();
Future<UpdateLog.RecoveryInfo> future = ulog.applyBufferedUpdates();
if (future == null) {
// no replay needed
log.info("No replay needed.");
} else {
log.info("Replaying buffered documents.");
// wait for replay
UpdateLog.RecoveryInfo report = future.get();
if (report.failed) {
SolrException.log(log, "Replay failed");
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Replay failed");
}
}
return success;
} finally {
if (closed || !success) {
// we cannot apply the buffer in this case because it will introduce newer versions in the
// update log and then the source cluster will get those versions via collectioncheckpoint
// causing the versions in between to be completely missed
boolean dropped = ulog.dropBufferedUpdates();
assert dropped;
}
}
}
private void commitOnLeader(String leaderUrl) throws SolrServerException,
IOException {
try (HttpSolrClient client = new HttpSolrClient.Builder(leaderUrl).build()) {
client.setConnectionTimeout(30000);
UpdateRequest ureq = new UpdateRequest();
ureq.setParams(new ModifiableSolrParams());
ureq.getParams().set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
ureq.getParams().set(UpdateParams.OPEN_SEARCHER, false);
ureq.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, true).process(
client);
}
}
}
@Override
public String getDescription() {
return "Manage Cross Data Center Replication";

View File

@ -736,14 +736,14 @@ public class IndexFetcher {
}
private void openNewSearcherAndUpdateCommitPoint() throws IOException {
SolrQueryRequest req = new LocalSolrQueryRequest(solrCore,
new ModifiableSolrParams());
RefCounted<SolrIndexSearcher> searcher = null;
IndexCommit commitPoint;
// must get the latest solrCore object because the one we have might be closed because of a reload
// todo stop keeping solrCore around
SolrCore core = solrCore.getCoreDescriptor().getCoreContainer().getCore(solrCore.getName());
try {
Future[] waitSearcher = new Future[1];
searcher = solrCore.getSearcher(true, true, waitSearcher, true);
searcher = core.getSearcher(true, true, waitSearcher, true);
if (waitSearcher[0] != null) {
try {
waitSearcher[0].get();
@ -753,10 +753,10 @@ public class IndexFetcher {
}
commitPoint = searcher.get().getIndexReader().getIndexCommit();
} finally {
req.close();
if (searcher != null) {
searcher.decref();
}
core.close();
}
// update the commit point in replication handler

View File

@ -300,9 +300,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
rsp.add("message","No slave configured");
}
} else if (command.equalsIgnoreCase(CMD_ABORT_FETCH)) {
IndexFetcher fetcher = currentIndexFetcher;
if (fetcher != null){
fetcher.abortFetch();
if (abortFetch()){
rsp.add(STATUS, OK_STATUS);
} else {
rsp.add(STATUS,ERR_STATUS);
@ -321,6 +319,16 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
}
}
public boolean abortFetch() {
IndexFetcher fetcher = currentIndexFetcher;
if (fetcher != null){
fetcher.abortFetch();
return true;
} else {
return false;
}
}
private void deleteSnapshot(ModifiableSolrParams params) {
String name = params.get(NAME);
if(name == null) {
@ -651,7 +659,8 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
rsp.add(CMD_GET_FILE_LIST, result);
// fetch list of tlog files only if cdcr is activated
if (core.getUpdateHandler().getUpdateLog() != null && core.getUpdateHandler().getUpdateLog() instanceof CdcrUpdateLog) {
if (solrParams.getBool(TLOG_FILES, true) && core.getUpdateHandler().getUpdateLog() != null
&& core.getUpdateHandler().getUpdateLog() instanceof CdcrUpdateLog) {
try {
List<Map<String, Object>> tlogfiles = getTlogFileList(commit);
LOG.info("Adding tlog files to list: " + tlogfiles);

View File

@ -151,7 +151,12 @@ public class CdcrUpdateLog extends UpdateLog {
if (id != -1) return id;
if (tlogFiles.length == 0) return -1;
String last = tlogFiles[tlogFiles.length - 1];
return Long.parseLong(last.substring(TLOG_NAME.length() + 1, last.lastIndexOf('.')));
if (TLOG_NAME.length() + 1 > last.lastIndexOf('.')) {
// old tlog created by default UpdateLog impl
return Long.parseLong(last.substring(TLOG_NAME.length() + 1));
} else {
return Long.parseLong(last.substring(TLOG_NAME.length() + 1, last.lastIndexOf('.')));
}
}
@Override

View File

@ -395,5 +395,9 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
public void setLastReplicateIndexSuccess(boolean success) {
this.lastReplicationSuccess = success;
}
@Override
public Lock getRecoveryLock() {
return recoveryLock;
}
}

View File

@ -163,4 +163,6 @@ public abstract class SolrCoreState {
super(s);
}
}
public abstract Lock getRecoveryLock();
}

View File

@ -96,11 +96,11 @@ public class CdcrUpdateProcessor extends DistributedUpdateProcessor {
ModifiableSolrParams result = super.filterParams(params);
if (params.get(CDCR_UPDATE) != null) {
result.set(CDCR_UPDATE, "");
if (params.get(DistributedUpdateProcessor.VERSION_FIELD) == null) {
log.warn("+++ cdcr.update but no version field, params are: " + params);
} else {
log.info("+++ cdcr.update version present, params are: " + params);
}
// if (params.get(DistributedUpdateProcessor.VERSION_FIELD) == null) {
// log.warn("+++ cdcr.update but no version field, params are: " + params);
// } else {
// log.info("+++ cdcr.update version present, params are: " + params);
// }
result.set(DistributedUpdateProcessor.VERSION_FIELD, params.get(DistributedUpdateProcessor.VERSION_FIELD));
}

View File

@ -295,7 +295,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// this should always be used - see filterParams
DistributedUpdateProcessorFactory.addParamToDistributedRequestWhitelist
(this.req, UpdateParams.UPDATE_CHAIN, TEST_DISTRIB_SKIP_SERVERS);
(this.req, UpdateParams.UPDATE_CHAIN, TEST_DISTRIB_SKIP_SERVERS, VERSION_FIELD);
CoreDescriptor coreDesc = req.getCore().getCoreDescriptor();

View File

@ -0,0 +1,29 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<schema name="minimal" version="1.1">
<types>
<fieldType name="string" class="solr.StrField"/>
<fieldType name="long" class="solr.TrieLongField" precisionStep="0" positionIncrementGap="0"/>
</types>
<fields>
<field name="id" type="string" indexed="true" stored="true"/>
<field name="_version_" type="long" indexed="true" stored="true"/>
<dynamicField name="*" type="string" indexed="true" stored="true"/>
</fields>
<uniqueKey>id</uniqueKey>
</schema>

View File

@ -0,0 +1,60 @@
<?xml version="1.0" ?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!-- This is a "kitchen sink" config file that tests can use.
When writting a new test, feel free to add *new* items (plugins,
config options, etc...) as long as they don't break any existing
tests. if you need to test something esoteric please add a new
"solrconfig-your-esoteric-purpose.xml" config file.
Note in particular that this test is used by MinimalSchemaTest so
Anything added to this file needs to work correctly even if there
is now uniqueKey or defaultSearch Field.
-->
<config>
<dataDir>${solr.data.dir:}</dataDir>
<directoryFactory name="DirectoryFactory"
class="${solr.directoryFactory:solr.NRTCachingDirectoryFactory}"/>
<schemaFactory class="ClassicIndexSchemaFactory"/>
<luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
<updateHandler class="solr.DirectUpdateHandler2">
<commitWithin>
<softCommit>${solr.commitwithin.softcommit:true}</softCommit>
</commitWithin>
<updateLog>
<str name="dir">${solr.ulog.dir:}</str>
</updateLog>
</updateHandler>
<requestHandler name="/select" class="solr.SearchHandler">
<lst name="defaults">
<str name="echoParams">explicit</str>
<str name="indent">true</str>
<str name="df">text</str>
</lst>
</requestHandler>
</config>

View File

@ -0,0 +1,29 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<schema name="minimal" version="1.1">
<types>
<fieldType name="string" class="solr.StrField"/>
<fieldType name="long" class="solr.TrieLongField" precisionStep="0" positionIncrementGap="0"/>
</types>
<fields>
<field name="id" type="string" indexed="true" stored="true"/>
<field name="_version_" type="long" indexed="true" stored="true"/>
<dynamicField name="*" type="string" indexed="true" stored="true"/>
</fields>
<uniqueKey>id</uniqueKey>
</schema>

View File

@ -0,0 +1,76 @@
<?xml version="1.0" ?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!-- This is a "kitchen sink" config file that tests can use.
When writting a new test, feel free to add *new* items (plugins,
config options, etc...) as long as they don't break any existing
tests. if you need to test something esoteric please add a new
"solrconfig-your-esoteric-purpose.xml" config file.
Note in particular that this test is used by MinimalSchemaTest so
Anything added to this file needs to work correctly even if there
is now uniqueKey or defaultSearch Field.
-->
<config>
<dataDir>${solr.data.dir:}</dataDir>
<directoryFactory name="DirectoryFactory"
class="${solr.directoryFactory:solr.NRTCachingDirectoryFactory}"/>
<luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
<updateRequestProcessorChain name="cdcr-processor-chain">
<processor class="solr.CdcrUpdateProcessorFactory"/>
<processor class="solr.RunUpdateProcessorFactory"/>
</updateRequestProcessorChain>
<requestHandler name="/cdcr" class="solr.CdcrRequestHandler">
<lst name="replica">
<str name="zkHost">${cdcr.target.zkHost}</str>
<str name="source">cdcr-source</str>
<str name="target">cdcr-target</str>
</lst>
<lst name="replicator">
<str name="threadPoolSize">1</str>
<str name="schedule">1000</str>
<str name="batchSize">1000</str>
</lst>
<lst name="updateLogSynchronizer">
<str name="schedule">1000</str>
</lst>
</requestHandler>
<updateHandler class="solr.DirectUpdateHandler2">
<updateLog class="solr.CdcrUpdateLog">
<str name="dir">${solr.ulog.dir:}</str>
</updateLog>
</updateHandler>
<requestHandler name="standard" class="solr.StandardRequestHandler">
</requestHandler>
<requestHandler name="/update" class="solr.UpdateRequestHandler">
<lst name="defaults">
<str name="update.chain">cdcr-processor-chain</str>
</lst>
</requestHandler>
</config>

View File

@ -0,0 +1,29 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<schema name="minimal" version="1.1">
<types>
<fieldType name="string" class="solr.StrField"/>
<fieldType name="long" class="solr.TrieLongField" precisionStep="0" positionIncrementGap="0"/>
</types>
<fields>
<field name="id" type="string" indexed="true" stored="true"/>
<field name="_version_" type="long" indexed="true" stored="true"/>
<dynamicField name="*" type="string" indexed="true" stored="true"/>
</fields>
<uniqueKey>id</uniqueKey>
</schema>

View File

@ -0,0 +1,63 @@
<?xml version="1.0" ?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!-- This is a "kitchen sink" config file that tests can use.
When writting a new test, feel free to add *new* items (plugins,
config options, etc...) as long as they don't break any existing
tests. if you need to test something esoteric please add a new
"solrconfig-your-esoteric-purpose.xml" config file.
Note in particular that this test is used by MinimalSchemaTest so
Anything added to this file needs to work correctly even if there
is now uniqueKey or defaultSearch Field.
-->
<config>
<dataDir>${solr.data.dir:}</dataDir>
<directoryFactory name="DirectoryFactory"
class="${solr.directoryFactory:solr.NRTCachingDirectoryFactory}"/>
<luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
<updateRequestProcessorChain name="cdcr-processor-chain">
<processor class="solr.CdcrUpdateProcessorFactory"/>
<processor class="solr.RunUpdateProcessorFactory"/>
</updateRequestProcessorChain>
<requestHandler name="/cdcr" class="solr.CdcrRequestHandler">
</requestHandler>
<updateHandler class="solr.DirectUpdateHandler2">
<updateLog class="solr.CdcrUpdateLog">
<str name="dir">${solr.ulog.dir:}</str>
</updateLog>
</updateHandler>
<requestHandler name="standard" class="solr.StandardRequestHandler">
</requestHandler>
<requestHandler name="/update" class="solr.UpdateRequestHandler">
<lst name="defaults">
<str name="update.chain">cdcr-processor-chain</str>
</lst>
</requestHandler>
</config>

View File

@ -28,6 +28,7 @@ import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.http.params.CoreConnectionPNames;
import org.apache.solr.client.solrj.SolrClient;
@ -57,6 +58,7 @@ import org.apache.solr.common.util.Utils;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.CdcrParams;
import org.apache.solr.util.TimeOut;
import org.apache.zookeeper.CreateMode;
import org.junit.After;
import org.junit.AfterClass;
@ -70,6 +72,8 @@ import static org.apache.solr.cloud.OverseerCollectionMessageHandler.NUM_SLICES;
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.SHARDS_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
import static org.apache.solr.handler.admin.CoreAdminHandler.COMPLETED;
import static org.apache.solr.handler.admin.CoreAdminHandler.RESPONSE_STATUS;
/**
* <p>
@ -763,6 +767,18 @@ public class BaseCdcrDistributedZkTest extends AbstractDistribZkTestBase {
}
}
protected void waitForBootstrapToComplete(String collectionName, String shardId) throws Exception {
NamedList rsp;// we need to wait until bootstrap is complete otherwise the replicator thread will never start
TimeOut timeOut = new TimeOut(60, TimeUnit.SECONDS);
while (!timeOut.hasTimedOut()) {
rsp = invokeCdcrAction(shardToLeaderJetty.get(collectionName).get(shardId), CdcrParams.CdcrAction.BOOTSTRAP_STATUS);
if (rsp.get(RESPONSE_STATUS).toString().equals(COMPLETED)) {
break;
}
Thread.sleep(1000);
}
}
protected void waitForReplicationToComplete(String collectionName, String shardId) throws Exception {
int cnt = 15;
while (cnt > 0) {

View File

@ -0,0 +1,396 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import java.io.File;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.handler.CdcrParams;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CdcrBootstrapTest extends SolrTestCaseJ4 {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
/**
* Starts a source cluster with no CDCR configuration, indexes enough documents such that
* the at least one old tlog is closed and thrown away so that the source cluster does not have
* all updates available in tlogs only.
* <p>
* Then we start a target cluster with CDCR configuration and we change the source cluster configuration
* to use CDCR (i.e. CdcrUpdateLog, CdcrRequestHandler and CdcrUpdateProcessor) and restart it.
* <p>
* We test that all updates eventually make it to the target cluster and that the collectioncheckpoint
* call returns the same version as the last update indexed on the source.
*/
@Test
public void testConvertClusterToCdcrAndBootstrap() throws Exception {
// start the target first so that we know its zkhost
MiniSolrCloudCluster target = new MiniSolrCloudCluster(1, createTempDir("cdcr-target"), buildJettyConfig("/solr"));
try {
target.waitForAllNodes(30);
System.out.println("Target zkHost = " + target.getZkServer().getZkAddress());
System.setProperty("cdcr.target.zkHost", target.getZkServer().getZkAddress());
// start a cluster with no cdcr
MiniSolrCloudCluster source = new MiniSolrCloudCluster(1, createTempDir("cdcr-source"), buildJettyConfig("/solr"));
try {
source.waitForAllNodes(30);
final File configDir = getFile("solr").toPath().resolve("configsets/cdcr-source-disabled").toFile();
System.out.println("config dir absolute path = " + configDir.getAbsolutePath());
source.uploadConfigDir(configDir, "cdcr-source");
// create a collection with the cdcr-source-disabled configset
Map<String, String> collectionProperties = new HashMap<>();
// todo investigate why this is necessary??? because by default it selects a ram directory which deletes the tlogs on reloads?
collectionProperties.putIfAbsent("solr.directoryFactory", "solr.StandardDirectoryFactory");
source.createCollection("cdcr-source", 1, 1, "cdcr-source", collectionProperties);
source.getSolrClient().getZkStateReader().forceUpdateCollection("cdcr-source");
AbstractDistribZkTestBase.waitForRecoveriesToFinish("cdcr-source", source.getSolrClient().getZkStateReader(), true, true, 330);
// index 10000 docs with a hard commit every 1000 documents
CloudSolrClient sourceSolrClient = source.getSolrClient();
sourceSolrClient.setDefaultCollection("cdcr-source");
int numDocs = 0;
for (int k = 0; k < 100; k++) {
UpdateRequest req = new UpdateRequest();
for (; numDocs < (k + 1) * 100; numDocs++) {
SolrInputDocument doc = new SolrInputDocument();
doc.addField("id", "source_" + numDocs);
doc.addField("xyz", numDocs);
req.add(doc);
}
req.setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true);
System.out.println("Adding 100 docs with commit=true, numDocs=" + numDocs);
req.process(sourceSolrClient);
}
QueryResponse response = sourceSolrClient.query(new SolrQuery("*:*"));
assertEquals("", numDocs, response.getResults().getNumFound());
// lets find and keep the maximum version assigned by source cluster across all our updates
long maxVersion = Long.MIN_VALUE;
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(CommonParams.QT, "/get");
params.set("getVersions", numDocs);
response = sourceSolrClient.query(params);
List<Long> versions = (List<Long>) response.getResponse().get("versions");
for (Long version : versions) {
maxVersion = Math.max(maxVersion, version);
}
// upload the cdcr-enabled config and restart source cluster
final File cdcrEnabledSourceConfigDir = getFile("solr").toPath().resolve("configsets/cdcr-source").toFile();
source.uploadConfigDir(cdcrEnabledSourceConfigDir, "cdcr-source");
JettySolrRunner runner = source.stopJettySolrRunner(0);
source.startJettySolrRunner(runner);
assertTrue(runner.isRunning());
AbstractDistribZkTestBase.waitForRecoveriesToFinish("cdcr-source", source.getSolrClient().getZkStateReader(), true, true, 330);
response = sourceSolrClient.query(new SolrQuery("*:*"));
assertEquals("Document mismatch on source after restart", numDocs, response.getResults().getNumFound());
// setup the target cluster
final File targetConfigDir = getFile("solr").toPath().resolve("configsets/cdcr-target").toFile();
target.uploadConfigDir(targetConfigDir, "cdcr-target");
target.createCollection("cdcr-target", 1, 1, "cdcr-target", Collections.emptyMap());
target.getSolrClient().getZkStateReader().forceUpdateCollection("cdcr-target");
AbstractDistribZkTestBase.waitForRecoveriesToFinish("cdcr-target", target.getSolrClient().getZkStateReader(), true, true, 330);
CloudSolrClient targetSolrClient = target.getSolrClient();
targetSolrClient.setDefaultCollection("cdcr-target");
Thread.sleep(1000);
cdcrStart(targetSolrClient);
cdcrStart(sourceSolrClient);
response = getCdcrQueue(sourceSolrClient);
System.out.println("Cdcr queue response: " + response.getResponse());
long foundDocs = waitForTargetToSync(numDocs, targetSolrClient);
assertEquals("Document mismatch on target after sync", numDocs, foundDocs);
params = new ModifiableSolrParams();
params.set(CommonParams.ACTION, CdcrParams.CdcrAction.COLLECTIONCHECKPOINT.toString());
params.set(CommonParams.QT, "/cdcr");
response = targetSolrClient.query(params);
Long checkpoint = (Long) response.getResponse().get(CdcrParams.CHECKPOINT);
assertNotNull(checkpoint);
assertEquals("COLLECTIONCHECKPOINT from target cluster should have returned the maximum " +
"version across all updates made to source", maxVersion, checkpoint.longValue());
} finally {
source.shutdown();
}
} finally {
target.shutdown();
}
}
/**
* This test start cdcr source, adds data,starts target cluster, verifies replication,
* stops cdcr replication and buffering, adds more data, re-enables cdcr and verify replication
*/
public void testBootstrapWithSourceCluster() throws Exception {
// start the target first so that we know its zkhost
MiniSolrCloudCluster target = new MiniSolrCloudCluster(1, createTempDir("cdcr-target"), buildJettyConfig("/solr"));
try {
target.waitForAllNodes(30);
System.out.println("Target zkHost = " + target.getZkServer().getZkAddress());
System.setProperty("cdcr.target.zkHost", target.getZkServer().getZkAddress());
MiniSolrCloudCluster source = new MiniSolrCloudCluster(1, createTempDir("cdcr-source"), buildJettyConfig("/solr"));
try {
source.waitForAllNodes(30);
final File configDir = getFile("solr").toPath().resolve("configsets/cdcr-source").toFile();
System.out.println("config dir absolute path = " + configDir.getAbsolutePath());
source.uploadConfigDir(configDir, "cdcr-source");
Map<String, String> collectionProperties = new HashMap<>();
// todo investigate why this is necessary???
collectionProperties.putIfAbsent("solr.directoryFactory", "solr.StandardDirectoryFactory");
source.createCollection("cdcr-source", 1, 1, "cdcr-source", collectionProperties);
source.getSolrClient().getZkStateReader().forceUpdateCollection("cdcr-source");
AbstractDistribZkTestBase.waitForRecoveriesToFinish("cdcr-source", source.getSolrClient().getZkStateReader(), true, true, 330);
// index 10000 docs with a hard commit every 1000 documents
CloudSolrClient sourceSolrClient = source.getSolrClient();
sourceSolrClient.setDefaultCollection("cdcr-source");
int numDocs = 0;
for (int k = 0; k < 100; k++) {
UpdateRequest req = new UpdateRequest();
for (; numDocs < (k + 1) * 100; numDocs++) {
SolrInputDocument doc = new SolrInputDocument();
doc.addField("id", "source_" + numDocs);
doc.addField("xyz", numDocs);
req.add(doc);
}
req.setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true);
System.out.println("Adding 100 docs with commit=true, numDocs=" + numDocs);
req.process(sourceSolrClient);
}
QueryResponse response = sourceSolrClient.query(new SolrQuery("*:*"));
assertEquals("", numDocs, response.getResults().getNumFound());
// setup the target cluster
final File targetConfigDir = getFile("solr").toPath().resolve("configsets/cdcr-target").toFile();
target.uploadConfigDir(targetConfigDir, "cdcr-target");
target.createCollection("cdcr-target", 1, 1, "cdcr-target", Collections.emptyMap());
target.getSolrClient().getZkStateReader().forceUpdateCollection("cdcr-target");
AbstractDistribZkTestBase.waitForRecoveriesToFinish("cdcr-target", target.getSolrClient().getZkStateReader(), true, true, 330);
CloudSolrClient targetSolrClient = target.getSolrClient();
targetSolrClient.setDefaultCollection("cdcr-target");
cdcrStart(targetSolrClient);
cdcrStart(sourceSolrClient);
response = getCdcrQueue(sourceSolrClient);
System.out.println("Cdcr queue response: " + response.getResponse());
long foundDocs = waitForTargetToSync(numDocs, targetSolrClient);
assertEquals("Document mismatch on target after sync", numDocs, foundDocs);
cdcrStop(sourceSolrClient);
cdcrDisableBuffer(sourceSolrClient);
int c = 0;
for (int k = 0; k < 100; k++) {
UpdateRequest req = new UpdateRequest();
for (; c < (k + 1) * 100; c++, numDocs++) {
SolrInputDocument doc = new SolrInputDocument();
doc.addField("id", "source_" + numDocs);
doc.addField("xyz", numDocs);
req.add(doc);
}
req.setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true);
System.out.println("Adding 100 docs with commit=true, numDocs=" + numDocs);
req.process(sourceSolrClient);
}
response = sourceSolrClient.query(new SolrQuery("*:*"));
assertEquals("", numDocs, response.getResults().getNumFound());
cdcrStart(sourceSolrClient);
cdcrEnableBuffer(sourceSolrClient);
foundDocs = waitForTargetToSync(numDocs, targetSolrClient);
assertEquals("Document mismatch on target after sync", numDocs, foundDocs);
} finally {
source.shutdown();
}
} finally {
target.shutdown();
}
}
public void testBootstrapWithContinousIndexingOnSourceCluster() throws Exception {
// start the target first so that we know its zkhost
MiniSolrCloudCluster target = new MiniSolrCloudCluster(1, createTempDir("cdcr-target"), buildJettyConfig("/solr"));
target.waitForAllNodes(30);
try {
System.out.println("Target zkHost = " + target.getZkServer().getZkAddress());
System.setProperty("cdcr.target.zkHost", target.getZkServer().getZkAddress());
MiniSolrCloudCluster source = new MiniSolrCloudCluster(1, createTempDir("cdcr-source"), buildJettyConfig("/solr"));
try {
source.waitForAllNodes(30);
final File configDir = getFile("solr").toPath().resolve("configsets/cdcr-source").toFile();
System.out.println("config dir absolute path = " + configDir.getAbsolutePath());
source.uploadConfigDir(configDir, "cdcr-source");
Map<String, String> collectionProperties = new HashMap<>();
// todo investigate why this is necessary???
collectionProperties.putIfAbsent("solr.directoryFactory", "solr.StandardDirectoryFactory");
source.createCollection("cdcr-source", 1, 1, "cdcr-source", collectionProperties);
source.getSolrClient().getZkStateReader().forceUpdateCollection("cdcr-source");
AbstractDistribZkTestBase.waitForRecoveriesToFinish("cdcr-source", source.getSolrClient().getZkStateReader(), true, true, 330);
// index 10000 docs with a hard commit every 1000 documents
CloudSolrClient sourceSolrClient = source.getSolrClient();
sourceSolrClient.setDefaultCollection("cdcr-source");
int numDocs = 0;
for (int k = 0; k < 100; k++) {
UpdateRequest req = new UpdateRequest();
for (; numDocs < (k + 1) * 100; numDocs++) {
SolrInputDocument doc = new SolrInputDocument();
doc.addField("id", "source_" + numDocs);
doc.addField("xyz", numDocs);
req.add(doc);
}
req.setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true);
System.out.println("Adding 100 docs with commit=true, numDocs=" + numDocs);
req.process(sourceSolrClient);
}
QueryResponse response = sourceSolrClient.query(new SolrQuery("*:*"));
assertEquals("", numDocs, response.getResults().getNumFound());
// setup the target cluster
final File targetConfigDir = getFile("solr").toPath().resolve("configsets/cdcr-target").toFile();
target.uploadConfigDir(targetConfigDir, "cdcr-target");
target.createCollection("cdcr-target", 1, 1, "cdcr-target", Collections.emptyMap());
target.getSolrClient().getZkStateReader().forceUpdateCollection("cdcr-target");
AbstractDistribZkTestBase.waitForRecoveriesToFinish("cdcr-target", target.getSolrClient().getZkStateReader(), true, true, 330);
CloudSolrClient targetSolrClient = target.getSolrClient();
targetSolrClient.setDefaultCollection("cdcr-target");
Thread.sleep(1000);
cdcrStart(targetSolrClient);
cdcrStart(sourceSolrClient);
int c = 0;
for (int k = 0; k < 100; k++) {
UpdateRequest req = new UpdateRequest();
for (; c < (k + 1) * 100; c++, numDocs++) {
SolrInputDocument doc = new SolrInputDocument();
doc.addField("id", "source_" + numDocs);
doc.addField("xyz", numDocs);
req.add(doc);
}
req.setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true);
System.out.println("Adding 100 docs with commit=true, numDocs=" + numDocs);
req.process(sourceSolrClient);
}
response = sourceSolrClient.query(new SolrQuery("*:*"));
assertEquals("", numDocs, response.getResults().getNumFound());
response = getCdcrQueue(sourceSolrClient);
System.out.println("Cdcr queue response: " + response.getResponse());
long foundDocs = waitForTargetToSync(numDocs, targetSolrClient);
assertEquals("Document mismatch on target after sync", numDocs, foundDocs);
} finally {
source.shutdown();
}
} finally {
target.shutdown();
}
}
private long waitForTargetToSync(int numDocs, CloudSolrClient targetSolrClient) throws SolrServerException, IOException, InterruptedException {
long start = System.nanoTime();
QueryResponse response = null;
while (System.nanoTime() - start <= TimeUnit.NANOSECONDS.convert(120, TimeUnit.SECONDS)) {
try {
targetSolrClient.commit();
response = targetSolrClient.query(new SolrQuery("*:*"));
if (response.getResults().getNumFound() == numDocs) {
break;
}
} catch (Exception e) {
log.warn("Exception trying to commit on target. This is expected and safe to ignore.", e);
}
Thread.sleep(1000);
}
return response != null ? response.getResults().getNumFound() : 0;
}
private void cdcrStart(CloudSolrClient client) throws SolrServerException, IOException {
QueryResponse response = invokeCdcrAction(client, CdcrParams.CdcrAction.START);
assertEquals("started", ((NamedList) response.getResponse().get("status")).get("process"));
}
private void cdcrStop(CloudSolrClient client) throws SolrServerException, IOException {
QueryResponse response = invokeCdcrAction(client, CdcrParams.CdcrAction.STOP);
assertEquals("stopped", ((NamedList) response.getResponse().get("status")).get("process"));
}
private void cdcrEnableBuffer(CloudSolrClient client) throws IOException, SolrServerException {
QueryResponse response = invokeCdcrAction(client, CdcrParams.CdcrAction.ENABLEBUFFER);
assertEquals("enabled", ((NamedList) response.getResponse().get("status")).get("buffer"));
}
private void cdcrDisableBuffer(CloudSolrClient client) throws IOException, SolrServerException {
QueryResponse response = invokeCdcrAction(client, CdcrParams.CdcrAction.DISABLEBUFFER);
assertEquals("disabled", ((NamedList) response.getResponse().get("status")).get("buffer"));
}
private QueryResponse invokeCdcrAction(CloudSolrClient client, CdcrParams.CdcrAction action) throws IOException, SolrServerException {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(CommonParams.QT, "/cdcr");
params.set(CommonParams.ACTION, action.toLower());
return client.query(params);
}
private QueryResponse getCdcrQueue(CloudSolrClient client) throws SolrServerException, IOException {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(CommonParams.QT, "/cdcr");
params.set(CommonParams.ACTION, CdcrParams.QUEUES);
return client.query(params);
}
}

View File

@ -103,6 +103,11 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
// check status
this.assertState(SOURCE_COLLECTION, CdcrParams.ProcessState.STARTED, CdcrParams.BufferState.ENABLED);
this.waitForBootstrapToComplete(TARGET_COLLECTION, SHARD2);
// sleep for a bit to ensure that replicator threads are started
Thread.sleep(3000);
// Kill all the servers of the target
this.deleteCollection(TARGET_COLLECTION);
@ -156,6 +161,9 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
this.waitForCdcrStateReplication(SOURCE_COLLECTION);
this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD1);
this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD2);
this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
@ -182,6 +190,9 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
this.waitForCdcrStateReplication(SOURCE_COLLECTION);
this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD1);
this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD2);
this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
@ -203,6 +214,9 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
this.waitForCdcrStateReplication(SOURCE_COLLECTION);
this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD1);
this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD2);
log.info("Indexing 10 documents");
int start = 0;
@ -244,6 +258,9 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
log.info("Waiting for replication");
this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD1);
this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD2);
this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
@ -267,6 +284,9 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
this.waitForCdcrStateReplication(SOURCE_COLLECTION);
this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD1);
this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD2);
log.info("Indexing 10 documents");
int start = 0;
@ -349,6 +369,9 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
index(SOURCE_COLLECTION, getDoc(id, Integer.toString(i)));
}
this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD1);
this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD2);
// wait a bit for the replication to complete
this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
@ -495,6 +518,8 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
// Start CDCR
this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
this.waitForCdcrStateReplication(SOURCE_COLLECTION);
this.waitForBootstrapToComplete(TARGET_COLLECTION, SHARD1);
this.waitForBootstrapToComplete(TARGET_COLLECTION, SHARD2);
// wait a bit for the replication to complete
this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
@ -526,6 +551,9 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
assertNumDocs(128, SOURCE_COLLECTION);
this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD1);
this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD2);
this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
@ -553,6 +581,9 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
this.waitForCdcrStateReplication(SOURCE_COLLECTION);
this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD1);
this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD2);
// wait a bit for the replication to complete
this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);