diff --git a/client/src/main/java/com/metamx/druid/client/DruidServer.java b/client/src/main/java/com/metamx/druid/client/DruidServer.java index ebd078e9383..e86efdad816 100644 --- a/client/src/main/java/com/metamx/druid/client/DruidServer.java +++ b/client/src/main/java/com/metamx/druid/client/DruidServer.java @@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableMap; import com.metamx.common.logger.Logger; import com.metamx.druid.coordination.DruidServerMetadata; +import java.util.Collections; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -116,7 +117,8 @@ public class DruidServer implements Comparable @JsonProperty public Map getSegments() { - return ImmutableMap.copyOf(segments); + // Copying the map slows things down a lot here, don't use Immutable Map here + return Collections.unmodifiableMap(segments); } public DataSegment getSegment(String segmentName) diff --git a/server/src/main/java/com/metamx/druid/master/DruidMaster.java b/server/src/main/java/com/metamx/druid/master/DruidMaster.java index c7c39d3fdd3..e0f7b5841ae 100644 --- a/server/src/main/java/com/metamx/druid/master/DruidMaster.java +++ b/server/src/main/java/com/metamx/druid/master/DruidMaster.java @@ -28,6 +28,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.common.io.Closeables; +import com.metamx.common.IAE; import com.metamx.common.Pair; import com.metamx.common.concurrent.ScheduledExecutorFactory; import com.metamx.common.concurrent.ScheduledExecutors; @@ -247,144 +248,106 @@ public class DruidMaster public void moveSegment(String from, String to, String segmentName, final LoadPeonCallback callback) { - final DruidServer fromServer = serverInventoryView.getInventoryValue(from); - if (fromServer == null) { - throw new IllegalArgumentException(String.format("Unable to find server [%s]", from)); - } + try { + final DruidServer fromServer = serverInventoryView.getInventoryValue(from); + if (fromServer == null) { + throw new IAE("Unable to find server [%s]", from); + } - final DruidServer toServer = serverInventoryView.getInventoryValue(to); - if (toServer == null) { - throw new IllegalArgumentException(String.format("Unable to find server [%s]", to)); - } + final DruidServer toServer = serverInventoryView.getInventoryValue(to); + if (toServer == null) { + throw new IAE("Unable to find server [%s]", to); + } - if (to.equalsIgnoreCase(from)) { - throw new IllegalArgumentException( - String.format("Redundant command to move segment [%s] from [%s] to [%s]", segmentName, from, to) + if (to.equalsIgnoreCase(from)) { + throw new IAE("Redundant command to move segment [%s] from [%s] to [%s]", segmentName, from, to); + } + + final DataSegment segment = fromServer.getSegment(segmentName); + if (segment == null) { + throw new IAE("Unable to find segment [%s] on server [%s]", segmentName, from); + } + + final LoadQueuePeon loadPeon = loadManagementPeons.get(to); + if (loadPeon == null) { + throw new IAE("LoadQueuePeon hasn't been created yet for path [%s]", to); + } + + final LoadQueuePeon dropPeon = loadManagementPeons.get(from); + if (dropPeon == null) { + throw new IAE("LoadQueuePeon hasn't been created yet for path [%s]", from); + } + + final ServerHolder toHolder = new ServerHolder(toServer, loadPeon); + if (toHolder.getAvailableSize() < segment.getSize()) { + throw new IAE( + "Not enough capacity on server [%s] for segment [%s]. Required: %,d, available: %,d.", + to, + segment, + segment.getSize(), + toHolder.getAvailableSize() + ); + } + + final String toLoadQueueSegPath = ZKPaths.makePath(ZKPaths.makePath(zkPaths.getLoadQueuePath(), to), segmentName); + final String toServedSegPath = ZKPaths.makePath( + ZKPaths.makePath(zkPaths.getServedSegmentsPath(), to), segmentName ); - } - final DataSegment segment = fromServer.getSegment(segmentName); - if (segment == null) { - throw new IllegalArgumentException( - String.format("Unable to find segment [%s] on server [%s]", segmentName, from) - ); - } - - final LoadQueuePeon loadPeon = loadManagementPeons.get(to); - if (loadPeon == null) { - throw new IllegalArgumentException(String.format("LoadQueuePeon hasn't been created yet for path [%s]", to)); - } - - final LoadQueuePeon dropPeon = loadManagementPeons.get(from); - if (dropPeon == null) { - throw new IllegalArgumentException(String.format("LoadQueuePeon hasn't been created yet for path [%s]", from)); - } - - final ServerHolder toHolder = new ServerHolder(toServer, loadPeon); - if (toHolder.getAvailableSize() < segment.getSize()) { - throw new IllegalArgumentException( - String.format( - "Not enough capacity on server [%s] for segment [%s]. Required: %,d, available: %,d.", - to, - segment, - segment.getSize(), - toHolder.getAvailableSize() - ) - ); - } - - final String toLoadQueueSegPath = ZKPaths.makePath(ZKPaths.makePath(zkPaths.getLoadQueuePath(), to), segmentName); - final String toServedSegPath = ZKPaths.makePath( - ZKPaths.makePath(zkPaths.getServedSegmentsPath(), to), segmentName - ); - - loadPeon.loadSegment( - segment, - new LoadPeonCallback() - { - @Override - protected void execute() + loadPeon.loadSegment( + segment, + new LoadPeonCallback() { - try { - if (curator.checkExists().forPath(toServedSegPath) != null && - curator.checkExists().forPath(toLoadQueueSegPath) == null && - !dropPeon.getSegmentsToDrop().contains(segment)) { - dropPeon.dropSegment(segment, callback); - } else if (callback != null) { - callback.execute(); + @Override + protected void execute() + { + try { + if (curator.checkExists().forPath(toServedSegPath) != null && + curator.checkExists().forPath(toLoadQueueSegPath) == null && + !dropPeon.getSegmentsToDrop().contains(segment)) { + dropPeon.dropSegment(segment, callback); + } else if (callback != null) { + callback.execute(); + } + } + catch (Exception e) { + throw Throwables.propagate(e); } } - catch (Exception e) { - throw Throwables.propagate(e); - } } - } - ); - } - - public void cloneSegment(String from, String to, String segmentName, LoadPeonCallback callback) - { - final DruidServer fromServer = serverInventoryView.getInventoryValue(from); - if (fromServer == null) { - throw new IllegalArgumentException(String.format("Unable to find server [%s]", from)); - } - - final DruidServer toServer = serverInventoryView.getInventoryValue(to); - if (toServer == null) { - throw new IllegalArgumentException(String.format("Unable to find server [%s]", to)); - } - - final DataSegment segment = fromServer.getSegment(segmentName); - if (segment == null) { - throw new IllegalArgumentException( - String.format("Unable to find segment [%s] on server [%s]", segmentName, from) ); } - - final LoadQueuePeon loadPeon = loadManagementPeons.get(to); - if (loadPeon == null) { - throw new IllegalArgumentException(String.format("LoadQueuePeon hasn't been created yet for path [%s]", to)); - } - - final ServerHolder toHolder = new ServerHolder(toServer, loadPeon); - if (toHolder.getAvailableSize() < segment.getSize()) { - throw new IllegalArgumentException( - String.format( - "Not enough capacity on server [%s] for segment [%s]. Required: %,d, available: %,d.", - to, - segment, - segment.getSize(), - toHolder.getAvailableSize() - ) - ); - } - - if (!loadPeon.getSegmentsToLoad().contains(segment)) { - loadPeon.loadSegment(segment, callback); + catch (Exception e) { + log.makeAlert(e, "Exception moving segment %s", segmentName).emit(); + callback.execute(); } } public void dropSegment(String from, String segmentName, final LoadPeonCallback callback) { - final DruidServer fromServer = serverInventoryView.getInventoryValue(from); - if (fromServer == null) { - throw new IllegalArgumentException(String.format("Unable to find server [%s]", from)); - } + try { + final DruidServer fromServer = serverInventoryView.getInventoryValue(from); + if (fromServer == null) { + throw new IAE("Unable to find server [%s]", from); + } - final DataSegment segment = fromServer.getSegment(segmentName); - if (segment == null) { - throw new IllegalArgumentException( - String.format("Unable to find segment [%s] on server [%s]", segmentName, from) - ); - } + final DataSegment segment = fromServer.getSegment(segmentName); + if (segment == null) { + throw new IAE("Unable to find segment [%s] on server [%s]", segmentName, from); + } - final LoadQueuePeon dropPeon = loadManagementPeons.get(from); - if (dropPeon == null) { - throw new IllegalArgumentException(String.format("LoadQueuePeon hasn't been created yet for path [%s]", from)); - } + final LoadQueuePeon dropPeon = loadManagementPeons.get(from); + if (dropPeon == null) { + throw new IAE("LoadQueuePeon hasn't been created yet for path [%s]", from); + } - if (!dropPeon.getSegmentsToDrop().contains(segment)) { - dropPeon.dropSegment(segment, callback); + if (!dropPeon.getSegmentsToDrop().contains(segment)) { + dropPeon.dropSegment(segment, callback); + } + } + catch (Exception e) { + log.makeAlert(e, "Exception dropping segment %s", segmentName).emit(); + callback.execute(); } } @@ -543,7 +506,7 @@ public class DruidMaster } catch (Exception e) { log.makeAlert(e, "Unable to become master") - .emit(); + .emit(); final LeaderLatch oldLatch = createNewLeaderLatch(); Closeables.closeQuietly(oldLatch); try {