SOLR-4043: Add ability to get success/failure responses from Collections API.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1438550 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Mark Robert Miller 2013-01-25 15:14:51 +00:00
parent 87efa12ae8
commit 7d70f6190a
7 changed files with 330 additions and 87 deletions

View File

@ -63,6 +63,9 @@ Detailed Change List
New Features
----------------------
* SOLR-4043: Add ability to get success/failure responses from Collections API.
(Raintung Li, Mark Miller)
Bug Fixes
----------------------

View File

@ -48,6 +48,8 @@ public class DistributedQueue {
private final String prefix = "qn-";
private final String response_prefix = "qnr-" ;
public DistributedQueue(SolrZkClient zookeeper, String dir, List<ACL> acl) {
this.dir = dir;
@ -100,7 +102,7 @@ public class DistributedQueue {
*
* @return the data at the head of the queue.
*/
public byte[] element() throws NoSuchElementException, KeeperException,
private QueueEvent element() throws NoSuchElementException, KeeperException,
InterruptedException {
TreeMap<Long,String> orderedChildren;
@ -122,7 +124,7 @@ public class DistributedQueue {
for (String headNode : orderedChildren.values()) {
if (headNode != null) {
try {
return zookeeper.getData(dir + "/" + headNode, null, null, true);
return new QueueEvent(dir + "/" + headNode, zookeeper.getData(dir + "/" + headNode, null, null, true), null);
} catch (KeeperException.NoNodeException e) {
// Another client removed the node first, try next
}
@ -162,17 +164,41 @@ public class DistributedQueue {
}
}
/**
* Remove the event and save the response into the other path.
*
*/
public byte[] remove(QueueEvent event) throws KeeperException,
InterruptedException {
String path = event.getId();
String responsePath = dir + "/" + response_prefix
+ path.substring(path.lastIndexOf("-") + 1);
if (zookeeper.exists(responsePath, true)) {
zookeeper.setData(responsePath, event.getBytes(), true);
}
byte[] data = zookeeper.getData(path, null, null, true);
zookeeper.delete(path, -1, true);
return data;
}
private class LatchChildWatcher implements Watcher {
Object lock = new Object();
private WatchedEvent event = null;
public LatchChildWatcher() {}
public LatchChildWatcher(Object lock) {
this.lock = lock;
}
@Override
public void process(WatchedEvent event) {
LOG.info("Watcher fired on path: " + event.getPath() + " state: "
+ event.getState() + " type " + event.getType());
synchronized (lock) {
this.event = event;
lock.notifyAll();
}
}
@ -182,6 +208,10 @@ public class DistributedQueue {
lock.wait(timeout);
}
}
public WatchedEvent getWatchedEvent() {
return event;
}
}
/**
@ -225,22 +255,51 @@ public class DistributedQueue {
*/
public boolean offer(byte[] data) throws KeeperException,
InterruptedException {
return createData(dir + "/" + prefix, data,
CreateMode.PERSISTENT_SEQUENTIAL) != null;
}
/**
* Inserts data into zookeeper.
*
* @return true if data was successfully added
*/
private String createData(String path, byte[] data, CreateMode mode)
throws KeeperException, InterruptedException {
for (;;) {
try {
zookeeper.create(dir + "/" + prefix, data, acl,
CreateMode.PERSISTENT_SEQUENTIAL, true);
return true;
return zookeeper.create(path, data, acl, mode, true);
} catch (KeeperException.NoNodeException e) {
try {
zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT, true);
} catch (KeeperException.NodeExistsException ne) {
//someone created it
// someone created it
}
}
}
}
/**
* Offer the data and wait for the response
*
*/
public QueueEvent offer(byte[] data, long timeout) throws KeeperException,
InterruptedException {
String path = createData(dir + "/" + prefix, data,
CreateMode.PERSISTENT_SEQUENTIAL);
String watchID = createData(
dir + "/" + response_prefix + path.substring(path.lastIndexOf("-") + 1),
null, CreateMode.EPHEMERAL);
Object lock = new Object();
LatchChildWatcher watcher = new LatchChildWatcher(lock);
synchronized (lock) {
if (zookeeper.exists(watchID, watcher, true) != null) {
watcher.await(timeout);
}
}
byte[] bytes = zookeeper.getData(watchID, null, null, true);
zookeeper.delete(watchID, -1, true);
return new QueueEvent(watchID, bytes, watcher.getWatchedEvent());
}
/**
@ -251,21 +310,74 @@ public class DistributedQueue {
*/
public byte[] peek() throws KeeperException, InterruptedException {
try {
return element();
return element().getBytes();
} catch (NoSuchElementException e) {
return null;
}
}
public static class QueueEvent {
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((id == null) ? 0 : id.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null) return false;
if (getClass() != obj.getClass()) return false;
QueueEvent other = (QueueEvent) obj;
if (id == null) {
if (other.id != null) return false;
} else if (!id.equals(other.id)) return false;
return true;
}
private WatchedEvent event = null;
private String id;
private byte[] bytes;
QueueEvent(String id, byte[] bytes, WatchedEvent event) {
this.id = id;
this.bytes = bytes;
this.event = event;
}
public void setId(String id) {
this.id = id;
}
public String getId() {
return id;
}
public void setBytes(byte[] bytes) {
this.bytes = bytes;
}
public byte[] getBytes() {
return bytes;
}
public WatchedEvent getWatchedEvent() {
return event;
}
}
/**
* Returns the data at the first element of the queue, or null if the queue is
* empty.
*
* @return data at the first element of the queue, or null.
*/
public byte[] peek(boolean block) throws KeeperException, InterruptedException {
public QueueEvent peek(boolean block) throws KeeperException, InterruptedException {
if (!block) {
return peek();
return element();
}
TreeMap<Long,String> orderedChildren;
@ -286,7 +398,7 @@ public class DistributedQueue {
String path = dir + "/" + headNode;
try {
byte[] data = zookeeper.getData(path, null, null, true);
return data;
return new QueueEvent(path, data, childWatcher.getWatchedEvent());
} catch (KeeperException.NoNodeException e) {
// Another client deleted the node first.
}

View File

@ -23,6 +23,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.cloud.DistributedQueue.QueueEvent;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ClosableThread;
@ -36,6 +38,7 @@ import org.apache.solr.common.cloud.ZooKeeperException;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.handler.component.ShardHandler;
import org.apache.solr.handler.component.ShardRequest;
@ -94,47 +97,33 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
@Override
public void run() {
log.info("Process current queue of collection messages");
while (amILeader() && !isClosed) {
try {
byte[] head = workQueue.peek(true);
//if (head != null) { // should not happen since we block above
final ZkNodeProps message = ZkNodeProps.load(head);
final String operation = message.getStr(QUEUE_OPERATION);
try {
boolean success = processMessage(message, operation);
if (!success) {
// TODO: what to do on failure / partial failure
// if we fail, do we clean up then ?
SolrException.log(log,
"Collection " + operation + " of " + message.getStr("name")
+ " failed");
}
} catch(Throwable t) {
SolrException.log(log,
"Collection " + operation + " of " + message.getStr("name")
+ " failed", t);
}
//}
workQueue.poll();
} catch (KeeperException e) {
if (e.code() == KeeperException.Code.SESSIONEXPIRED
|| e.code() == KeeperException.Code.CONNECTIONLOSS) {
log.warn("Overseer cannot talk to ZK");
return;
}
SolrException.log(log, "", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "",
e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
log.info("Process current queue of collection creations");
while (amILeader() && !isClosed) {
try {
QueueEvent head = workQueue.peek(true);
final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
log.info("Overseer Collection Processor: Get the message id:" + head.getId() + " message:" + message.toString());
final String operation = message.getStr(QUEUE_OPERATION);
SolrResponse response = processMessage(message, operation);
head.setBytes(SolrResponse.serializable(response));
workQueue.remove(head);
log.info("Overseer Collection Processor: Message id:" + head.getId() + " complete, response:"+ response.getResponse().toString());
} catch (KeeperException e) {
if (e.code() == KeeperException.Code.SESSIONEXPIRED
|| e.code() == KeeperException.Code.CONNECTIONLOSS) {
log.warn("Overseer cannot talk to ZK");
return;
}
SolrException.log(log, "", e);
throw new ZooKeeperException(
SolrException.ErrorCode.SERVER_ERROR, "", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
} catch (Throwable e) {
SolrException.log(log, "", e);
}
}
}
public void close() {
@ -157,21 +146,49 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
return false;
}
protected boolean processMessage(ZkNodeProps message, String operation) {
if (CREATECOLLECTION.equals(operation)) {
return createCollection(zkStateReader.getClusterState(), message);
} else if (DELETECOLLECTION.equals(operation)) {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(CoreAdminParams.ACTION, CoreAdminAction.UNLOAD.toString());
params.set(CoreAdminParams.DELETE_INSTANCE_DIR, true);
return collectionCmd(zkStateReader.getClusterState(), message, params);
} else if (RELOADCOLLECTION.equals(operation)) {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(CoreAdminParams.ACTION, CoreAdminAction.RELOAD.toString());
return collectionCmd(zkStateReader.getClusterState(), message, params);
protected SolrResponse processMessage(ZkNodeProps message, String operation) {
NamedList results = new NamedList();
try {
if (CREATECOLLECTION.equals(operation)) {
createCollection(zkStateReader.getClusterState(), message);
} else if (DELETECOLLECTION.equals(operation)) {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(CoreAdminParams.ACTION, CoreAdminAction.UNLOAD.toString());
params.set(CoreAdminParams.DELETE_INSTANCE_DIR, true);
collectionCmd(zkStateReader.getClusterState(), message, params);
} else if (RELOADCOLLECTION.equals(operation)) {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(CoreAdminParams.ACTION, CoreAdminAction.RELOAD.toString());
collectionCmd(zkStateReader.getClusterState(), message, params);
} else {
throw new SolrException(ErrorCode.BAD_REQUEST, "Unknow the operation:"
+ operation);
}
int failed = 0;
ShardResponse srsp;
do {
srsp = shardHandler.takeCompletedIncludingErrors();
if (srsp != null) {
Throwable e = srsp.getException();
if (e != null) {
failed++;
log.error("Error talking to shard: " + srsp.getShard(), e);
results.add(srsp.getShard(), e);
} else {
results.add(srsp.getShard(), srsp.getSolrResponse().getResponse());
}
}
} while (srsp != null);
} catch (SolrException ex) {
SolrException.log(log, "Collection " + operation + " of " + operation
+ " failed");
results.add("Operation " + operation + " cause exception:", ex);
} finally {
return new OverseerSolrResponse(results);
}
// unknown command, toss it from our queue
return true;
}
private boolean createCollection(ClusterState clusterState, ZkNodeProps message) {

View File

@ -0,0 +1,47 @@
package org.apache.solr.cloud;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.common.util.NamedList;
public class OverseerSolrResponse extends SolrResponse {
NamedList responseList = null;
public OverseerSolrResponse(NamedList list) {
responseList = list;
}
@Override
public long getElapsedTime() {
// TODO Auto-generated method stub
return 0;
}
@Override
public void setResponse(NamedList<Object> rsp) {
this.responseList = rsp;
}
@Override
public NamedList<Object> getResponse() {
return responseList;
}
}

View File

@ -21,10 +21,12 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.request.CoreAdminRequest;
import org.apache.solr.client.solrj.request.CoreAdminRequest.RequestSyncShard;
import org.apache.solr.cloud.DistributedQueue.QueueEvent;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.OverseerCollectionProcessor;
import org.apache.solr.common.SolrException;
@ -127,7 +129,35 @@ public class CollectionsHandler extends RequestHandlerBase {
rsp.setHttpCaching(false);
}
public static long DEFAULT_ZK_TIMEOUT = 60*1000;
private void handleResponse(String operation, ZkNodeProps m,
SolrQueryResponse rsp) throws KeeperException, InterruptedException {
long time = System.currentTimeMillis();
QueueEvent event = coreContainer.getZkController()
.getOverseerCollectionQueue()
.offer(ZkStateReader.toJSON(m), DEFAULT_ZK_TIMEOUT);
if (event.getBytes() != null) {
SolrResponse response = SolrResponse.deserialize(event.getBytes());
rsp.getValues().addAll(response.getResponse());
} else {
if (System.currentTimeMillis() - time >= DEFAULT_ZK_TIMEOUT) {
throw new SolrException(ErrorCode.SERVER_ERROR, operation
+ " the collection time out:" + DEFAULT_ZK_TIMEOUT / 1000 + "s");
} else if (event.getWatchedEvent() != null) {
throw new SolrException(ErrorCode.SERVER_ERROR, operation
+ " the collection error [Watcher fired on path: "
+ event.getWatchedEvent().getPath() + " state: "
+ event.getWatchedEvent().getState() + " type "
+ event.getWatchedEvent().getType() + "]");
} else {
throw new SolrException(ErrorCode.SERVER_ERROR, operation
+ " the collection unkown case");
}
}
}
private void handleReloadAction(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
log.info("Reloading Collection : " + req.getParamString());
String name = req.getParams().required().get("name");
@ -135,8 +165,7 @@ public class CollectionsHandler extends RequestHandlerBase {
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
OverseerCollectionProcessor.RELOADCOLLECTION, "name", name);
// TODO: what if you want to block until the collection is available?
coreContainer.getZkController().getOverseerCollectionQueue().offer(ZkStateReader.toJSON(m));
handleResponse(OverseerCollectionProcessor.RELOADCOLLECTION, m, rsp);
}
private void handleSyncShardAction(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException, SolrServerException, IOException {
@ -168,8 +197,7 @@ public class CollectionsHandler extends RequestHandlerBase {
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
OverseerCollectionProcessor.DELETECOLLECTION, "name", name);
// TODO: what if you want to block until the collection is available?
coreContainer.getZkController().getOverseerCollectionQueue().offer(ZkStateReader.toJSON(m));
handleResponse(OverseerCollectionProcessor.DELETECOLLECTION, m, rsp);
}
@ -208,8 +236,7 @@ public class CollectionsHandler extends RequestHandlerBase {
ZkNodeProps m = new ZkNodeProps(props);
// TODO: what if you want to block until the collection is available?
coreContainer.getZkController().getOverseerCollectionQueue().offer(ZkStateReader.toJSON(m));
handleResponse(OverseerCollectionProcessor.CREATECOLLECTION, m, rsp);
}
public static ModifiableSolrParams params(String... params) {

View File

@ -19,10 +19,11 @@ package org.apache.solr.cloud;
import static org.easymock.EasyMock.capture;
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.reset;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.reset;
import static org.easymock.EasyMock.verify;
import java.util.ArrayList;
@ -36,6 +37,8 @@ import java.util.Queue;
import java.util.Set;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.cloud.DistributedQueue.QueueEvent;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps;
@ -43,11 +46,13 @@ import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.handler.component.ShardHandler;
import org.apache.solr.handler.component.ShardRequest;
import org.apache.solr.handler.component.ShardResponse;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.junit.After;
@ -71,12 +76,12 @@ public class OverseerCollectionProcessorTest extends SolrTestCaseJ4 {
private OverseerCollectionProcessorToBeTested underTest;
private Thread thread;
private Queue<byte[]> queue = new BlockingArrayQueue<byte[]>();
private Queue<QueueEvent> queue = new BlockingArrayQueue<QueueEvent>();
private class OverseerCollectionProcessorToBeTested extends
OverseerCollectionProcessor {
private boolean lastProcessMessageResult = true;
private SolrResponse lastProcessMessageResult;
public OverseerCollectionProcessorToBeTested(ZkStateReader zkStateReader,
String myId, ShardHandler shardHandler, String adminPath,
@ -85,7 +90,7 @@ public class OverseerCollectionProcessorTest extends SolrTestCaseJ4 {
}
@Override
protected boolean processMessage(ZkNodeProps message, String operation) {
protected SolrResponse processMessage(ZkNodeProps message, String operation) {
lastProcessMessageResult = super.processMessage(message, operation);
return lastProcessMessageResult;
}
@ -147,11 +152,12 @@ public class OverseerCollectionProcessorTest extends SolrTestCaseJ4 {
}
}).anyTimes();
workQueueMock.remove();
workQueueMock.remove(anyObject(QueueEvent.class));
expectLastCall().andAnswer(new IAnswer<Object>() {
@Override
public Object answer() throws Throwable {
return queue.poll();
queue.remove((QueueEvent)EasyMock.getCurrentArguments()[0]);
return null;
}
}).anyTimes();
@ -273,7 +279,8 @@ public class OverseerCollectionProcessorTest extends SolrTestCaseJ4 {
OverseerCollectionProcessor.MAX_SHARDS_PER_NODE,
maxShardsPerNode.toString());
}
queue.add(ZkStateReader.toJSON(props));
QueueEvent qe = new QueueEvent("id", ZkStateReader.toJSON(props), null);
queue.add(qe);
}
protected void verifySubmitCaptures(List<SubmitCapture> submitCaptures,
@ -443,7 +450,9 @@ public class OverseerCollectionProcessorTest extends SolrTestCaseJ4 {
waitForEmptyQueue(10000);
assertEquals(collectionExceptedToBeCreated, underTest.lastProcessMessageResult);
if (collectionExceptedToBeCreated) {
assertNotNull(underTest.lastProcessMessageResult.getResponse().toString(), underTest.lastProcessMessageResult);
}
verify(shardHandlerMock);
if (collectionExceptedToBeCreated) {

View File

@ -17,19 +17,47 @@
package org.apache.solr.client.solrj;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.util.NamedList;
/**
*
*
*
* @since solr 1.3
*/
public abstract class SolrResponse implements Serializable
{
public abstract class SolrResponse implements Serializable {
public abstract long getElapsedTime();
public abstract void setResponse( NamedList<Object> rsp );
public abstract void setResponse(NamedList<Object> rsp);
public abstract NamedList<Object> getResponse();
public static byte[] serializable(SolrResponse response) {
try {
ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
ObjectOutputStream outputStream = new ObjectOutputStream(byteStream);
outputStream.writeObject(response);
return byteStream.toByteArray();
} catch (Exception e) {
throw new SolrException(ErrorCode.SERVER_ERROR, e);
}
}
public static SolrResponse deserialize(byte[] bytes) {
try {
ByteArrayInputStream byteStream = new ByteArrayInputStream(bytes);
ObjectInputStream inputStream = new ObjectInputStream(byteStream);
return (SolrResponse) inputStream.readObject();
} catch (Exception e) {
throw new SolrException(ErrorCode.SERVER_ERROR, e);
}
}
}