diff --git a/server/src/main/java/io/druid/server/coordinator/LoadQueuePeon.java b/server/src/main/java/io/druid/server/coordinator/LoadQueuePeon.java index 8e3c2509697..e2577795a27 100644 --- a/server/src/main/java/io/druid/server/coordinator/LoadQueuePeon.java +++ b/server/src/main/java/io/druid/server/coordinator/LoadQueuePeon.java @@ -87,7 +87,7 @@ public class LoadQueuePeon private final Object lock = new Object(); - private volatile SegmentHolder currentlyLoading = null; + private volatile SegmentHolder currentlyProcessing = null; LoadQueuePeon( CuratorFramework curator, @@ -156,10 +156,10 @@ public class LoadQueuePeon ) { synchronized (lock) { - if ((currentlyLoading != null) && - currentlyLoading.getSegmentIdentifier().equals(segment.getIdentifier())) { + if ((currentlyProcessing != null) && + currentlyProcessing.getSegmentIdentifier().equals(segment.getIdentifier())) { if (callback != null) { - currentlyLoading.addCallback(callback); + currentlyProcessing.addCallback(callback); } return; } @@ -170,13 +170,13 @@ public class LoadQueuePeon synchronized (lock) { if (segmentsToLoad.contains(holder)) { if ((callback != null)) { - currentlyLoading.addCallback(callback); + currentlyProcessing.addCallback(callback); } return; } } - log.info("Asking server peon[%s] to load segment[%s]", basePath, segment); + log.info("Asking server peon[%s] to load segment[%s]", basePath, segment.getIdentifier()); queuedSize.addAndGet(segment.getSize()); segmentsToLoad.add(holder); doNext(); @@ -188,10 +188,10 @@ public class LoadQueuePeon ) { synchronized (lock) { - if ((currentlyLoading != null) && - currentlyLoading.getSegmentIdentifier().equals(segment.getIdentifier())) { + if ((currentlyProcessing != null) && + currentlyProcessing.getSegmentIdentifier().equals(segment.getIdentifier())) { if (callback != null) { - currentlyLoading.addCallback(callback); + currentlyProcessing.addCallback(callback); } return; } @@ -202,7 +202,7 @@ public class LoadQueuePeon synchronized (lock) { if (segmentsToDrop.contains(holder)) { if (callback != null) { - currentlyLoading.addCallback(callback); + currentlyProcessing.addCallback(callback); } return; } @@ -216,13 +216,13 @@ public class LoadQueuePeon private void doNext() { synchronized (lock) { - if (currentlyLoading == null) { + if (currentlyProcessing == null) { if (!segmentsToDrop.isEmpty()) { - currentlyLoading = segmentsToDrop.first(); - log.info("Server[%s] dropping [%s]", basePath, currentlyLoading); + currentlyProcessing = segmentsToDrop.first(); + log.info("Server[%s] dropping [%s]", basePath, currentlyProcessing.getSegmentIdentifier()); } else if (!segmentsToLoad.isEmpty()) { - currentlyLoading = segmentsToLoad.first(); - log.info("Server[%s] loading [%s]", basePath, currentlyLoading); + currentlyProcessing = segmentsToLoad.first(); + log.info("Server[%s] loading [%s]", basePath, currentlyProcessing.getSegmentIdentifier()); } else { return; } @@ -235,16 +235,16 @@ public class LoadQueuePeon { synchronized (lock) { try { - if (currentlyLoading == null) { + if (currentlyProcessing == null) { log.makeAlert("Crazy race condition! server[%s]", basePath) .emit(); actionCompleted(); doNext(); return; } - log.info("Server[%s] adding segment[%s]", basePath, currentlyLoading.getSegmentIdentifier()); - final String path = ZKPaths.makePath(basePath, currentlyLoading.getSegmentIdentifier()); - final byte[] payload = jsonMapper.writeValueAsBytes(currentlyLoading.getChangeRequest()); + log.info("Server[%s] processing segment[%s]", basePath, currentlyProcessing.getSegmentIdentifier()); + final String path = ZKPaths.makePath(basePath, currentlyProcessing.getSegmentIdentifier()); + final byte[] payload = jsonMapper.writeValueAsBytes(currentlyProcessing.getChangeRequest()); curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload); zkWritingExecutor.schedule( @@ -255,7 +255,7 @@ public class LoadQueuePeon { try { if (curator.checkExists().forPath(path) != null) { - failAssign(new ISE("%s was never removed! Failing this assign!", path)); + failAssign(new ISE("%s was never removed! Failing this operation!", path)); } } catch (Exception e) { @@ -311,7 +311,9 @@ public class LoadQueuePeon ); } else { log.info( - "Server[%s] skipping doNext() because something is currently loading[%s].", basePath, currentlyLoading + "Server[%s] skipping doNext() because something is currently loading[%s].", + basePath, + currentlyProcessing.getSegmentIdentifier() ); } } @@ -319,29 +321,29 @@ public class LoadQueuePeon private void actionCompleted() { - if (currentlyLoading != null) { - switch (currentlyLoading.getType()) { + if (currentlyProcessing != null) { + switch (currentlyProcessing.getType()) { case LOAD: - segmentsToLoad.remove(currentlyLoading); - queuedSize.addAndGet(-currentlyLoading.getSegmentSize()); + segmentsToLoad.remove(currentlyProcessing); + queuedSize.addAndGet(-currentlyProcessing.getSegmentSize()); break; case DROP: - segmentsToDrop.remove(currentlyLoading); + segmentsToDrop.remove(currentlyProcessing); break; default: throw new UnsupportedOperationException(); } - currentlyLoading.executeCallbacks(); - currentlyLoading = null; + currentlyProcessing.executeCallbacks(); + currentlyProcessing = null; } } public void stop() { synchronized (lock) { - if (currentlyLoading != null) { - currentlyLoading.executeCallbacks(); - currentlyLoading = null; + if (currentlyProcessing != null) { + currentlyProcessing.executeCallbacks(); + currentlyProcessing = null; } if (!segmentsToDrop.isEmpty()) { @@ -366,14 +368,14 @@ public class LoadQueuePeon private void entryRemoved(String path) { synchronized (lock) { - if (currentlyLoading == null) { + if (currentlyProcessing == null) { log.warn("Server[%s] an entry[%s] was removed even though it wasn't loading!?", basePath, path); return; } - if (!ZKPaths.getNodeFromPath(path).equals(currentlyLoading.getSegmentIdentifier())) { + if (!ZKPaths.getNodeFromPath(path).equals(currentlyProcessing.getSegmentIdentifier())) { log.warn( "Server[%s] entry [%s] was removed even though it's not what is currently loading[%s]", - basePath, path, currentlyLoading + basePath, path, currentlyProcessing ); return; } @@ -387,7 +389,7 @@ public class LoadQueuePeon private void failAssign(Exception e) { synchronized (lock) { - log.error(e, "Server[%s], throwable caught when submitting [%s].", basePath, currentlyLoading); + log.error(e, "Server[%s], throwable caught when submitting [%s].", basePath, currentlyProcessing); failedAssignCount.getAndIncrement(); // Act like it was completed so that the coordinator gives it to someone else actionCompleted();