less verbose logging for LQP

This commit is contained in:
fjy 2014-07-29 09:51:53 -07:00
parent 7aef463457
commit 382cde9cf2
1 changed files with 37 additions and 35 deletions

View File

@ -87,7 +87,7 @@ public class LoadQueuePeon
private final Object lock = new Object();
private volatile SegmentHolder currentlyLoading = null;
private volatile SegmentHolder currentlyProcessing = null;
LoadQueuePeon(
CuratorFramework curator,
@ -156,10 +156,10 @@ public class LoadQueuePeon
)
{
synchronized (lock) {
if ((currentlyLoading != null) &&
currentlyLoading.getSegmentIdentifier().equals(segment.getIdentifier())) {
if ((currentlyProcessing != null) &&
currentlyProcessing.getSegmentIdentifier().equals(segment.getIdentifier())) {
if (callback != null) {
currentlyLoading.addCallback(callback);
currentlyProcessing.addCallback(callback);
}
return;
}
@ -170,13 +170,13 @@ public class LoadQueuePeon
synchronized (lock) {
if (segmentsToLoad.contains(holder)) {
if ((callback != null)) {
currentlyLoading.addCallback(callback);
currentlyProcessing.addCallback(callback);
}
return;
}
}
log.info("Asking server peon[%s] to load segment[%s]", basePath, segment);
log.info("Asking server peon[%s] to load segment[%s]", basePath, segment.getIdentifier());
queuedSize.addAndGet(segment.getSize());
segmentsToLoad.add(holder);
doNext();
@ -188,10 +188,10 @@ public class LoadQueuePeon
)
{
synchronized (lock) {
if ((currentlyLoading != null) &&
currentlyLoading.getSegmentIdentifier().equals(segment.getIdentifier())) {
if ((currentlyProcessing != null) &&
currentlyProcessing.getSegmentIdentifier().equals(segment.getIdentifier())) {
if (callback != null) {
currentlyLoading.addCallback(callback);
currentlyProcessing.addCallback(callback);
}
return;
}
@ -202,7 +202,7 @@ public class LoadQueuePeon
synchronized (lock) {
if (segmentsToDrop.contains(holder)) {
if (callback != null) {
currentlyLoading.addCallback(callback);
currentlyProcessing.addCallback(callback);
}
return;
}
@ -216,13 +216,13 @@ public class LoadQueuePeon
private void doNext()
{
synchronized (lock) {
if (currentlyLoading == null) {
if (currentlyProcessing == null) {
if (!segmentsToDrop.isEmpty()) {
currentlyLoading = segmentsToDrop.first();
log.info("Server[%s] dropping [%s]", basePath, currentlyLoading);
currentlyProcessing = segmentsToDrop.first();
log.info("Server[%s] dropping [%s]", basePath, currentlyProcessing.getSegmentIdentifier());
} else if (!segmentsToLoad.isEmpty()) {
currentlyLoading = segmentsToLoad.first();
log.info("Server[%s] loading [%s]", basePath, currentlyLoading);
currentlyProcessing = segmentsToLoad.first();
log.info("Server[%s] loading [%s]", basePath, currentlyProcessing.getSegmentIdentifier());
} else {
return;
}
@ -235,16 +235,16 @@ public class LoadQueuePeon
{
synchronized (lock) {
try {
if (currentlyLoading == null) {
if (currentlyProcessing == null) {
log.makeAlert("Crazy race condition! server[%s]", basePath)
.emit();
actionCompleted();
doNext();
return;
}
log.info("Server[%s] adding segment[%s]", basePath, currentlyLoading.getSegmentIdentifier());
final String path = ZKPaths.makePath(basePath, currentlyLoading.getSegmentIdentifier());
final byte[] payload = jsonMapper.writeValueAsBytes(currentlyLoading.getChangeRequest());
log.info("Server[%s] processing segment[%s]", basePath, currentlyProcessing.getSegmentIdentifier());
final String path = ZKPaths.makePath(basePath, currentlyProcessing.getSegmentIdentifier());
final byte[] payload = jsonMapper.writeValueAsBytes(currentlyProcessing.getChangeRequest());
curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload);
zkWritingExecutor.schedule(
@ -255,7 +255,7 @@ public class LoadQueuePeon
{
try {
if (curator.checkExists().forPath(path) != null) {
failAssign(new ISE("%s was never removed! Failing this assign!", path));
failAssign(new ISE("%s was never removed! Failing this operation!", path));
}
}
catch (Exception e) {
@ -311,7 +311,9 @@ public class LoadQueuePeon
);
} else {
log.info(
"Server[%s] skipping doNext() because something is currently loading[%s].", basePath, currentlyLoading
"Server[%s] skipping doNext() because something is currently loading[%s].",
basePath,
currentlyProcessing.getSegmentIdentifier()
);
}
}
@ -319,29 +321,29 @@ public class LoadQueuePeon
private void actionCompleted()
{
if (currentlyLoading != null) {
switch (currentlyLoading.getType()) {
if (currentlyProcessing != null) {
switch (currentlyProcessing.getType()) {
case LOAD:
segmentsToLoad.remove(currentlyLoading);
queuedSize.addAndGet(-currentlyLoading.getSegmentSize());
segmentsToLoad.remove(currentlyProcessing);
queuedSize.addAndGet(-currentlyProcessing.getSegmentSize());
break;
case DROP:
segmentsToDrop.remove(currentlyLoading);
segmentsToDrop.remove(currentlyProcessing);
break;
default:
throw new UnsupportedOperationException();
}
currentlyLoading.executeCallbacks();
currentlyLoading = null;
currentlyProcessing.executeCallbacks();
currentlyProcessing = null;
}
}
public void stop()
{
synchronized (lock) {
if (currentlyLoading != null) {
currentlyLoading.executeCallbacks();
currentlyLoading = null;
if (currentlyProcessing != null) {
currentlyProcessing.executeCallbacks();
currentlyProcessing = null;
}
if (!segmentsToDrop.isEmpty()) {
@ -366,14 +368,14 @@ public class LoadQueuePeon
private void entryRemoved(String path)
{
synchronized (lock) {
if (currentlyLoading == null) {
if (currentlyProcessing == null) {
log.warn("Server[%s] an entry[%s] was removed even though it wasn't loading!?", basePath, path);
return;
}
if (!ZKPaths.getNodeFromPath(path).equals(currentlyLoading.getSegmentIdentifier())) {
if (!ZKPaths.getNodeFromPath(path).equals(currentlyProcessing.getSegmentIdentifier())) {
log.warn(
"Server[%s] entry [%s] was removed even though it's not what is currently loading[%s]",
basePath, path, currentlyLoading
basePath, path, currentlyProcessing
);
return;
}
@ -387,7 +389,7 @@ public class LoadQueuePeon
private void failAssign(Exception e)
{
synchronized (lock) {
log.error(e, "Server[%s], throwable caught when submitting [%s].", basePath, currentlyLoading);
log.error(e, "Server[%s], throwable caught when submitting [%s].", basePath, currentlyProcessing);
failedAssignCount.getAndIncrement();
// Act like it was completed so that the coordinator gives it to someone else
actionCompleted();