clean logging file

This commit is contained in:
kimchy 2010-03-31 21:18:40 +03:00
parent 38d8fad8d0
commit 632f0d4f0d
2 changed files with 196 additions and 6 deletions

View File

@ -1,7 +1,6 @@
rootLogger: INFO, console, file
logger:
jgroups: WARN
action: DEBUG
appender:
console:

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.support.broadcast;
import com.google.common.collect.ImmutableList;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.BaseAction;
@ -29,17 +30,27 @@ import org.elasticsearch.cluster.node.Nodes;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import org.elasticsearch.util.io.ThrowableObjectInputStream;
import org.elasticsearch.util.io.ThrowableObjectOutputStream;
import org.elasticsearch.util.io.stream.StreamInput;
import org.elasticsearch.util.io.stream.StreamOutput;
import org.elasticsearch.util.io.stream.Streamable;
import org.elasticsearch.util.settings.Settings;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
import static com.google.common.collect.Lists.*;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public abstract class TransportBroadcastOperationAction<Request extends BroadcastOperationRequest, Response extends BroadcastOperationResponse, ShardRequest extends BroadcastShardOperationRequest, ShardResponse extends BroadcastShardOperationResponse>
extends BaseAction<Request, Response> {
@ -89,7 +100,7 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
return true;
}
private class AsyncBroadcastAction {
class AsyncBroadcastAction {
private final Request request;
@ -109,7 +120,7 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
private final AtomicReferenceArray shardsResponses;
private AsyncBroadcastAction(Request request, ActionListener<Response> listener) {
AsyncBroadcastAction(Request request, ActionListener<Response> listener) {
this.request = request;
this.listener = listener;
@ -282,7 +293,7 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
}
}
private class TransportHandler extends BaseTransportRequestHandler<Request> {
class TransportHandler extends BaseTransportRequestHandler<Request> {
@Override public Request newInstance() {
return newRequest();
@ -319,7 +330,7 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
}
}
private class ShardTransportHandler extends BaseTransportRequestHandler<ShardRequest> {
class ShardTransportHandler extends BaseTransportRequestHandler<ShardRequest> {
@Override public ShardRequest newInstance() {
return newShardRequest();
@ -329,4 +340,184 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
channel.sendResponse(shardOperation(request));
}
}
// FROM HERE: When we move to a single remote call with all shard requests to the same node, then
// the below classes can help
class ShardsTransportHandler extends BaseTransportRequestHandler<ShardsRequest> {
@Override public ShardsRequest newInstance() {
return new ShardsRequest();
}
@Override public void messageReceived(final ShardsRequest request, final TransportChannel channel) throws Exception {
if (request.operationThreading() == BroadcastOperationThreading.THREAD_PER_SHARD) {
final AtomicInteger counter = new AtomicInteger(request.requests().size());
final AtomicInteger index = new AtomicInteger();
final AtomicReferenceArray results = new AtomicReferenceArray(request.requests().size());
for (final ShardRequest singleRequest : request.requests()) {
threadPool.execute(new Runnable() {
@Override public void run() {
int arrIndex = index.getAndIncrement();
try {
results.set(arrIndex, shardOperation(singleRequest));
} catch (Exception e) {
results.set(arrIndex, new BroadcastShardOperationFailedException(new ShardId(singleRequest.index(), singleRequest.shardId()), e));
}
if (counter.decrementAndGet() == 0) {
// we are done
List<ShardResponse> responses = newArrayListWithCapacity(request.requests().size());
List<BroadcastShardOperationFailedException> exceptions = null;
for (int i = 0; i < results.length(); i++) {
Object result = results.get(i);
if (result instanceof BroadcastShardOperationFailedException) {
if (exceptions == null) {
exceptions = newArrayList();
}
exceptions.add((BroadcastShardOperationFailedException) result);
} else {
responses.add((ShardResponse) result);
}
}
try {
channel.sendResponse(new ShardsResponse(responses, exceptions));
} catch (IOException e) {
logger.warn("Failed to send broadcast response", e);
}
}
}
});
}
} else {
// single thread
threadPool.execute(new Runnable() {
@Override public void run() {
List<ShardResponse> responses = newArrayListWithCapacity(request.requests().size());
List<BroadcastShardOperationFailedException> exceptions = null;
for (ShardRequest singleRequest : request.requests()) {
try {
responses.add(shardOperation(singleRequest));
} catch (Exception e) {
if (exceptions == null) {
exceptions = newArrayList();
}
exceptions.add(new BroadcastShardOperationFailedException(new ShardId(singleRequest.index(), singleRequest.shardId()), e));
}
}
try {
channel.sendResponse(new ShardsResponse(responses, exceptions));
} catch (IOException e) {
logger.warn("Failed to send broadcast response", e);
}
}
});
}
}
@Override public boolean spawn() {
// we handle the forking here...
return false;
}
}
class ShardsResponse implements Streamable {
private List<ShardResponse> responses;
private List<BroadcastShardOperationFailedException> exceptions;
ShardsResponse() {
}
ShardsResponse(List<ShardResponse> responses, List<BroadcastShardOperationFailedException> exceptions) {
this.responses = responses;
this.exceptions = exceptions;
}
@Override public void readFrom(StreamInput in) throws IOException {
int size = in.readVInt();
responses = newArrayListWithCapacity(size);
for (int i = 0; i < size; i++) {
ShardResponse response = newShardResponse();
response.readFrom(in);
responses.add(response);
}
size = in.readVInt();
if (size > 0) {
exceptions = newArrayListWithCapacity(size);
ThrowableObjectInputStream toi = new ThrowableObjectInputStream(in);
for (int i = 0; i < size; i++) {
try {
exceptions.add((BroadcastShardOperationFailedException) toi.readObject());
} catch (ClassNotFoundException e) {
throw new IOException("Failed to load class", e);
}
}
}
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(responses.size());
for (BroadcastShardOperationResponse response : responses) {
response.writeTo(out);
}
if (exceptions == null || exceptions.isEmpty()) {
out.writeInt(0);
} else {
out.writeInt(exceptions.size());
ThrowableObjectOutputStream too = new ThrowableObjectOutputStream(out);
for (BroadcastShardOperationFailedException ex : exceptions) {
too.writeObject(ex);
}
too.flush();
}
}
}
class ShardsRequest implements Streamable {
private BroadcastOperationThreading operationThreading = BroadcastOperationThreading.SINGLE_THREAD;
private List<ShardRequest> requests;
ShardsRequest() {
}
public List<ShardRequest> requests() {
return this.requests;
}
public BroadcastOperationThreading operationThreading() {
return operationThreading;
}
ShardsRequest(BroadcastOperationThreading operationThreading, List<ShardRequest> requests) {
this.operationThreading = operationThreading;
this.requests = requests;
}
@Override public void readFrom(StreamInput in) throws IOException {
operationThreading = BroadcastOperationThreading.fromId(in.readByte());
int size = in.readVInt();
if (size == 0) {
requests = ImmutableList.of();
} else {
requests = newArrayListWithCapacity(in.readVInt());
for (int i = 0; i < size; i++) {
ShardRequest request = newShardRequest();
request.readFrom(in);
requests.add(request);
}
}
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeByte(operationThreading.id());
out.writeVInt(requests.size());
for (BroadcastShardOperationRequest request : requests) {
request.writeTo(out);
}
}
}
}