SOLR-7789: Introduce a ConfigSet management API

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1698043 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gregory Chanan 2015-08-27 02:18:36 +00:00
parent 4ef86ea9cc
commit 679dd8f790
41 changed files with 2056 additions and 105 deletions

View File

@ -129,6 +129,8 @@ New Features
* SOLR-7961: Print Solr's version with command bin/solr version (janhoy)
* SOLR-7889: Introduce a ConfigSet management API (Gregory Chanan)
Bug Fixes
----------------------

View File

@ -280,7 +280,7 @@ public class LeaderElector {
try {
if(joinAtHead){
log.info("Node {} trying to join election at the head", id);
List<String> nodes = OverseerProcessor.getSortedElectionNodes(zkClient, shardsElectZkPath);
List<String> nodes = OverseerTaskProcessor.getSortedElectionNodes(zkClient, shardsElectZkPath);
if(nodes.size() <2){
leaderSeqPath = zkClient.create(shardsElectZkPath + "/" + id + "-n_", null,
CreateMode.EPHEMERAL_SEQUENTIAL, false);

View File

@ -309,7 +309,7 @@ public class Overseer implements Closeable {
try {
Map m = (Map) Utils.fromJSON(data);
String id = (String) m.get("id");
if(overseerCollectionProcessor.getId().equals(id)){
if(overseerCollectionConfigSetProcessor.getId().equals(id)){
try {
log.info("I'm exiting , but I'm still the leader");
zkClient.delete(path,stat.getVersion(),true);
@ -390,7 +390,7 @@ public class Overseer implements Closeable {
case QUIT:
if (myId.equals(message.get("id"))) {
log.info("Quit command received {}", LeaderElector.getNodeName(myId));
overseerCollectionProcessor.close();
overseerCollectionConfigSetProcessor.close();
close();
} else {
log.warn("Overseer received wrong QUIT message {}", message);
@ -786,7 +786,7 @@ public class Overseer implements Closeable {
private final String adminPath;
private OverseerCollectionProcessor overseerCollectionProcessor;
private OverseerCollectionConfigSetProcessor overseerCollectionConfigSetProcessor;
private ZkController zkController;
@ -824,8 +824,8 @@ public class Overseer implements Closeable {
ThreadGroup ccTg = new ThreadGroup("Overseer collection creation process.");
OverseerNodePrioritizer overseerPrioritizer = new OverseerNodePrioritizer(reader, adminPath, shardHandler.getShardHandlerFactory());
overseerCollectionProcessor = new OverseerCollectionProcessor(reader, id, shardHandler, adminPath, stats, Overseer.this, overseerPrioritizer);
ccThread = new OverseerThread(ccTg, overseerCollectionProcessor, "OverseerCollectionProcessor-" + id);
overseerCollectionConfigSetProcessor = new OverseerCollectionConfigSetProcessor(reader, id, shardHandler, adminPath, stats, Overseer.this, overseerPrioritizer);
ccThread = new OverseerThread(ccTg, overseerCollectionConfigSetProcessor, "OverseerCollectionConfigSetProcessor-" + id);
ccThread.setDaemon(true);
ThreadGroup ohcfTg = new ThreadGroup("Overseer Hdfs SolrCore Failover Thread.");
@ -922,15 +922,27 @@ public class Overseer implements Closeable {
}
/* Collection creation queue */
static OverseerCollectionQueue getCollectionQueue(final SolrZkClient zkClient) {
static OverseerTaskQueue getCollectionQueue(final SolrZkClient zkClient) {
return getCollectionQueue(zkClient, new Stats());
}
static OverseerCollectionQueue getCollectionQueue(final SolrZkClient zkClient, Stats zkStats) {
static OverseerTaskQueue getCollectionQueue(final SolrZkClient zkClient, Stats zkStats) {
createOverseerNode(zkClient);
return new OverseerCollectionQueue(zkClient, "/overseer/collection-queue-work", zkStats);
return new OverseerTaskQueue(zkClient, "/overseer/collection-queue-work", zkStats);
}
/* The queue for ConfigSet related operations */
static OverseerTaskQueue getConfigSetQueue(final SolrZkClient zkClient) {
return getConfigSetQueue(zkClient, new Stats());
}
static OverseerTaskQueue getConfigSetQueue(final SolrZkClient zkClient, Stats zkStats) {
// For now, we use the same queue as the collection queue, but ensure
// that the actions are prefixed with a unique string.
createOverseerNode(zkClient);
return getCollectionQueue(zkClient, zkStats);
}
private static void createOverseerNode(final SolrZkClient zkClient) {
try {
zkClient.create("/overseer", new byte[0], CreateMode.PERSISTENT, true);

View File

@ -21,14 +21,16 @@ import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.handler.component.ShardHandler;
import org.apache.solr.handler.component.ShardHandlerFactory;
import static org.apache.solr.cloud.OverseerConfigSetMessageHandler.CONFIGSETS_ACTION_PREFIX;
/**
* An {@link OverseerProcessor} that handles collection-related Overseer
* messages only.
* An {@link OverseerTaskProcessor} that handles:
* 1) collection-related Overseer messages
* 2) configset-related Overseer messages
*/
public class OverseerCollectionProcessor extends OverseerProcessor {
public class OverseerCollectionConfigSetProcessor extends OverseerTaskProcessor {
public OverseerCollectionProcessor(ZkStateReader zkStateReader, String myId,
public OverseerCollectionConfigSetProcessor(ZkStateReader zkStateReader, String myId,
final ShardHandler shardHandler,
String adminPath, Overseer.Stats stats, Overseer overseer,
OverseerNodePrioritizer overseerNodePrioritizer) {
@ -47,13 +49,13 @@ public class OverseerCollectionProcessor extends OverseerProcessor {
);
}
protected OverseerCollectionProcessor(ZkStateReader zkStateReader, String myId,
protected OverseerCollectionConfigSetProcessor(ZkStateReader zkStateReader, String myId,
final ShardHandlerFactory shardHandlerFactory,
String adminPath,
Overseer.Stats stats,
Overseer overseer,
OverseerNodePrioritizer overseerNodePrioritizer,
OverseerCollectionQueue workQueue,
OverseerTaskQueue workQueue,
DistributedMap runningMap,
DistributedMap completedMap,
DistributedMap failureMap) {
@ -80,12 +82,18 @@ public class OverseerCollectionProcessor extends OverseerProcessor {
Overseer.Stats stats,
Overseer overseer,
OverseerNodePrioritizer overseerNodePrioritizer) {
final OverseerCollectionMessageHandler messageHandler = new OverseerCollectionMessageHandler(
final OverseerCollectionMessageHandler collMessageHandler = new OverseerCollectionMessageHandler(
zkStateReader, myId, shardHandlerFactory, adminPath, stats, overseer, overseerNodePrioritizer);
final OverseerConfigSetMessageHandler configMessageHandler = new OverseerConfigSetMessageHandler(
zkStateReader);
return new OverseerMessageHandlerSelector() {
@Override
public OverseerMessageHandler selectOverseerMessageHandler(ZkNodeProps message) {
return messageHandler;
String operation = message.getStr(Overseer.QUEUE_OPERATION);
if (operation != null && operation.startsWith(CONFIGSETS_ACTION_PREFIX)) {
return configMessageHandler;
}
return collMessageHandler;
}
};
}

View File

@ -121,7 +121,10 @@ import static org.apache.solr.common.params.CommonParams.NAME;
import static org.apache.solr.common.util.StrUtils.formatString;
import static org.apache.solr.common.util.Utils.makeMap;
/**
* A {@link OverseerMessageHandler} that handles Collections API related
* overseer messages.
*/
public class OverseerCollectionMessageHandler implements OverseerMessageHandler {
public static final String NUM_SLICES = "numShards";
@ -203,7 +206,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
@Override
@SuppressWarnings("unchecked")
public SolrResponse processMessage(ZkNodeProps message, String operation) {
log.warn("OverseerCollectionProcessor.processMessage : "+ operation + " , "+ message.toString());
log.warn("OverseerCollectionMessageHandler.processMessage : "+ operation + " , "+ message.toString());
NamedList results = new NamedList();
try {
@ -371,7 +374,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
@SuppressWarnings("unchecked")
private void getOverseerStatus(ZkNodeProps message, NamedList results) throws KeeperException, InterruptedException {
String leaderNode = OverseerProcessor.getLeaderNode(zkStateReader.getZkClient());
String leaderNode = OverseerTaskProcessor.getLeaderNode(zkStateReader.getZkClient());
results.add("leader", leaderNode);
Stat stat = new Stat();
zkStateReader.getZkClient().getData("/overseer/queue",null, stat, true);
@ -2470,7 +2473,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
@Override
public String getName() {
return "Overseer Collection Processor";
return "Overseer Collection Message Handler";
}
@Override
@ -2495,7 +2498,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
}
@Override
public void unmarkExclusiveTask(String collectionName, String operation) {
public void unmarkExclusiveTask(String collectionName, String operation, ZkNodeProps message) {
if(!CLUSTERSTATUS.isEqual(operation) && collectionName != null) {
synchronized (collectionWip) {
collectionWip.remove(collectionName);
@ -2510,8 +2513,10 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
if(CLUSTERSTATUS.isEqual(message.getStr(Overseer.QUEUE_OPERATION)))
return ExclusiveMarking.EXCLUSIVE;
if(collectionWip.contains(collectionName))
return ExclusiveMarking.NONEXCLUSIVE;
synchronized (collectionWip) {
if(collectionWip.contains(collectionName))
return ExclusiveMarking.NONEXCLUSIVE;
}
return ExclusiveMarking.NOTDETERMINED;
}

View File

@ -0,0 +1,365 @@
package org.apache.solr.cloud;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkConfigManager;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ConfigSetParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.core.ConfigSetProperties;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.noggit.JSONUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.cloud.OverseerMessageHandler.ExclusiveMarking.NONEXCLUSIVE;
import static org.apache.solr.cloud.OverseerMessageHandler.ExclusiveMarking.NOTDETERMINED;
import static org.apache.solr.common.params.ConfigSetParams.ConfigSetAction.CREATE;
import static org.apache.solr.common.params.ConfigSetParams.ConfigSetAction.DELETE;
import static org.apache.solr.common.params.CommonParams.NAME;
/**
* A {@link OverseerMessageHandler} that handles ConfigSets API related
* overseer messages.
*/
public class OverseerConfigSetMessageHandler implements OverseerMessageHandler {
/**
* Prefix to specify an action should be handled by this handler.
*/
public static final String CONFIGSETS_ACTION_PREFIX = "configsets:";
/**
* Name of the ConfigSet to copy from for CREATE
*/
public static final String BASE_CONFIGSET = "baseConfigSet";
/**
* Prefix for properties that should be applied to the ConfigSet for CREATE
*/
public static final String PROPERTY_PREFIX = "configSetProp";
private ZkStateReader zkStateReader;
// we essentially implement a read/write lock for the ConfigSet exclusivity as follows:
// WRITE: CREATE/DELETE on the ConfigSet under operation
// READ: for the Base ConfigSet being copied in CREATE.
// in this way, we prevent a Base ConfigSet from being deleted while it is being copied
// but don't prevent different ConfigSets from being created with the same Base ConfigSet
// at the same time.
final private Set configSetWriteWip;
final private Set configSetReadWip;
private static Logger log = LoggerFactory
.getLogger(OverseerConfigSetMessageHandler.class);
public OverseerConfigSetMessageHandler(ZkStateReader zkStateReader) {
this.zkStateReader = zkStateReader;
this.configSetWriteWip = new HashSet();
this.configSetReadWip = new HashSet();
}
@Override
public SolrResponse processMessage(ZkNodeProps message, String operation) {
NamedList results = new NamedList();
try {
if (!operation.startsWith(CONFIGSETS_ACTION_PREFIX)) {
throw new SolrException(ErrorCode.BAD_REQUEST,
"Operation does not contain proper prefix: " + operation
+ " expected: " + CONFIGSETS_ACTION_PREFIX);
}
operation = operation.substring(CONFIGSETS_ACTION_PREFIX.length());
log.info("OverseerConfigSetMessageHandler.processMessage : "+ operation + " , "+ message.toString());
ConfigSetParams.ConfigSetAction action = ConfigSetParams.ConfigSetAction.get(operation);
if (action == null) {
throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:" + operation);
}
switch (action) {
case CREATE:
createConfigSet(message);
break;
case DELETE:
deleteConfigSet(message);
break;
default:
throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:"
+ operation);
}
} catch (Exception e) {
String configSetName = message.getStr(NAME);
if (configSetName == null) {
SolrException.log(log, "Operation " + operation + " failed", e);
} else {
SolrException.log(log, "ConfigSet: " + configSetName + " operation: " + operation
+ " failed", e);
}
results.add("Operation " + operation + " caused exception:", e);
SimpleOrderedMap nl = new SimpleOrderedMap();
nl.add("msg", e.getMessage());
nl.add("rspCode", e instanceof SolrException ? ((SolrException)e).code() : -1);
results.add("exception", nl);
}
return new OverseerSolrResponse(results);
}
@Override
public String getName() {
return "Overseer ConfigSet Message Handler";
}
@Override
public String getTimerName(String operation) {
return "configset_" + operation;
}
@Override
public String getTaskKey(ZkNodeProps message) {
return message.getStr(NAME);
}
@Override
public void markExclusiveTask(String configSetName, ZkNodeProps message) {
String baseConfigSet = getBaseConfigSetIfCreate(message);
markExclusive(configSetName, baseConfigSet);
}
private void markExclusive(String configSetName, String baseConfigSetName) {
synchronized (configSetWriteWip) {
configSetWriteWip.add(configSetName);
if (baseConfigSetName != null) configSetReadWip.add(baseConfigSetName);
}
}
@Override
public void unmarkExclusiveTask(String configSetName, String operation, ZkNodeProps message) {
String baseConfigSet = getBaseConfigSetIfCreate(message);
unmarkExclusiveConfigSet(configSetName, baseConfigSet);
}
private void unmarkExclusiveConfigSet(String configSetName, String baseConfigSetName) {
synchronized (configSetWriteWip) {
configSetWriteWip.remove(configSetName);
if (baseConfigSetName != null) configSetReadWip.remove(baseConfigSetName);
}
}
@Override
public ExclusiveMarking checkExclusiveMarking(String configSetName, ZkNodeProps message) {
String baseConfigSet = getBaseConfigSetIfCreate(message);
return checkExclusiveMarking(configSetName, baseConfigSet);
}
private ExclusiveMarking checkExclusiveMarking(String configSetName, String baseConfigSetName) {
synchronized (configSetWriteWip) {
// need to acquire:
// 1) write lock on ConfigSet
// 2) read lock on Base ConfigSet
if (configSetWriteWip.contains(configSetName) || configSetReadWip.contains(configSetName)) {
return NONEXCLUSIVE;
}
if (baseConfigSetName != null && configSetWriteWip.contains(baseConfigSetName)) {
return NONEXCLUSIVE;
}
}
return NOTDETERMINED;
}
private String getBaseConfigSetIfCreate(ZkNodeProps message) {
String operation = message.getStr(Overseer.QUEUE_OPERATION);
if (operation != null) {
operation = operation.substring(CONFIGSETS_ACTION_PREFIX.length());
ConfigSetParams.ConfigSetAction action = ConfigSetParams.ConfigSetAction.get(operation);
if (action == CREATE) {
return message.getStr(BASE_CONFIGSET);
}
}
return null;
}
private NamedList getConfigSetProperties(String path) throws IOException {
byte [] oldPropsData = null;
try {
oldPropsData = zkStateReader.getZkClient().getData(path, null, null, true);
} catch (KeeperException.NoNodeException e) {
log.info("no existing ConfigSet properties found");
} catch (KeeperException | InterruptedException e) {
throw new IOException("Error reading old properties",
SolrZkClient.checkInterrupted(e));
}
if (oldPropsData != null) {
InputStreamReader reader = new InputStreamReader(new ByteArrayInputStream(oldPropsData), StandardCharsets.UTF_8);
try {
return ConfigSetProperties.readFromInputStream(reader);
} finally {
reader.close();
}
}
return null;
}
private Map<String, Object> getNewProperties(ZkNodeProps message) {
Map<String, Object> properties = null;
for (Map.Entry<String, Object> entry : message.getProperties().entrySet()) {
if (entry.getKey().startsWith(PROPERTY_PREFIX + ".")) {
if (properties == null) {
properties = new HashMap<String, Object>();
}
properties.put(entry.getKey().substring((PROPERTY_PREFIX + ".").length()),
entry.getValue());
}
}
return properties;
}
private void mergeOldProperties(Map<String, Object> newProps, NamedList oldProps) {
Iterator<Map.Entry<String, Object>> it = oldProps.iterator();
while (it.hasNext()) {
Map.Entry<String, Object> oldEntry = it.next();
if (!newProps.containsKey(oldEntry.getKey())) {
newProps.put(oldEntry.getKey(), oldEntry.getValue());
}
}
}
private byte[] getPropertyData(Map<String, Object> newProps) {
if (newProps != null) {
String propertyDataStr = JSONUtil.toJSON(newProps);
if (propertyDataStr == null) {
throw new SolrException(ErrorCode.BAD_REQUEST, "Invalid property specification");
}
return propertyDataStr.getBytes(StandardCharsets.UTF_8);
}
return null;
}
private String getPropertyPath(String configName, String propertyPath) {
return ZkConfigManager.CONFIGS_ZKNODE + "/" + configName + "/" + propertyPath;
}
private void createConfigSet(ZkNodeProps message) throws IOException {
String configSetName = getTaskKey(message);
if (configSetName == null || configSetName.length() == 0) {
throw new SolrException(ErrorCode.BAD_REQUEST, "ConfigSet name not specified");
}
String baseConfigSetName = message.getStr(BASE_CONFIGSET);
if (baseConfigSetName == null || baseConfigSetName.length() == 0) {
throw new SolrException(ErrorCode.BAD_REQUEST, "Base ConfigSet name not specified");
}
ZkConfigManager configManager = new ZkConfigManager(zkStateReader.getZkClient());
if (configManager.configExists(configSetName)) {
throw new SolrException(ErrorCode.BAD_REQUEST, "ConfigSet already exists: " + configSetName);
}
// is there a base config that already exists
if (!configManager.configExists(baseConfigSetName)) {
throw new SolrException(ErrorCode.BAD_REQUEST,
"Base ConfigSet does not exist: " + baseConfigSetName);
}
String propertyPath = ConfigSetProperties.DEFAULT_FILENAME;
Map<String, Object> props = getNewProperties(message);
if (props != null) {
// read the old config properties and do a merge, if necessary
NamedList oldProps = getConfigSetProperties(getPropertyPath(baseConfigSetName,propertyPath));
if (oldProps != null) {
mergeOldProperties(props, oldProps);
}
}
byte[] propertyData = getPropertyData(props);
Set<String> copiedToZkPaths = new HashSet<String>();
try {
configManager.copyConfigDir(baseConfigSetName, configSetName, copiedToZkPaths);
if (propertyData != null) {
try {
zkStateReader.getZkClient().makePath(
getPropertyPath(configSetName, propertyPath),
propertyData, CreateMode.PERSISTENT, null, false, true);
} catch (KeeperException | InterruptedException e) {
throw new IOException("Error writing new properties",
SolrZkClient.checkInterrupted(e));
}
}
} catch (Exception e) {
// copying the config dir or writing the properties file may have failed.
// we should delete the ConfigSet because it may be invalid,
// assuming we actually wrote something. E.g. could be
// the entire baseConfig set with the old properties, including immutable,
// that would make it impossible for the user to delete.
try {
if (configManager.configExists(configSetName) && copiedToZkPaths.size() > 0) {
deleteConfigSet(configSetName, true);
}
} catch (IOException ioe) {
log.error("Error while trying to delete partially created ConfigSet", ioe);
}
throw e;
}
}
private void deleteConfigSet(ZkNodeProps message) throws IOException {
String configSetName = getTaskKey(message);
if (configSetName == null || configSetName.length() == 0) {
throw new SolrException(ErrorCode.BAD_REQUEST, "ConfigSet name not specified");
}
deleteConfigSet(configSetName, false);
}
private void deleteConfigSet(String configSetName, boolean force) throws IOException {
ZkConfigManager configManager = new ZkConfigManager(zkStateReader.getZkClient());
if (!configManager.configExists(configSetName)) {
throw new SolrException(ErrorCode.BAD_REQUEST, "ConfigSet does not exist to delete: " + configSetName);
}
String propertyPath = ConfigSetProperties.DEFAULT_FILENAME;
NamedList properties = getConfigSetProperties(getPropertyPath(configSetName, propertyPath));
if (properties != null) {
Object immutable = properties.get(ConfigSetProperties.IMMUTABLE_CONFIGSET_ARG);
boolean isImmutableConfigSet = immutable != null ? Boolean.parseBoolean(immutable.toString()) : false;
if (!force && isImmutableConfigSet) {
throw new SolrException(ErrorCode.BAD_REQUEST, "Requested delete of immutable ConfigSet: " + configSetName);
}
}
configManager.deleteConfigDir(configSetName);
}
}

View File

@ -21,7 +21,7 @@ import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.common.cloud.ZkNodeProps;
/**
* Interface for processing messages received by an {@link OverseerProcessor}
* Interface for processing messages received by an {@link OverseerTaskProcessor}
*/
public interface OverseerMessageHandler {
@ -61,8 +61,9 @@ public interface OverseerMessageHandler {
/**
* @param taskKey the key associated with the task
* @param operation the operation being processed
* @param message the message being processed
*/
void unmarkExclusiveTask(String taskKey, String operation);
void unmarkExclusiveTask(String taskKey, String operation, ZkNodeProps message);
/**
* @param taskKey the key associated with the task

View File

@ -61,10 +61,10 @@ public class OverseerNodePrioritizer {
List overseerDesignates = (List) m.get("overseer");
if(overseerDesignates==null || overseerDesignates.isEmpty()) return;
String ldr = OverseerProcessor.getLeaderNode(zk);
String ldr = OverseerTaskProcessor.getLeaderNode(zk);
if(overseerDesignates.contains(ldr)) return;
log.info("prioritizing overseer nodes at {} overseer designates are {}", overseerId, overseerDesignates);
List<String> electionNodes = OverseerProcessor.getSortedElectionNodes(zk, OverseerElectionContext.PATH + LeaderElector.ELECTION_NODE);
List<String> electionNodes = OverseerTaskProcessor.getSortedElectionNodes(zk, OverseerElectionContext.PATH + LeaderElector.ELECTION_NODE);
if(electionNodes.size()<2) return;
log.info("sorted nodes {}", electionNodes);
@ -89,7 +89,7 @@ public class OverseerNodePrioritizer {
//now ask the current leader to QUIT , so that the designate can takeover
Overseer.getInQueue(zkStateReader.getZkClient()).offer(
Utils.toJSON(new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.QUIT.toLower(),
"id", OverseerProcessor.getLeaderId(zkStateReader.getZkClient()))));
"id", OverseerTaskProcessor.getLeaderId(zkStateReader.getZkClient()))));
}

View File

@ -29,7 +29,7 @@ import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.cloud.OverseerCollectionQueue.QueueEvent;
import org.apache.solr.cloud.OverseerTaskQueue.QueueEvent;
import org.apache.solr.cloud.Overseer.LeaderStatus;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.SolrZkClient;
@ -56,16 +56,16 @@ import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
* {@link OverseerMessageHandler} handles specific messages in the
* queue.
*/
public class OverseerProcessor implements Runnable, Closeable {
public class OverseerTaskProcessor implements Runnable, Closeable {
public int maxParallelThreads = 10;
public ExecutorService tpe ;
private static Logger log = LoggerFactory
.getLogger(OverseerProcessor.class);
.getLogger(OverseerTaskProcessor.class);
private OverseerCollectionQueue workQueue;
private OverseerTaskQueue workQueue;
private DistributedMap runningMap;
private DistributedMap completedMap;
private DistributedMap failureMap;
@ -98,13 +98,13 @@ public class OverseerProcessor implements Runnable, Closeable {
private OverseerNodePrioritizer prioritizer;
public OverseerProcessor(ZkStateReader zkStateReader, String myId,
public OverseerTaskProcessor(ZkStateReader zkStateReader, String myId,
final ShardHandlerFactory shardHandlerFactory,
String adminPath,
Overseer.Stats stats,
OverseerMessageHandlerSelector selector,
OverseerNodePrioritizer prioritizer,
OverseerCollectionQueue workQueue,
OverseerTaskQueue workQueue,
DistributedMap runningMap,
DistributedMap completedMap,
DistributedMap failureMap) {
@ -451,7 +451,7 @@ public class OverseerProcessor implements Runnable, Closeable {
log.debug("Completed task:[{}]", head.getId());
}
markTaskComplete(messageHandler, head.getId(), asyncId, taskKey);
markTaskComplete(messageHandler, head.getId(), asyncId, taskKey, message);
log.debug("Marked task [{}] as completed.", head.getId());
printTrackingMaps();
@ -462,13 +462,13 @@ public class OverseerProcessor implements Runnable, Closeable {
SolrException.log(log, "", e);
} catch (InterruptedException e) {
// Reset task from tracking data structures so that it can be retried.
resetTaskWithException(messageHandler, head.getId(), asyncId, taskKey);
resetTaskWithException(messageHandler, head.getId(), asyncId, taskKey, message);
log.warn("Resetting task {} as the thread was interrupted.", head.getId());
Thread.currentThread().interrupt();
} finally {
if(!success) {
// Reset task from tracking data structures so that it can be retried.
resetTaskWithException(messageHandler, head.getId(), asyncId, taskKey);
resetTaskWithException(messageHandler, head.getId(), asyncId, taskKey, message);
}
synchronized (waitLock){
waitLock.notifyAll();
@ -476,7 +476,7 @@ public class OverseerProcessor implements Runnable, Closeable {
}
}
private void markTaskComplete(OverseerMessageHandler messageHandler, String id, String asyncId, String taskKey)
private void markTaskComplete(OverseerMessageHandler messageHandler, String id, String asyncId, String taskKey, ZkNodeProps message)
throws KeeperException, InterruptedException {
synchronized (completedTasks) {
completedTasks.put(id, head);
@ -489,10 +489,10 @@ public class OverseerProcessor implements Runnable, Closeable {
if(asyncId != null)
runningMap.remove(asyncId);
messageHandler.unmarkExclusiveTask(taskKey, operation);
messageHandler.unmarkExclusiveTask(taskKey, operation, message);
}
private void resetTaskWithException(OverseerMessageHandler messageHandler, String id, String asyncId, String taskKey) {
private void resetTaskWithException(OverseerMessageHandler messageHandler, String id, String asyncId, String taskKey, ZkNodeProps message) {
log.warn("Resetting task: {}, requestid: {}, taskKey: {}", id, asyncId, taskKey);
try {
if (asyncId != null)
@ -502,7 +502,7 @@ public class OverseerProcessor implements Runnable, Closeable {
runningTasks.remove(id);
}
messageHandler.unmarkExclusiveTask(taskKey, operation);
messageHandler.unmarkExclusiveTask(taskKey, operation, message);
} catch (KeeperException e) {
SolrException.log(log, "", e);
} catch (InterruptedException e) {

View File

@ -33,20 +33,20 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A {@link DistributedQueue} augmented with helper methods specific to the collection queue.
* A {@link DistributedQueue} augmented with helper methods specific to the overseer task queues.
* Methods specific to this subclass ignore superclass internal state and hit ZK directly.
* This is inefficient! But the API on this class is kind of muddy..
*/
public class OverseerCollectionQueue extends DistributedQueue {
private static final Logger LOG = LoggerFactory.getLogger(OverseerCollectionQueue.class);
public class OverseerTaskQueue extends DistributedQueue {
private static final Logger LOG = LoggerFactory.getLogger(OverseerTaskQueue.class);
private final String response_prefix = "qnr-" ;
public OverseerCollectionQueue(SolrZkClient zookeeper, String dir) {
public OverseerTaskQueue(SolrZkClient zookeeper, String dir) {
this(zookeeper, dir, new Overseer.Stats());
}
public OverseerCollectionQueue(SolrZkClient zookeeper, String dir, Overseer.Stats stats) {
public OverseerTaskQueue(SolrZkClient zookeeper, String dir, Overseer.Stats stats) {
super(zookeeper, dir, stats);
}

View File

@ -116,7 +116,8 @@ public final class ZkController {
private final boolean SKIP_AUTO_RECOVERY = Boolean.getBoolean("solrcloud.skip.autorecovery");
private final DistributedQueue overseerJobQueue;
private final OverseerCollectionQueue overseerCollectionQueue;
private final OverseerTaskQueue overseerCollectionQueue;
private final OverseerTaskQueue overseerConfigSetQueue;
private final DistributedMap overseerRunningMap;
private final DistributedMap overseerCompletedMap;
@ -376,6 +377,7 @@ public final class ZkController {
this.overseerJobQueue = Overseer.getInQueue(zkClient);
this.overseerCollectionQueue = Overseer.getCollectionQueue(zkClient);
this.overseerConfigSetQueue = Overseer.getConfigSetQueue(zkClient);
this.overseerRunningMap = Overseer.getRunningMap(zkClient);
this.overseerCompletedMap = Overseer.getCompletedMap(zkClient);
this.overseerFailureMap = Overseer.getFailureMap(zkClient);
@ -1768,10 +1770,14 @@ public final class ZkController {
return overseerJobQueue;
}
public OverseerCollectionQueue getOverseerCollectionQueue() {
public OverseerTaskQueue getOverseerCollectionQueue() {
return overseerCollectionQueue;
}
public OverseerTaskQueue getOverseerConfigSetQueue() {
return overseerConfigSetQueue;
}
public DistributedMap getOverseerRunningMap() {
return overseerRunningMap;
}

View File

@ -36,6 +36,9 @@ public class ConfigSetProperties {
private static final Logger log = LoggerFactory.getLogger(ConfigSetProperties.class);
public static final String DEFAULT_FILENAME = "configsetprops.json";
public static final String IMMUTABLE_CONFIGSET_ARG = "immutable";
/**
* Return the properties associated with the ConfigSet (e.g. immutable)
*
@ -54,10 +57,19 @@ public class ConfigSetProperties {
throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to load reader for ConfigSet properties: " + name, ex);
}
try {
return readFromInputStream(reader);
} finally {
IOUtils.closeQuietly(reader);
}
}
public static NamedList readFromInputStream(InputStreamReader reader) {
try {
JSONParser jsonParser = new JSONParser(reader);
Object object = ObjectBuilder.getVal(jsonParser);
if (!(object instanceof Map)) {
final String objectClass = object == null ? "null" : object.getClass().getName();
throw new SolrException(ErrorCode.SERVER_ERROR, "Invalid JSON type " + object.getClass().getName() + ", expected Map");
}
return new NamedList((Map)object);

View File

@ -46,6 +46,7 @@ import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.Utils;
import org.apache.solr.handler.RequestHandlerBase;
import org.apache.solr.handler.admin.CollectionsHandler;
import org.apache.solr.handler.admin.ConfigSetsHandler;
import org.apache.solr.handler.admin.CoreAdminHandler;
import org.apache.solr.handler.admin.InfoHandler;
import org.apache.solr.handler.admin.SecurityConfHandler;
@ -97,6 +98,7 @@ public class CoreContainer {
protected CoreAdminHandler coreAdminHandler = null;
protected CollectionsHandler collectionsHandler = null;
private InfoHandler infoHandler;
protected ConfigSetsHandler configSetsHandler = null;
private PKIAuthenticationPlugin pkiAuthenticationPlugin;
@ -129,6 +131,7 @@ public class CoreContainer {
public static final String CORES_HANDLER_PATH = "/admin/cores";
public static final String COLLECTIONS_HANDLER_PATH = "/admin/collections";
public static final String INFO_HANDLER_PATH = "/admin/info";
public static final String CONFIGSETS_HANDLER_PATH = "/admin/configs";
private PluginBag<SolrRequestHandler> containerHandlers = new PluginBag<>(SolrRequestHandler.class, null);
@ -407,6 +410,8 @@ public class CoreContainer {
containerHandlers.put(INFO_HANDLER_PATH, infoHandler);
coreAdminHandler = createHandler(cfg.getCoreAdminHandlerClass(), CoreAdminHandler.class);
containerHandlers.put(CORES_HANDLER_PATH, coreAdminHandler);
configSetsHandler = createHandler(cfg.getConfigSetsHandlerClass(), ConfigSetsHandler.class);
containerHandlers.put(CONFIGSETS_HANDLER_PATH, configSetsHandler);
containerHandlers.put("/admin/authorization", securityConfHandler);
containerHandlers.put("/admin/authentication", securityConfHandler);
if(pkiAuthenticationPlugin != null)
@ -1040,6 +1045,10 @@ public class CoreContainer {
return infoHandler;
}
public ConfigSetsHandler getConfigSetsHandler() {
return configSetsHandler;
}
public String getHostName() {
return this.hostName;
}

View File

@ -84,7 +84,7 @@ public class CoreDescriptor {
private static ImmutableMap<String, String> defaultProperties = new ImmutableMap.Builder<String, String>()
.put(CORE_CONFIG, "solrconfig.xml")
.put(CORE_SCHEMA, "schema.xml")
.put(CORE_CONFIGSET_PROPERTIES, "configsetprops.json")
.put(CORE_CONFIGSET_PROPERTIES, ConfigSetProperties.DEFAULT_FILENAME)
.put(CORE_DATADIR, "data" + File.separator)
.put(CORE_TRANSIENT, "false")
.put(CORE_LOADONSTARTUP, "true")

View File

@ -44,6 +44,8 @@ public class NodeConfig {
private final String infoHandlerClass;
private final String configSetsHandlerClass;
private final LogWatcherConfig logWatcherConfig;
private final CloudConfig cloudConfig;
@ -58,7 +60,8 @@ public class NodeConfig {
private NodeConfig(String nodeName, String coreRootDirectory, String configSetBaseDirectory, String sharedLibDirectory,
PluginInfo shardHandlerFactoryConfig, UpdateShardHandlerConfig updateShardHandlerConfig,
String coreAdminHandlerClass, String collectionsAdminHandlerClass, String infoHandlerClass,
String coreAdminHandlerClass, String collectionsAdminHandlerClass,
String infoHandlerClass, String configSetsHandlerClass,
LogWatcherConfig logWatcherConfig, CloudConfig cloudConfig, int coreLoadThreads,
int transientCacheSize, boolean useSchemaCache, String managementPath,
SolrResourceLoader loader, Properties solrProperties) {
@ -71,6 +74,7 @@ public class NodeConfig {
this.coreAdminHandlerClass = coreAdminHandlerClass;
this.collectionsAdminHandlerClass = collectionsAdminHandlerClass;
this.infoHandlerClass = infoHandlerClass;
this.configSetsHandlerClass = configSetsHandlerClass;
this.logWatcherConfig = logWatcherConfig;
this.cloudConfig = cloudConfig;
this.coreLoadThreads = coreLoadThreads;
@ -142,6 +146,10 @@ public class NodeConfig {
return infoHandlerClass;
}
public String getConfigSetsHandlerClass() {
return configSetsHandlerClass;
}
public boolean hasSchemaCache() {
return useSchemaCache;
}
@ -187,6 +195,7 @@ public class NodeConfig {
private String coreAdminHandlerClass = DEFAULT_ADMINHANDLERCLASS;
private String collectionsAdminHandlerClass = DEFAULT_COLLECTIONSHANDLERCLASS;
private String infoHandlerClass = DEFAULT_INFOHANDLERCLASS;
private String configSetsHandlerClass = DEFAULT_CONFIGSETSHANDLERCLASS;
private LogWatcherConfig logWatcherConfig = new LogWatcherConfig(true, null, null, 50);
private CloudConfig cloudConfig;
private int coreLoadThreads = DEFAULT_CORE_LOAD_THREADS;
@ -205,6 +214,7 @@ public class NodeConfig {
private static final String DEFAULT_ADMINHANDLERCLASS = "org.apache.solr.handler.admin.CoreAdminHandler";
private static final String DEFAULT_INFOHANDLERCLASS = "org.apache.solr.handler.admin.InfoHandler";
private static final String DEFAULT_COLLECTIONSHANDLERCLASS = "org.apache.solr.handler.admin.CollectionsHandler";
private static final String DEFAULT_CONFIGSETSHANDLERCLASS = "org.apache.solr.handler.admin.ConfigSetsHandler";
public NodeConfigBuilder(String nodeName, SolrResourceLoader loader) {
this.nodeName = nodeName;
@ -252,6 +262,11 @@ public class NodeConfig {
return this;
}
public NodeConfigBuilder setConfigSetsHandlerClass(String configSetsHandlerClass) {
this.configSetsHandlerClass = configSetsHandlerClass;
return this;
}
public NodeConfigBuilder setLogWatcherConfig(LogWatcherConfig logWatcherConfig) {
this.logWatcherConfig = logWatcherConfig;
return this;
@ -289,7 +304,7 @@ public class NodeConfig {
public NodeConfig build() {
return new NodeConfig(nodeName, coreRootDirectory, configSetBaseDirectory, sharedLibDirectory, shardHandlerFactoryConfig,
updateShardHandlerConfig, coreAdminHandlerClass, collectionsAdminHandlerClass, infoHandlerClass,
updateShardHandlerConfig, coreAdminHandlerClass, collectionsAdminHandlerClass, infoHandlerClass, configSetsHandlerClass,
logWatcherConfig, cloudConfig, coreLoadThreads, transientCacheSize, useSchemaCache, managementPath, loader, solrProperties);
}
}

View File

@ -234,6 +234,9 @@ public class SolrXmlConfig {
case "infoHandler":
builder.setInfoHandlerClass(value);
break;
case "configSetsHandler":
builder.setConfigSetsHandlerClass(value);
break;
case "coreRootDirectory":
builder.setCoreRootDirectory(value);
break;

View File

@ -41,11 +41,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.common.params.CommonParams.JSON;
import static org.apache.solr.core.ConfigSetProperties.IMMUTABLE_CONFIGSET_ARG;
public class SchemaHandler extends RequestHandlerBase {
private static final Logger log = LoggerFactory.getLogger(SchemaHandler.class);
public static final String IMMUTABLE_CONFIGSET_ARG = "immutable";
private boolean isImmutableConfigSet = false;
@Override

View File

@ -82,6 +82,7 @@ import static org.apache.solr.common.params.CoreAdminParams.NAME;
import static org.apache.solr.common.util.StrUtils.formatString;
import static org.apache.solr.core.ConfigOverlay.NOT_EDITABLE;
import static org.apache.solr.core.ConfigOverlay.ZNODEVER;
import static org.apache.solr.core.ConfigSetProperties.IMMUTABLE_CONFIGSET_ARG;
import static org.apache.solr.core.SolrConfig.PluginOpts.REQUIRE_CLASS;
import static org.apache.solr.core.SolrConfig.PluginOpts.REQUIRE_NAME;
import static org.apache.solr.core.SolrConfig.PluginOpts.REQUIRE_NAME_IN_OVERLAY;
@ -91,7 +92,6 @@ public class SolrConfigHandler extends RequestHandlerBase {
public static final Logger log = LoggerFactory.getLogger(SolrConfigHandler.class);
public static final String CONFIGSET_EDITING_DISABLED_ARG = "disable.configEdit";
public static final boolean configEditing_disabled = Boolean.getBoolean(CONFIGSET_EDITING_DISABLED_ARG);
public static final String IMMUTABLE_CONFIGSET_ARG = "immutable";
private static final Map<String, SolrConfig.SolrPluginInfo> namedPlugins;
private Lock reloadLock = new ReentrantLock(true);
private boolean isImmutableConfigSet = false;

View File

@ -36,8 +36,8 @@ import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CoreAdminRequest;
import org.apache.solr.client.solrj.request.CoreAdminRequest.RequestSyncShard;
import org.apache.solr.cloud.DistributedMap;
import org.apache.solr.cloud.OverseerCollectionQueue;
import org.apache.solr.cloud.OverseerCollectionQueue.QueueEvent;
import org.apache.solr.cloud.OverseerTaskQueue;
import org.apache.solr.cloud.OverseerTaskQueue.QueueEvent;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.OverseerSolrResponse;
import org.apache.solr.cloud.overseer.SliceMutator;
@ -246,13 +246,13 @@ public class CollectionsHandler extends RequestHandlerBase {
+ event.getWatchedEvent().getType() + "]");
} else {
throw new SolrException(ErrorCode.SERVER_ERROR, operation
+ " the collection unkown case");
+ " the collection unknown case");
}
}
}
private boolean overseerCollectionQueueContains(String asyncId) throws KeeperException, InterruptedException {
OverseerCollectionQueue collectionQueue = coreContainer.getZkController().getOverseerCollectionQueue();
OverseerTaskQueue collectionQueue = coreContainer.getZkController().getOverseerCollectionQueue();
return collectionQueue.containsTaskWithRequestId(ASYNC, asyncId);
}

View File

@ -0,0 +1,190 @@
package org.apache.solr.handler.admin;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.OverseerTaskQueue.QueueEvent;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.params.ConfigSetParams;
import org.apache.solr.common.params.ConfigSetParams.ConfigSetAction;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.handler.RequestHandlerBase;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.zookeeper.KeeperException;
import static org.apache.solr.cloud.OverseerConfigSetMessageHandler.BASE_CONFIGSET;
import static org.apache.solr.cloud.OverseerConfigSetMessageHandler.CONFIGSETS_ACTION_PREFIX;
import static org.apache.solr.cloud.OverseerConfigSetMessageHandler.PROPERTY_PREFIX;
import static org.apache.solr.common.params.CommonParams.NAME;
import static org.apache.solr.common.params.ConfigSetParams.ConfigSetAction.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.cloud.Overseer.QUEUE_OPERATION;
/**
* A {@link org.apache.solr.request.SolrRequestHandler} for ConfigSets API requests.
*/
public class ConfigSetsHandler extends RequestHandlerBase {
protected static Logger log = LoggerFactory.getLogger(ConfigSetsHandler.class);
protected final CoreContainer coreContainer;
public static long DEFAULT_ZK_TIMEOUT = 180*1000;
/**
* Overloaded ctor to inject CoreContainer into the handler.
*
* @param coreContainer Core Container of the solr webapp installed.
*/
public ConfigSetsHandler(final CoreContainer coreContainer) {
this.coreContainer = coreContainer;
}
@Override
final public void init(NamedList args) {
}
@Override
public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
if (coreContainer == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"Core container instance missing");
}
// Make sure that the core is ZKAware
if(!coreContainer.isZooKeeperAware()) {
throw new SolrException(ErrorCode.BAD_REQUEST,
"Solr instance is not running in SolrCloud mode.");
}
// Pick the action
SolrParams params = req.getParams();
String a = params.get(ConfigSetParams.ACTION);
if (a != null) {
ConfigSetAction action = ConfigSetAction.get(a);
if (action == null)
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown action: " + a);
ConfigSetOperation operation = ConfigSetOperation.get(action);
log.info("Invoked ConfigSet Action :{} with params {} ", action.toLower(), req.getParamString());
Map<String, Object> result = operation.call(req, rsp, this);
if (result != null) {
// We need to differentiate between collection and configsets actions since they currently
// use the same underlying queue.
result.put(QUEUE_OPERATION, CONFIGSETS_ACTION_PREFIX + operation.action.toLower());
ZkNodeProps props = new ZkNodeProps(result);
handleResponse(operation.action.toLower(), props, rsp, DEFAULT_ZK_TIMEOUT);
}
} else {
throw new SolrException(ErrorCode.BAD_REQUEST, "action is a required param");
}
rsp.setHttpCaching(false);
}
private void handleResponse(String operation, ZkNodeProps m,
SolrQueryResponse rsp, long timeout) throws KeeperException, InterruptedException {
long time = System.nanoTime();
QueueEvent event = coreContainer.getZkController()
.getOverseerConfigSetQueue()
.offer(Utils.toJSON(m), timeout);
if (event.getBytes() != null) {
SolrResponse response = SolrResponse.deserialize(event.getBytes());
rsp.getValues().addAll(response.getResponse());
SimpleOrderedMap exp = (SimpleOrderedMap) response.getResponse().get("exception");
if (exp != null) {
Integer code = (Integer) exp.get("rspCode");
rsp.setException(new SolrException(code != null && code != -1 ? ErrorCode.getErrorCode(code) : ErrorCode.SERVER_ERROR, (String)exp.get("msg")));
}
} else {
if (System.nanoTime() - time >= TimeUnit.NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS)) {
throw new SolrException(ErrorCode.SERVER_ERROR, operation
+ " the configset time out:" + timeout / 1000 + "s");
} else if (event.getWatchedEvent() != null) {
throw new SolrException(ErrorCode.SERVER_ERROR, operation
+ " the configset error [Watcher fired on path: "
+ event.getWatchedEvent().getPath() + " state: "
+ event.getWatchedEvent().getState() + " type "
+ event.getWatchedEvent().getType() + "]");
} else {
throw new SolrException(ErrorCode.SERVER_ERROR, operation
+ " the configset unknown case");
}
}
}
private static Map<String, Object> copyPropertiesWithPrefix(SolrParams params, Map<String, Object> props, String prefix) {
Iterator<String> iter = params.getParameterNamesIterator();
while (iter.hasNext()) {
String param = iter.next();
if (param.startsWith(prefix)) {
props.put(param, params.get(param));
}
}
return props;
}
@Override
public String getDescription() {
return "Manage SolrCloud ConfigSets";
}
enum ConfigSetOperation {
CREATE_OP(CREATE) {
@Override
Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, ConfigSetsHandler h) throws Exception {
Map<String, Object> props = req.getParams().required().getAll(null, NAME, BASE_CONFIGSET);
return copyPropertiesWithPrefix(req.getParams(), props, PROPERTY_PREFIX + ".");
}
},
DELETE_OP(DELETE) {
@Override
Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, ConfigSetsHandler h) throws Exception {
return req.getParams().required().getAll(null, NAME);
}
};
ConfigSetAction action;
ConfigSetOperation(ConfigSetAction action) {
this.action = action;
}
abstract Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, ConfigSetsHandler h) throws Exception;
public static ConfigSetOperation get(ConfigSetAction action) {
for (ConfigSetOperation op : values()) {
if (op.action == action) return op;
}
throw new SolrException(ErrorCode.SERVER_ERROR, "No such action" + action);
}
}
}

View File

@ -25,7 +25,7 @@ import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.solr.cloud.LeaderElector;
import org.apache.solr.cloud.OverseerProcessor;
import org.apache.solr.cloud.OverseerTaskProcessor;
import org.apache.solr.cloud.overseer.SliceMutator;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
@ -160,7 +160,7 @@ class RebalanceLeaders {
ZkStateReader zkStateReader = coreContainer.getZkController().getZkStateReader();
List<String> electionNodes = OverseerProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
List<String> electionNodes = OverseerTaskProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
if (electionNodes.size() < 2) { // if there's only one node in the queue, should already be leader and we shouldn't be here anyway.
@ -193,7 +193,7 @@ class RebalanceLeaders {
throws KeeperException, InterruptedException {
ZkStateReader zkStateReader = coreContainer.getZkController().getZkStateReader();
List<String> electionNodes = OverseerProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
List<String> electionNodes = OverseerTaskProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
// First, queue up the preferred leader at the head of the queue.
@ -210,12 +210,12 @@ class RebalanceLeaders {
return; // let's not continue if we didn't get what we expect. Possibly we're offline etc..
}
List<String> electionNodesTmp = OverseerProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
List<String> electionNodesTmp = OverseerTaskProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
// Now find other nodes that have the same sequence number as this node and re-queue them at the end of the queue.
electionNodes = OverseerProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
electionNodes = OverseerTaskProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
for (String thisNode : electionNodes) {
@ -238,7 +238,7 @@ class RebalanceLeaders {
int oldSeq = LeaderElector.getSeq(electionNode);
for (int idx = 0; idx < 600; ++idx) {
ZkStateReader zkStateReader = coreContainer.getZkController().getZkStateReader();
List<String> electionNodes = OverseerProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
List<String> electionNodes = OverseerTaskProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
for (String testNode : electionNodes) {
if (LeaderElector.getNodeName(testNode).equals(nodeName) && oldSeq != LeaderElector.getSeq(testNode)) {

View File

@ -21,6 +21,7 @@
<int name="coreLoadThreads">11</int>
<str name="coreRootDirectory">${coreRootDirectory:testCoreRootDirectory}</str>
<str name="infoHandler">testInfoHandler</str>
<str name="configSetsHandler">testConfigSetsHandler</str>
<str name="managementPath">testManagementPath</str>
<str name="sharedLib">testSharedLib</str>
<str name="shareSchema">${shareSchema:true}</str>

View File

@ -20,7 +20,7 @@ package org.apache.solr.cloud;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.cloud.OverseerCollectionQueue.QueueEvent;
import org.apache.solr.cloud.OverseerTaskQueue.QueueEvent;
import org.apache.solr.cloud.Overseer.LeaderStatus;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.SolrZkClient;
@ -73,13 +73,13 @@ import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.reset;
import static org.easymock.EasyMock.verify;
public class OverseerCollectionProcessorTest extends SolrTestCaseJ4 {
public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
private static final String ADMIN_PATH = "/admin/cores";
private static final String COLLECTION_NAME = "mycollection";
private static final String CONFIG_NAME = "myconfig";
private static OverseerCollectionQueue workQueueMock;
private static OverseerTaskQueue workQueueMock;
private static DistributedMap runningMapMock;
private static DistributedMap completedMapMock;
private static DistributedMap failureMapMock;
@ -93,19 +93,19 @@ public class OverseerCollectionProcessorTest extends SolrTestCaseJ4 {
private SolrResponse lastProcessMessageResult;
private OverseerCollectionProcessorToBeTested underTest;
private OverseerCollectionConfigSetProcessorToBeTested underTest;
private Thread thread;
private Queue<QueueEvent> queue = new ArrayBlockingQueue<>(10);
private class OverseerCollectionProcessorToBeTested extends
OverseerCollectionProcessor {
private class OverseerCollectionConfigSetProcessorToBeTested extends
OverseerCollectionConfigSetProcessor {
public OverseerCollectionProcessorToBeTested(ZkStateReader zkStateReader,
public OverseerCollectionConfigSetProcessorToBeTested(ZkStateReader zkStateReader,
String myId, ShardHandlerFactory shardHandlerFactory,
String adminPath,
OverseerCollectionQueue workQueue, DistributedMap runningMap,
OverseerTaskQueue workQueue, DistributedMap runningMap,
DistributedMap completedMap,
DistributedMap failureMap) {
super(zkStateReader, myId, shardHandlerFactory, adminPath, new Overseer.Stats(), null, new OverseerNodePrioritizer(zkStateReader, adminPath, shardHandlerFactory), workQueue, runningMap, completedMap, failureMap);
@ -120,7 +120,7 @@ public class OverseerCollectionProcessorTest extends SolrTestCaseJ4 {
@BeforeClass
public static void setUpOnce() throws Exception {
workQueueMock = createMock(OverseerCollectionQueue.class);
workQueueMock = createMock(OverseerTaskQueue.class);
runningMapMock = createMock(DistributedMap.class);
completedMapMock = createMock(DistributedMap.class);
failureMapMock = createMock(DistributedMap.class);
@ -158,7 +158,7 @@ public class OverseerCollectionProcessorTest extends SolrTestCaseJ4 {
reset(zkStateReaderMock);
reset(clusterStateMock);
reset(solrZkClientMock);
underTest = new OverseerCollectionProcessorToBeTested(zkStateReaderMock,
underTest = new OverseerCollectionConfigSetProcessorToBeTested(zkStateReaderMock,
"1234", shardHandlerFactoryMock, ADMIN_PATH, workQueueMock, runningMapMock,
completedMapMock, failureMapMock);
zkMap.clear();

View File

@ -43,8 +43,8 @@ import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.apache.solr.cloud.OverseerCollectionProcessor.getLeaderNode;
import static org.apache.solr.cloud.OverseerCollectionProcessor.getSortedOverseerNodeNames;
import static org.apache.solr.cloud.OverseerCollectionConfigSetProcessor.getLeaderNode;
import static org.apache.solr.cloud.OverseerCollectionConfigSetProcessor.getSortedOverseerNodeNames;
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.NUM_SLICES;
import static org.apache.solr.common.util.Utils.makeMap;
import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
@ -88,7 +88,7 @@ public class OverseerRolesTest extends AbstractFullDistribZkTestBase{
final TimeOut timeout = new TimeOut(10, TimeUnit.SECONDS);
String newLeader=null;
for(;! timeout.hasTimedOut();){
newLeader = OverseerCollectionProcessor.getLeaderNode(zk);
newLeader = OverseerCollectionConfigSetProcessor.getLeaderNode(zk);
if(newLeader!=null && !newLeader.equals(leader)) break;
Thread.sleep(100);
}
@ -96,7 +96,7 @@ public class OverseerRolesTest extends AbstractFullDistribZkTestBase{
assertTrue("The old leader should have rejoined election ", OverseerCollectionProcessor.getSortedOverseerNodeNames(zk).contains(leader));
assertTrue("The old leader should have rejoined election ", OverseerCollectionConfigSetProcessor.getSortedOverseerNodeNames(zk).contains(leader));
}
@ -108,10 +108,10 @@ public class OverseerRolesTest extends AbstractFullDistribZkTestBase{
createCollection(collectionName, client);
waitForRecoveriesToFinish(collectionName, false);
List<String> l = OverseerCollectionProcessor.getSortedOverseerNodeNames(client.getZkStateReader().getZkClient()) ;
List<String> l = OverseerCollectionConfigSetProcessor.getSortedOverseerNodeNames(client.getZkStateReader().getZkClient()) ;
log.info("All nodes {}", l);
String currentLeader = OverseerCollectionProcessor.getLeaderNode(client.getZkStateReader().getZkClient());
String currentLeader = OverseerCollectionConfigSetProcessor.getLeaderNode(client.getZkStateReader().getZkClient());
log.info("Current leader {} ", currentLeader);
l.remove(currentLeader);
@ -124,7 +124,7 @@ public class OverseerRolesTest extends AbstractFullDistribZkTestBase{
boolean leaderchanged = false;
for(;!timeout.hasTimedOut();){
if(overseerDesignate.equals(OverseerCollectionProcessor.getLeaderNode(client.getZkStateReader().getZkClient()))){
if(overseerDesignate.equals(OverseerCollectionConfigSetProcessor.getLeaderNode(client.getZkStateReader().getZkClient()))){
log.info("overseer designate is the new overseer");
leaderchanged =true;
break;
@ -134,7 +134,7 @@ public class OverseerRolesTest extends AbstractFullDistribZkTestBase{
assertTrue("could not set the new overseer . expected "+
overseerDesignate + " current order : " +
getSortedOverseerNodeNames(client.getZkStateReader().getZkClient()) +
" ldr :"+ OverseerCollectionProcessor.getLeaderNode(client.getZkStateReader().getZkClient()) ,leaderchanged);
" ldr :"+ OverseerCollectionConfigSetProcessor.getLeaderNode(client.getZkStateReader().getZkClient()) ,leaderchanged);
@ -176,7 +176,7 @@ public class OverseerRolesTest extends AbstractFullDistribZkTestBase{
log.info("leader node {}", leaderJetty.getBaseUrl());
log.info ("current election Queue",
OverseerCollectionProcessor.getSortedElectionNodes(client.getZkStateReader().getZkClient(),
OverseerCollectionConfigSetProcessor.getSortedElectionNodes(client.getZkStateReader().getZkClient(),
OverseerElectionContext.PATH + LeaderElector.ELECTION_NODE));
ChaosMonkey.stop(leaderJetty);
timeout = new TimeOut(10, TimeUnit.SECONDS);

View File

@ -16,13 +16,13 @@ package org.apache.solr.cloud;
* the License.
*/
public class OverseerCollectionQueueTest extends DistributedQueueTest {
public class OverseerTaskQueueTest extends DistributedQueueTest {
// TODO: OverseerCollectionQueue specific tests.
// TODO: OverseerTaskQueue specific tests.
@Override
protected OverseerCollectionQueue makeDistributedQueue(String dqZNode) throws Exception {
return new OverseerCollectionQueue(zkClient, setupNewDistributedQueueZNode(dqZNode));
protected OverseerTaskQueue makeDistributedQueue(String dqZNode) throws Exception {
return new OverseerTaskQueue(zkClient, setupNewDistributedQueueZNode(dqZNode));
}
}

View File

@ -57,7 +57,7 @@ public class RollingRestartTest extends AbstractFullDistribZkTestBase {
public void restartWithRolesTest() throws Exception {
String leader = OverseerCollectionProcessor.getLeaderNode(cloudClient.getZkStateReader().getZkClient());
String leader = OverseerCollectionConfigSetProcessor.getLeaderNode(cloudClient.getZkStateReader().getZkClient());
assertNotNull(leader);
log.info("Current overseer leader = {}", leader);
@ -93,10 +93,10 @@ public class RollingRestartTest extends AbstractFullDistribZkTestBase {
sawLiveDesignate = true;
boolean success = waitUntilOverseerDesignateIsLeader(cloudClient.getZkStateReader().getZkClient(), designates, MAX_WAIT_TIME);
if (!success) {
leader = OverseerCollectionProcessor.getLeaderNode(cloudClient.getZkStateReader().getZkClient());
leader = OverseerCollectionConfigSetProcessor.getLeaderNode(cloudClient.getZkStateReader().getZkClient());
if (leader == null)
log.error("NOOVERSEER election queue is :" +
OverseerCollectionProcessor.getSortedElectionNodes(cloudClient.getZkStateReader().getZkClient(),
OverseerCollectionConfigSetProcessor.getSortedElectionNodes(cloudClient.getZkStateReader().getZkClient(),
OverseerElectionContext.PATH + LeaderElector.ELECTION_NODE));
fail("No overseer designate as leader found after restart #" + (i + 1) + ": " + leader);
}
@ -104,10 +104,10 @@ public class RollingRestartTest extends AbstractFullDistribZkTestBase {
assertTrue("Unable to restart (#" + i + "): " + cloudJetty, ChaosMonkey.start(cloudJetty.jetty));
boolean success = waitUntilOverseerDesignateIsLeader(cloudClient.getZkStateReader().getZkClient(), designates, MAX_WAIT_TIME);
if (!success) {
leader = OverseerCollectionProcessor.getLeaderNode(cloudClient.getZkStateReader().getZkClient());
leader = OverseerCollectionConfigSetProcessor.getLeaderNode(cloudClient.getZkStateReader().getZkClient());
if (leader == null)
log.error("NOOVERSEER election queue is :" +
OverseerCollectionProcessor.getSortedElectionNodes(cloudClient.getZkStateReader().getZkClient(),
OverseerCollectionConfigSetProcessor.getSortedElectionNodes(cloudClient.getZkStateReader().getZkClient(),
OverseerElectionContext.PATH + LeaderElector.ELECTION_NODE));
fail("No overseer leader found after restart #" + (i + 1) + ": " + leader);
}
@ -120,7 +120,7 @@ public class RollingRestartTest extends AbstractFullDistribZkTestBase {
assertTrue("Test may not be working if we never saw a live designate", sawLiveDesignate);
leader = OverseerCollectionProcessor.getLeaderNode(cloudClient.getZkStateReader().getZkClient());
leader = OverseerCollectionConfigSetProcessor.getLeaderNode(cloudClient.getZkStateReader().getZkClient());
assertNotNull(leader);
log.info("Current overseer leader (after restart) = {}", leader);
@ -135,7 +135,7 @@ public class RollingRestartTest extends AbstractFullDistribZkTestBase {
int stableCheckTimeout = 2000;
String oldleader = null;
while (System.nanoTime() < timeout && System.nanoTime() < maxTimeout) {
String newLeader = OverseerCollectionProcessor.getLeaderNode(testZkClient);
String newLeader = OverseerCollectionConfigSetProcessor.getLeaderNode(testZkClient);
if (newLeader != null && !newLeader.equals(oldleader)) {
// the leaders have changed, let's move the timeout further
timeout = System.nanoTime() + TimeUnit.NANOSECONDS.convert(60, TimeUnit.SECONDS);

View File

@ -33,7 +33,7 @@ public class SimpleCollectionCreateDeleteTest extends AbstractFullDistribZkTestB
@Test
@ShardsFixed(num = 1)
public void test() throws Exception {
String overseerNode = OverseerCollectionProcessor.getLeaderNode(cloudClient.getZkStateReader().getZkClient());
String overseerNode = OverseerCollectionConfigSetProcessor.getLeaderNode(cloudClient.getZkStateReader().getZkClient());
String notOverseerNode = null;
for (CloudJettyRunner cloudJetty : cloudJettys) {
if (!overseerNode.equals(cloudJetty.nodeName)) {

View File

@ -0,0 +1,331 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import com.google.common.collect.ImmutableMap;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.io.FileUtils;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.ConfigSetAdminRequest;
import org.apache.solr.client.solrj.request.ConfigSetAdminRequest.Create;
import org.apache.solr.client.solrj.request.ConfigSetAdminRequest.Delete;
import org.apache.solr.client.solrj.response.ConfigSetAdminResponse;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkConfigManager;
import org.apache.solr.common.params.ConfigSetParams;
import org.apache.solr.common.params.ConfigSetParams.ConfigSetAction;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.ConfigSetProperties;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import static org.apache.solr.cloud.OverseerConfigSetMessageHandler.BASE_CONFIGSET;
import static org.apache.solr.common.params.CommonParams.NAME;
import static org.apache.solr.core.ConfigSetProperties.DEFAULT_FILENAME;
/**
* Simple ConfigSets API tests on user errors and simple success cases.
*/
public class TestConfigSetsAPI extends SolrTestCaseJ4 {
private MiniSolrCloudCluster solrCluster;
@Override
@Before
public void setUp() throws Exception {
super.setUp();
final File solrXml = getFile("solr").toPath().resolve("solr.xml").toFile();
solrCluster = new MiniSolrCloudCluster(1, createTempDir().toFile(), solrXml, buildJettyConfig("/solr"));
}
@Override
@After
public void tearDown() throws Exception {
solrCluster.shutdown();
super.tearDown();
}
@Test
public void testCreateErrors() throws Exception {
final SolrClient solrClient =
new HttpSolrClient(solrCluster.getJettySolrRunners().get(0).getBaseUrl().toString());
final File configDir = getFile("solr").toPath().resolve("configsets/configset-2/conf").toFile();
solrCluster.uploadConfigDir(configDir, "configSet");
// no action
CreateNoErrorChecking createNoAction = new CreateNoErrorChecking();
createNoAction.setAction(null);
verifyException(solrClient, createNoAction, "action");
// no ConfigSet name
CreateNoErrorChecking create = new CreateNoErrorChecking();
verifyException(solrClient, create, NAME);
// no base ConfigSet name
create.setConfigSetName("configSetName");
verifyException(solrClient, create, BASE_CONFIGSET);
// ConfigSet already exists
Create alreadyExists = new Create();
alreadyExists.setConfigSetName("configSet").setBaseConfigSetName("baseConfigSet");
verifyException(solrClient, alreadyExists, "ConfigSet already exists");
// Base ConfigSet does not exist
Create baseConfigNoExists = new Create();
baseConfigNoExists.setConfigSetName("newConfigSet").setBaseConfigSetName("baseConfigSet");
verifyException(solrClient, baseConfigNoExists, "Base ConfigSet does not exist");
solrClient.close();
}
@Test
public void testCreate() throws Exception {
// no old, no new
verifyCreate("baseConfigSet1", "configSet1", null, null);
// no old, new
verifyCreate("baseConfigSet2", "configSet2",
null, ImmutableMap.<String, String>of("immutable", "true", "key1", "value1"));
// old, no new
verifyCreate("baseConfigSet3", "configSet3",
ImmutableMap.<String, String>of("immutable", "false", "key2", "value2"), null);
// old, new
verifyCreate("baseConfigSet4", "configSet4",
ImmutableMap.<String, String>of("immutable", "true", "onlyOld", "onlyOldValue"),
ImmutableMap.<String, String>of("immutable", "false", "onlyNew", "onlyNewValue"));
}
private void setupBaseConfigSet(String baseConfigSetName, Map<String, String> oldProps) throws Exception {
final File configDir = getFile("solr").toPath().resolve("configsets/configset-2/conf").toFile();
final File tmpConfigDir = createTempDir().toFile();
tmpConfigDir.deleteOnExit();
FileUtils.copyDirectory(configDir, tmpConfigDir);
if (oldProps != null) {
FileUtils.write(new File(tmpConfigDir, ConfigSetProperties.DEFAULT_FILENAME),
getConfigSetProps(oldProps));
}
solrCluster.uploadConfigDir(tmpConfigDir, baseConfigSetName);
}
private void verifyCreate(String baseConfigSetName, String configSetName,
Map<String, String> oldProps, Map<String, String> newProps) throws Exception {
final SolrClient solrClient =
new HttpSolrClient(solrCluster.getJettySolrRunners().get(0).getBaseUrl().toString());
setupBaseConfigSet(baseConfigSetName, oldProps);
SolrZkClient zkClient = new SolrZkClient(solrCluster.getZkServer().getZkAddress(),
AbstractZkTestCase.TIMEOUT, 45000, null);
try {
ZkConfigManager configManager = new ZkConfigManager(zkClient);
assertFalse(configManager.configExists(configSetName));
Create create = new Create();
create.setBaseConfigSetName(baseConfigSetName).setConfigSetName(configSetName);
if (newProps != null) {
Properties p = new Properties();
p.putAll(newProps);
create.setNewConfigSetProperties(p);
}
ConfigSetAdminResponse response = create.process(solrClient);
assertNotNull(response.getResponse());
assertTrue(configManager.configExists(configSetName));
verifyProperties(configSetName, oldProps, newProps, zkClient);
} finally {
zkClient.close();
}
solrClient.close();
}
private NamedList getConfigSetPropertiesFromZk(
SolrZkClient zkClient, String path) throws Exception {
byte [] oldPropsData = null;
try {
oldPropsData = zkClient.getData(path, null, null, true);
} catch (KeeperException.NoNodeException e) {
// okay, properties just don't exist
}
if (oldPropsData != null) {
InputStreamReader reader = new InputStreamReader(new ByteArrayInputStream(oldPropsData), StandardCharsets.UTF_8);
try {
return ConfigSetProperties.readFromInputStream(reader);
} finally {
reader.close();
}
}
return null;
}
private void verifyProperties(String configSetName, Map<String, String> oldProps,
Map<String, String> newProps, SolrZkClient zkClient) throws Exception {
NamedList properties = getConfigSetPropertiesFromZk(zkClient,
ZkConfigManager.CONFIGS_ZKNODE + "/" + configSetName + "/" + DEFAULT_FILENAME);
// let's check without merging the maps, since that's what the MessageHandler does
// (since we'd probably repeat any bug in the MessageHandler here)
if (oldProps == null && newProps == null) {
assertNull(properties);
return;
}
assertNotNull(properties);
// check all oldProps are in props
if (oldProps != null) {
for (Map.Entry<String, String> entry : oldProps.entrySet()) {
assertNotNull(properties.get(entry.getKey()));
}
}
// check all newProps are in props
if (newProps != null) {
for (Map.Entry<String, String> entry : newProps.entrySet()) {
assertNotNull(properties.get(entry.getKey()));
}
}
// check the value in properties are correct
Iterator<Map.Entry<String, Object>> it = properties.iterator();
while (it.hasNext()) {
Map.Entry<String, Object> entry = it.next();
String newValue = newProps != null ? newProps.get(entry.getKey()) : null;
String oldValue = oldProps != null ? oldProps.get(entry.getKey()) : null;
if (newValue != null) {
assertTrue(newValue.equals(entry.getValue()));
} else if (oldValue != null) {
assertTrue(oldValue.equals(entry.getValue()));
} else {
// not in either
assert(false);
}
}
}
@Test
public void testDeleteErrors() throws Exception {
final SolrClient solrClient =
new HttpSolrClient(solrCluster.getJettySolrRunners().get(0).getBaseUrl().toString());
final File configDir = getFile("solr").toPath().resolve("configsets/configset-2/conf").toFile();
final File tmpConfigDir = createTempDir().toFile();
tmpConfigDir.deleteOnExit();
// Ensure ConfigSet is immutable
FileUtils.copyDirectory(configDir, tmpConfigDir);
FileUtils.write(new File(tmpConfigDir, "configsetprops.json"),
getConfigSetProps(ImmutableMap.<String, String>of("immutable", "true")));
solrCluster.uploadConfigDir(tmpConfigDir, "configSet");
// no ConfigSet name
DeleteNoErrorChecking delete = new DeleteNoErrorChecking();
verifyException(solrClient, delete, NAME);
// ConfigSet doesn't exist
delete.setConfigSetName("configSetBogus");
verifyException(solrClient, delete, "ConfigSet does not exist");
// ConfigSet is immutable
delete.setConfigSetName("configSet");
verifyException(solrClient, delete, "Requested delete of immutable ConfigSet");
solrClient.close();
}
private void verifyException(SolrClient solrClient, ConfigSetAdminRequest request,
String errorContains) throws Exception {
try {
solrClient.request(request);
Assert.fail("Expected exception");
} catch (Exception e) {
assertTrue("Expected exception message to contain: " + errorContains
+ " got: " + e.getMessage(), e.getMessage().contains(errorContains));
}
}
@Test
public void testDelete() throws Exception {
final SolrClient solrClient =
new HttpSolrClient(solrCluster.getJettySolrRunners().get(0).getBaseUrl().toString());
final File configDir = getFile("solr").toPath().resolve("configsets/configset-2/conf").toFile();
final String configSet = "configSet";
solrCluster.uploadConfigDir(configDir, configSet);
SolrZkClient zkClient = new SolrZkClient(solrCluster.getZkServer().getZkAddress(),
AbstractZkTestCase.TIMEOUT, 45000, null);
try {
ZkConfigManager configManager = new ZkConfigManager(zkClient);
assertTrue(configManager.configExists(configSet));
Delete delete = new Delete();
delete.setConfigSetName(configSet);
ConfigSetAdminResponse response = delete.process(solrClient);
assertNotNull(response.getResponse());
assertFalse(configManager.configExists(configSet));
} finally {
zkClient.close();
}
solrClient.close();
}
private StringBuilder getConfigSetProps(Map<String, String> map) {
return new StringBuilder(new String(Utils.toJSON(map), StandardCharsets.UTF_8));
}
public static class CreateNoErrorChecking extends ConfigSetAdminRequest.Create {
public ConfigSetAdminRequest setAction(ConfigSetAction action) {
return super.setAction(action);
}
@Override
public SolrParams getParams() {
ModifiableSolrParams params = new ModifiableSolrParams();
if (action != null) params.set(ConfigSetParams.ACTION, action.toString());
if (configSetName != null) params.set(NAME, configSetName);
if (baseConfigSetName != null) params.set("baseConfigSet", baseConfigSetName);
return params;
}
}
public static class DeleteNoErrorChecking extends ConfigSetAdminRequest.Delete {
public ConfigSetAdminRequest setAction(ConfigSetAction action) {
return super.setAction(action);
}
@Override
public SolrParams getParams() {
ModifiableSolrParams params = new ModifiableSolrParams();
if (action != null) params.set(ConfigSetParams.ACTION, action.toString());
if (configSetName != null) params.set(NAME, configSetName);
return params;
}
}
}

View File

@ -0,0 +1,194 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import java.io.File;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.ConfigSetAdminRequest;
import org.apache.solr.client.solrj.request.ConfigSetAdminRequest.Create;
import org.apache.solr.client.solrj.request.ConfigSetAdminRequest.Delete;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Tests the exclusivity of the ConfigSets API.
* Submits a number of API requests concurrently and checks that
* the responses indicate the requests are handled sequentially for
* the same ConfigSet and base ConfigSet.
*/
public class TestConfigSetsAPIExclusivity extends SolrTestCaseJ4 {
private static Logger log = LoggerFactory
.getLogger(TestConfigSetsAPIExclusivity.class);
private MiniSolrCloudCluster solrCluster;
private static final String GRANDBASE_CONFIGSET_NAME = "grandBaseConfigSet1";
private static final String BASE_CONFIGSET_NAME = "baseConfigSet1";
private static final String CONFIGSET_NAME = "configSet1";
@Override
@Before
public void setUp() throws Exception {
super.setUp();
final File solrXml = getFile("solr").toPath().resolve("solr.xml").toFile();
final File testDir = createTempDir().toFile();
solrCluster = new MiniSolrCloudCluster(1, testDir,
solrXml, buildJettyConfig("/solr"));
}
@Override
@After
public void tearDown() throws Exception {
solrCluster.shutdown();
super.tearDown();
}
@Test
public void testAPIExclusivity() throws Exception {
int trials = 30;
setupBaseConfigSet(GRANDBASE_CONFIGSET_NAME);
CreateThread createBaseThread =
new CreateThread(solrCluster, BASE_CONFIGSET_NAME, GRANDBASE_CONFIGSET_NAME, trials);
CreateThread createThread =
new CreateThread(solrCluster, CONFIGSET_NAME, BASE_CONFIGSET_NAME, trials);
DeleteThread deleteBaseThread = new DeleteThread(solrCluster, BASE_CONFIGSET_NAME, trials);
DeleteThread deleteThread = new DeleteThread(solrCluster, CONFIGSET_NAME, trials);
List<ConfigSetsAPIThread> threads = Arrays.asList(
createBaseThread, createThread, deleteBaseThread, deleteThread);
for (ConfigSetsAPIThread thread : threads) {
thread.start();
}
for (ConfigSetsAPIThread thread : threads) {
thread.join();
}
List<Exception> exceptions = new LinkedList<Exception>();
for (ConfigSetsAPIThread thread : threads) {
exceptions.addAll(thread.getUnexpectedExceptions());
}
assertEquals("Unexpected exception: " + getFirstExceptionOrNull(exceptions),
0, exceptions.size());
}
private void setupBaseConfigSet(String baseConfigSetName) throws Exception {
final File configDir = getFile("solr").toPath().resolve("configsets/configset-2/conf").toFile();
final File tmpConfigDir = createTempDir().toFile();
tmpConfigDir.deleteOnExit();
FileUtils.copyDirectory(configDir, tmpConfigDir);
solrCluster.uploadConfigDir(tmpConfigDir, baseConfigSetName);
}
private Exception getFirstExceptionOrNull(List<Exception> list) {
return list.size() == 0 ? null : list.get(0);
}
private static abstract class ConfigSetsAPIThread extends Thread {
private MiniSolrCloudCluster solrCluster;
private int trials;
private List<Exception> unexpectedExceptions = new LinkedList<Exception>();
private List<String> allowedExceptions = Arrays.asList(new String[] {
"ConfigSet already exists",
"ConfigSet does not exist to delete",
"Base ConfigSet does not exist"});
public ConfigSetsAPIThread(MiniSolrCloudCluster solrCluster, int trials) {
this.solrCluster = solrCluster;
this.trials = trials;
}
public abstract ConfigSetAdminRequest createRequest();
public void run() {
final SolrClient solrClient =
new HttpSolrClient(solrCluster.getJettySolrRunners().get(0).getBaseUrl().toString());
ConfigSetAdminRequest request = createRequest();
for (int i = 0; i < trials; ++i) {
try {
request.process(solrClient);
} catch (Exception e) {
verifyException(e);
}
}
try {
solrClient.close();
} catch (Exception e) {
log.error("Error closing client", e);
}
}
private void verifyException(Exception e) {
for (String ex : allowedExceptions) {
if (e.getMessage().contains(ex)) {
return;
}
}
unexpectedExceptions.add(e);
}
public List<Exception> getUnexpectedExceptions() {
return unexpectedExceptions;
}
}
private static class CreateThread extends ConfigSetsAPIThread {
private String configSet;
private String baseConfigSet;
public CreateThread(MiniSolrCloudCluster solrCluster, String configSet,
String baseConfigSet, int trials) {
super(solrCluster, trials);
this.configSet = configSet;
this.baseConfigSet = baseConfigSet;
}
@Override
public ConfigSetAdminRequest createRequest() {
Create create = new Create();
create.setBaseConfigSetName(baseConfigSet).setConfigSetName(configSet);
return create;
}
}
private static class DeleteThread extends ConfigSetsAPIThread {
private String configSet;
public DeleteThread(MiniSolrCloudCluster solrCluster, String configSet, int trials) {
super(solrCluster, trials);
this.configSet = configSet;
}
@Override
public ConfigSetAdminRequest createRequest() {
Delete delete = new Delete();
delete.setConfigSetName(configSet);
return delete;
}
}
}

View File

@ -0,0 +1,372 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import com.google.common.collect.ImmutableMap;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.io.FileUtils;
import org.apache.jute.InputArchive;
import org.apache.jute.OutputArchive;
import org.apache.jute.Record;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient.RemoteSolrException;
import org.apache.solr.client.solrj.request.ConfigSetAdminRequest.Create;
import org.apache.solr.client.solrj.response.ConfigSetAdminResponse;
import org.apache.solr.cloud.ZkTestServer;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkConfigManager;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.ConfigSetProperties;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.quorum.Leader.Proposal;
import org.apache.zookeeper.server.DataNode;
import org.apache.zookeeper.server.DataTree;
import org.apache.zookeeper.server.DataTree.ProcessTxnResult;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.ServerCnxn;
import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.txn.TxnHeader;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import static org.apache.solr.common.cloud.ZkConfigManager.CONFIGS_ZKNODE;
/**
* Test the ConfigSets API under ZK failure. In particular,
* if create fails, ensure proper cleanup occurs so we aren't
* left with a partially created ConfigSet.
*/
public class TestConfigSetsAPIZkFailure extends SolrTestCaseJ4 {
private MiniSolrCloudCluster solrCluster;
private ZkTestServer zkTestServer;
private static final String BASE_CONFIGSET_NAME = "baseConfigSet1";
private static final String CONFIGSET_NAME = "configSet1";
@Override
@Before
public void setUp() throws Exception {
super.setUp();
final File solrXml = getFile("solr").toPath().resolve("solr.xml").toFile();
final File testDir = createTempDir().toFile();
String zkDir = testDir.getAbsolutePath() + File.separator
+ "zookeeper/server1/data";
zkTestServer = new ZkTestServer(zkDir);
zkTestServer.run();
zkTestServer.setZKDatabase(
new FailureDuringCopyZKDatabase(zkTestServer.getZKDatabase(), zkTestServer));
solrCluster = new MiniSolrCloudCluster(1, testDir,
solrXml, buildJettyConfig("/solr"), zkTestServer);
}
@Override
@After
public void tearDown() throws Exception {
solrCluster.shutdown();
zkTestServer.shutdown();
super.tearDown();
}
@Test
public void testCreateZkFailure() throws Exception {
final SolrClient solrClient =
new HttpSolrClient(solrCluster.getJettySolrRunners().get(0).getBaseUrl().toString());
final Map<String, String> oldProps = ImmutableMap.of("immutable", "true");
setupBaseConfigSet(BASE_CONFIGSET_NAME, oldProps);
SolrZkClient zkClient = new SolrZkClient(solrCluster.getZkServer().getZkAddress(),
AbstractZkTestCase.TIMEOUT, 45000, null);
try {
ZkConfigManager configManager = new ZkConfigManager(zkClient);
assertFalse(configManager.configExists(CONFIGSET_NAME));
Create create = new Create();
create.setBaseConfigSetName(BASE_CONFIGSET_NAME).setConfigSetName(CONFIGSET_NAME);
try {
ConfigSetAdminResponse response = create.process(solrClient);
Assert.fail("Expected solr exception");
} catch (RemoteSolrException se) {
// partial creation should have been cleaned up
assertFalse(configManager.configExists(CONFIGSET_NAME));
assertEquals(SolrException.ErrorCode.SERVER_ERROR.code, se.code());
}
} finally {
zkClient.close();
}
solrClient.close();
}
private void setupBaseConfigSet(String baseConfigSetName, Map<String, String> oldProps) throws Exception {
final File configDir = getFile("solr").toPath().resolve("configsets/configset-2/conf").toFile();
final File tmpConfigDir = createTempDir().toFile();
tmpConfigDir.deleteOnExit();
FileUtils.copyDirectory(configDir, tmpConfigDir);
if (oldProps != null) {
FileUtils.write(new File(tmpConfigDir, ConfigSetProperties.DEFAULT_FILENAME),
getConfigSetProps(oldProps));
}
solrCluster.uploadConfigDir(tmpConfigDir, baseConfigSetName);
}
private StringBuilder getConfigSetProps(Map<String, String> map) {
return new StringBuilder(new String(Utils.toJSON(map), StandardCharsets.UTF_8));
}
private static class FailureDuringCopyZKDatabase extends ForwardingZKDatabase {
private final ZkTestServer zkTestServer;
public FailureDuringCopyZKDatabase(ZKDatabase zkdb, ZkTestServer zkTestServer) {
super(zkdb);
this.zkTestServer = zkTestServer;
}
@Override
public byte[] getData(String path, Stat stat, Watcher watcher) throws KeeperException.NoNodeException {
// we know we are doing a copy when we are getting data from the base config set and
// the new config set (partially) exists
String zkAddress = zkTestServer.getZkAddress();
String chroot = zkAddress.substring(zkAddress.lastIndexOf("/"));
if (path.startsWith(chroot + CONFIGS_ZKNODE + "/" + BASE_CONFIGSET_NAME)
&& !path.contains(ConfigSetProperties.DEFAULT_FILENAME)) {
List<String> children = null;
try {
children = getChildren(chroot + CONFIGS_ZKNODE + "/" + CONFIGSET_NAME, null, null);
} catch (KeeperException.NoNodeException e) {}
if (children != null && children.size() > 0) {
throw new RuntimeException("sample zookeeper error");
}
}
return super.getData(path, stat, watcher);
}
}
private static class ForwardingZKDatabase extends ZKDatabase {
private ZKDatabase zkdb;
public ForwardingZKDatabase(ZKDatabase zkdb) {
super(null);
this.zkdb = zkdb;
}
@Override
public boolean isInitialized() {
return zkdb.isInitialized();
}
@Override
public void clear() {
zkdb.clear();
}
@Override
public DataTree getDataTree() {
return zkdb.getDataTree();
}
@Override
public long getmaxCommittedLog() {
return zkdb.getmaxCommittedLog();
}
@Override
public long getminCommittedLog() {
return zkdb.getminCommittedLog();
}
@Override
public ReentrantReadWriteLock getLogLock() {
return zkdb.getLogLock();
}
@Override
public synchronized LinkedList<Proposal> getCommittedLog() {
return zkdb.getCommittedLog();
}
@Override
public long getDataTreeLastProcessedZxid() {
return zkdb.getDataTreeLastProcessedZxid();
}
@Override
public void setDataTreeInit(boolean b) {
zkdb.setDataTreeInit(b);
}
@Override
public Collection<Long> getSessions() {
return zkdb.getSessions();
}
@Override
public ConcurrentHashMap<Long, Integer> getSessionWithTimeOuts() {
return zkdb.getSessionWithTimeOuts();
}
@Override
public long loadDataBase() throws IOException {
return zkdb.loadDataBase();
}
@Override
public void addCommittedProposal(Request request) {
zkdb.addCommittedProposal(request);
}
@Override
public void removeCnxn(ServerCnxn cnxn) {
zkdb.removeCnxn(cnxn);
}
@Override
public void killSession(long sessionId, long zxid) {
zkdb.killSession(sessionId, zxid);
}
@Override
public void dumpEphemerals(PrintWriter pwriter) {
zkdb.dumpEphemerals(pwriter);
}
@Override
public int getNodeCount() {
return zkdb.getNodeCount();
}
@Override
public HashSet<String> getEphemerals(long sessionId) {
return zkdb.getEphemerals(sessionId);
}
@Override
public void setlastProcessedZxid(long zxid) {
zkdb.setlastProcessedZxid(zxid);
}
@Override
public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
return zkdb.processTxn(hdr, txn);
}
@Override
public Stat statNode(String path, ServerCnxn serverCnxn) throws KeeperException.NoNodeException {
return zkdb.statNode(path, serverCnxn);
}
@Override
public DataNode getNode(String path) {
return zkdb.getNode(path);
}
@Override
public List<ACL> convertLong(Long aclL) {
return zkdb.convertLong(aclL);
}
@Override
public byte[] getData(String path, Stat stat, Watcher watcher)
throws KeeperException.NoNodeException {
return zkdb.getData(path, stat, watcher);
}
@Override
public void setWatches(long relativeZxid, List<String> dataWatches,
List<String> existWatches, List<String> childWatches, Watcher watcher) {
zkdb.setWatches(relativeZxid, dataWatches, existWatches, childWatches, watcher);
}
@Override
public List<ACL> getACL(String path, Stat stat) throws NoNodeException {
return zkdb.getACL(path, stat);
}
@Override
public List<String> getChildren(String path, Stat stat, Watcher watcher)
throws KeeperException.NoNodeException {
return zkdb.getChildren(path, stat, watcher);
}
@Override
public boolean isSpecialPath(String path) {
return zkdb.isSpecialPath(path);
}
@Override
public int getAclSize() {
return zkdb.getAclSize();
}
@Override
public boolean truncateLog(long zxid) throws IOException {
return zkdb.truncateLog(zxid);
}
@Override
public void deserializeSnapshot(InputArchive ia) throws IOException {
zkdb.deserializeSnapshot(ia);
}
@Override
public void serializeSnapshot(OutputArchive oa) throws IOException,
InterruptedException {
zkdb.serializeSnapshot(oa);
}
@Override
public boolean append(Request si) throws IOException {
return zkdb.append(si);
}
@Override
public void rollLog() throws IOException {
zkdb.rollLog();
}
@Override
public void commit() throws IOException {
zkdb.commit();
}
@Override
public void close() throws IOException {
zkdb.close();
}
}
}

View File

@ -78,7 +78,7 @@ public class TestLeaderElectionZkExpiry extends SolrTestCaseJ4 {
boolean found = false;
while (System.nanoTime() < timeout) {
try {
String leaderNode = OverseerCollectionProcessor.getLeaderNode(zc);
String leaderNode = OverseerCollectionConfigSetProcessor.getLeaderNode(zc);
if (leaderNode != null && !leaderNode.trim().isEmpty()) {
log.info("Time={} Overseer leader is = {}", System.nanoTime(), leaderNode);
found = true;

View File

@ -182,7 +182,7 @@ public class TestRebalanceLeaders extends AbstractFullDistribZkTestBase {
List<String> getOverseerSort(String key) {
List<String> ret = null;
try {
ret = OverseerCollectionProcessor.getSortedElectionNodes(cloudClient.getZkStateReader().getZkClient(),
ret = OverseerCollectionConfigSetProcessor.getSortedElectionNodes(cloudClient.getZkStateReader().getZkClient(),
"/collections/" + COLLECTION_NAME + "/leader_elect/" + key + "/election");
return ret;
} catch (KeeperException e) {

View File

@ -24,6 +24,7 @@ import org.apache.solr.common.SolrException;
import org.apache.solr.handler.admin.CollectionsHandler;
import org.apache.solr.handler.admin.CoreAdminHandler;
import org.apache.solr.handler.admin.InfoHandler;
import org.apache.solr.handler.admin.ConfigSetsHandler;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@ -308,6 +309,7 @@ public class TestCoreContainer extends SolrTestCaseJ4 {
" <str name=\"collectionsHandler\">" + CustomCollectionsHandler.class.getName() + "</str>" +
" <str name=\"infoHandler\">" + CustomInfoHandler.class.getName() + "</str>" +
" <str name=\"adminHandler\">" + CustomCoreAdminHandler.class.getName() + "</str>" +
" <str name=\"configSetsHandler\">" + CustomConfigSetsHandler.class.getName() + "</str>" +
"</solr>";
public static class CustomCollectionsHandler extends CollectionsHandler {
@ -328,6 +330,12 @@ public class TestCoreContainer extends SolrTestCaseJ4 {
}
}
public static class CustomConfigSetsHandler extends ConfigSetsHandler {
public CustomConfigSetsHandler(CoreContainer cc) {
super(cc);
}
}
@Test
public void testCustomHandlers() throws Exception {

View File

@ -72,6 +72,7 @@ public class TestSolrXml extends SolrTestCaseJ4 {
assertEquals("core admin handler class", "testAdminHandler", cfg.getCoreAdminHandlerClass());
assertEquals("collection handler class", "testCollectionsHandler", cfg.getCollectionsHandlerClass());
assertEquals("info handler class", "testInfoHandler", cfg.getInfoHandlerClass());
assertEquals("config set handler class", "testConfigSetsHandler", cfg.getConfigSetsHandlerClass());
assertEquals("core load threads", 11, cfg.getCoreLoadThreadCount());
assertThat("core root dir", cfg.getCoreRootDirectory(), containsString("testCoreRootDirectory"));
assertEquals("distrib conn timeout", 22, cfg.getDistributedConnectionTimeout());

View File

@ -0,0 +1,155 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.request;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.Properties;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.response.ConfigSetAdminResponse;
import org.apache.solr.common.params.ConfigSetParams;
import org.apache.solr.common.params.ConfigSetParams.ConfigSetAction;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.ContentStream;
import static org.apache.solr.common.params.CommonParams.NAME;
/**
* This class is experimental and subject to change.
*
* @since solr 5.4
*/
public abstract class ConfigSetAdminRequest <Q extends ConfigSetAdminRequest<Q>> extends SolrRequest<ConfigSetAdminResponse> {
protected ConfigSetAction action = null;
protected String configSetName = null;
protected ConfigSetAdminRequest setAction(ConfigSetAction action) {
this.action = action;
return this;
}
public ConfigSetAdminRequest() {
super(METHOD.GET, "/admin/configs");
}
public ConfigSetAdminRequest(String path) {
super (METHOD.GET, path);
}
protected abstract Q getThis();
@Override
public SolrParams getParams() {
if (action == null) {
throw new RuntimeException( "no action specified!" );
}
if (configSetName == null) {
throw new RuntimeException( "no ConfigSet specified!" );
}
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(ConfigSetParams.ACTION, action.toString());
params.set(NAME, configSetName);
return params;
}
@Override
public Collection<ContentStream> getContentStreams() throws IOException {
return null;
}
@Override
protected ConfigSetAdminResponse createResponse(SolrClient client) {
return new ConfigSetAdminResponse();
}
public final Q setConfigSetName(String configSetName) {
this.configSetName = configSetName;
return getThis();
}
public final String getConfigSetName() {
return configSetName;
}
// CREATE request
public static class Create extends ConfigSetAdminRequest<Create> {
protected static String PROPERTY_PREFIX = "configSetProp";
protected String baseConfigSetName;
protected Properties properties;
public Create() {
action = ConfigSetAction.CREATE;
}
@Override
protected Create getThis() {
return this;
}
public final Create setBaseConfigSetName(String baseConfigSetName) {
this.baseConfigSetName = baseConfigSetName;
return getThis();
}
public final String getBaseConfigSetName() {
return baseConfigSetName;
}
public final Create setNewConfigSetProperties(Properties properties) {
this.properties = properties;
return getThis();
}
public final Properties getNewConfigSetProperties() {
return properties;
}
@Override
public SolrParams getParams() {
ModifiableSolrParams params = new ModifiableSolrParams(super.getParams());
if (baseConfigSetName == null) {
throw new RuntimeException( "no Base ConfigSet specified!" );
}
params.set("baseConfigSet", baseConfigSetName);
if (properties != null) {
for (Map.Entry entry : properties.entrySet()) {
params.set(PROPERTY_PREFIX + "." + entry.getKey().toString(),
entry.getValue().toString());
}
}
return params;
}
}
// DELETE request
public static class Delete extends ConfigSetAdminRequest<Delete> {
public Delete() {
action = ConfigSetAction.DELETE;
}
@Override
protected Delete getThis() {
return this;
}
}
}

View File

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.response;
import org.apache.solr.common.util.NamedList;
/**
* No special handling at this time.
*/
public class ConfigSetAdminResponse extends SolrResponseBase
{
@SuppressWarnings("unchecked")
public NamedList<String> getErrorMessages()
{
return (NamedList<String>) getResponse().get( "exceptions" );
}
}

View File

@ -29,6 +29,7 @@ import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.Collections;
import java.util.List;
import java.util.Set;
/**
* Class that manages named configs in Zookeeper
@ -142,4 +143,81 @@ public class ZkConfigManager {
throw new IOException("Error listing configs", SolrZkClient.checkInterrupted(e));
}
}
/**
* Check whether a config exists in Zookeeper
*
* @param configName the config to check existance on
* @return whether the config exists or not
* @throws IOException if an I/O error occurs
*/
public Boolean configExists(String configName) throws IOException {
try {
return zkClient.exists(ZkConfigManager.CONFIGS_ZKNODE + "/" + configName, true);
} catch (KeeperException | InterruptedException e) {
throw new IOException("Error checking whether config exists",
SolrZkClient.checkInterrupted(e));
}
}
/**
* Delete a config in ZooKeeper
*
* @param configName the config to delete
* @throws IOException if an I/O error occurs
*/
public void deleteConfigDir(String configName) throws IOException {
try {
zkClient.clean(ZkConfigManager.CONFIGS_ZKNODE + "/" + configName);
} catch (KeeperException | InterruptedException e) {
throw new IOException("Error checking whether config exists",
SolrZkClient.checkInterrupted(e));
}
}
private void copyConfigDirFromZk(String fromZkPath, String toZkPath, Set<String> copiedToZkPaths) throws IOException {
try {
List<String> files = zkClient.getChildren(fromZkPath, null, true);
for (String file : files) {
List<String> children = zkClient.getChildren(fromZkPath + "/" + file, null, true);
if (children.size() == 0) {
final String toZkFilePath = toZkPath + "/" + file;
logger.info("Copying zk node {} to {}",
fromZkPath + "/" + file, toZkFilePath);
byte[] data = zkClient.getData(fromZkPath + "/" + file, null, null, true);
zkClient.makePath(toZkFilePath, data, true);
if (copiedToZkPaths != null) copiedToZkPaths.add(toZkFilePath);
} else {
copyConfigDirFromZk(fromZkPath + "/" + file, toZkPath + "/" + file, copiedToZkPaths);
}
}
} catch (KeeperException | InterruptedException e) {
throw new IOException("Error copying nodes from zookeeper path " + fromZkPath + " to " + toZkPath,
SolrZkClient.checkInterrupted(e));
}
}
/**
* Copy a config in ZooKeeper
*
* @param fromConfig the config to copy from
* @param toConfig the config to copy to
* @throws IOException if an I/O error occurs
*/
public void copyConfigDir(String fromConfig, String toConfig) throws IOException {
copyConfigDir(CONFIGS_ZKNODE + "/" + fromConfig, CONFIGS_ZKNODE + "/" + toConfig, null);
}
/**
* Copy a config in ZooKeeper
*
* @param fromConfig the config to copy from
* @param toConfig the config to copy to
* @param copiedToZkPaths should be an empty Set, will be filled in by function
with the paths that were actually copied to.
* @throws IOException if an I/O error occurs
*/
public void copyConfigDir(String fromConfig, String toConfig, Set<String> copiedToZkPaths) throws IOException {
copyConfigDirFromZk(CONFIGS_ZKNODE + "/" + fromConfig, CONFIGS_ZKNODE + "/" + toConfig, copiedToZkPaths);
}
}

View File

@ -0,0 +1,51 @@
package org.apache.solr.common.params;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.Locale;
/**
* ConfigSets API related parameters and actions.
*/
public interface ConfigSetParams
{
public final static String ACTION = "action";
public enum ConfigSetAction {
CREATE,
DELETE;
public static ConfigSetAction get(String p) {
if (p != null) {
try {
return ConfigSetAction.valueOf( p.toUpperCase(Locale.ROOT) );
} catch (Exception ex) {}
}
return null;
}
public boolean isEqual(String s) {
if (s == null) return false;
return toString().equals(s.toUpperCase(Locale.ROOT));
}
public String toLower() {
return toString().toLowerCase(Locale.ROOT);
}
}
}

View File

@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.request;
import org.apache.solr.SolrTestCaseJ4;
import org.junit.Assert;
import org.junit.Test;
/**
* Basic error checking of ConfigSetAdminRequests.
*/
public class TestConfigSetAdminRequest extends SolrTestCaseJ4 {
@Test
public void testNoAction() {
ConfigSetAdminRequest request = new MyConfigSetAdminRequest();
request.setConfigSetName("name");
verifyException(request, "action");
}
@Test
public void testCreate() {
ConfigSetAdminRequest.Create create = new ConfigSetAdminRequest.Create();
verifyException(create, "ConfigSet");
create.setConfigSetName("name");
verifyException(create, "Base ConfigSet");
create.setBaseConfigSetName("baseConfigSet");
create.getParams();
}
@Test
public void testDelete() {
ConfigSetAdminRequest.Delete delete = new ConfigSetAdminRequest.Delete();
verifyException(delete, "ConfigSet");
}
private void verifyException(ConfigSetAdminRequest request, String errorContains) {
try {
request.getParams();
Assert.fail("Expected exception");
} catch (Exception e) {
assertTrue("Expected exception message to contain: " + errorContains,
e.getMessage().contains(errorContains));
}
}
private static class MyConfigSetAdminRequest extends ConfigSetAdminRequest<MyConfigSetAdminRequest> {
public MyConfigSetAdminRequest() {}
@Override
public MyConfigSetAdminRequest getThis() {
return this;
}
};
}

View File

@ -64,6 +64,7 @@ public class MiniSolrCloudCluster {
private static Logger log = LoggerFactory.getLogger(MiniSolrCloudCluster.class);
private final ZkTestServer zkServer;
private final boolean externalZkServer;
private final List<JettySolrRunner> jettys = new LinkedList<>();
private final File testDir;
private final CloudSolrClient solrClient;
@ -125,15 +126,34 @@ public class MiniSolrCloudCluster {
* @throws Exception if there was an error starting the cluster
*/
public MiniSolrCloudCluster(int numServers, File baseDir, File solrXml, JettyConfig jettyConfig) throws Exception {
this(numServers, baseDir, solrXml, jettyConfig, null);
}
/**
* Create a MiniSolrCloudCluster
*
* @param numServers number of Solr servers to start
* @param baseDir base directory that the mini cluster should be run from
* @param solrXml solr.xml file to be uploaded to ZooKeeper
* @param jettyConfig Jetty configuration
* @param zkTestServer ZkTestServer to use. If null, one will be created
*
* @throws Exception if there was an error starting the cluster
*/
public MiniSolrCloudCluster(int numServers, File baseDir, File solrXml, JettyConfig jettyConfig, ZkTestServer zkTestServer) throws Exception {
this.testDir = baseDir;
this.jettyConfig = jettyConfig;
String zkDir = testDir.getAbsolutePath() + File.separator
+ "zookeeper/server1/data";
zkServer = new ZkTestServer(zkDir);
zkServer.run();
this.externalZkServer = zkTestServer != null;
if (!externalZkServer) {
String zkDir = testDir.getAbsolutePath() + File.separator
+ "zookeeper/server1/data";
zkTestServer = new ZkTestServer(zkDir);
zkTestServer.run();
}
this.zkServer = zkTestServer;
try(SolrZkClient zkClient = new SolrZkClient(zkServer.getZkHost(),
AbstractZkTestCase.TIMEOUT, 45000, null)) {
zkClient.makePath("/solr/solr.xml", solrXml, false, true);
@ -375,7 +395,9 @@ public class MiniSolrCloudCluster {
executor.shutdown();
executor.awaitTermination(2, TimeUnit.SECONDS);
try {
zkServer.shutdown();
if (!externalZkServer) {
zkServer.shutdown();
}
} finally {
System.clearProperty("zkHost");
}

View File

@ -448,6 +448,14 @@ public class ZkTestServer {
});
}
public ZKDatabase getZKDatabase() {
return zkServer.zooKeeperServer.getZKDatabase();
}
public void setZKDatabase(ZKDatabase zkDb) {
zkServer.zooKeeperServer.setZKDatabase(zkDb);
}
public void run() throws InterruptedException {
log.info("STARTING ZK TEST SERVER");
// we don't call super.distribSetUp