fix master speed issue with immutable copy of in druid server, better exception handling in moving and dropping segments

This commit is contained in:
fjy 2013-07-16 13:24:43 -07:00
parent 09b1130810
commit 49a80572b7
2 changed files with 87 additions and 122 deletions

View File

@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableMap;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import com.metamx.druid.coordination.DruidServerMetadata; import com.metamx.druid.coordination.DruidServerMetadata;
import java.util.Collections;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
@ -116,7 +117,8 @@ public class DruidServer implements Comparable
@JsonProperty @JsonProperty
public Map<String, DataSegment> getSegments() public Map<String, DataSegment> getSegments()
{ {
return ImmutableMap.copyOf(segments); // Copying the map slows things down a lot here, don't use Immutable Map here
return Collections.unmodifiableMap(segments);
} }
public DataSegment getSegment(String segmentName) public DataSegment getSegment(String segmentName)

View File

@ -28,6 +28,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.google.common.io.Closeables; import com.google.common.io.Closeables;
import com.metamx.common.IAE;
import com.metamx.common.Pair; import com.metamx.common.Pair;
import com.metamx.common.concurrent.ScheduledExecutorFactory; import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.common.concurrent.ScheduledExecutors;
@ -247,144 +248,106 @@ public class DruidMaster
public void moveSegment(String from, String to, String segmentName, final LoadPeonCallback callback) public void moveSegment(String from, String to, String segmentName, final LoadPeonCallback callback)
{ {
final DruidServer fromServer = serverInventoryView.getInventoryValue(from); try {
if (fromServer == null) { final DruidServer fromServer = serverInventoryView.getInventoryValue(from);
throw new IllegalArgumentException(String.format("Unable to find server [%s]", from)); if (fromServer == null) {
} throw new IAE("Unable to find server [%s]", from);
}
final DruidServer toServer = serverInventoryView.getInventoryValue(to); final DruidServer toServer = serverInventoryView.getInventoryValue(to);
if (toServer == null) { if (toServer == null) {
throw new IllegalArgumentException(String.format("Unable to find server [%s]", to)); throw new IAE("Unable to find server [%s]", to);
} }
if (to.equalsIgnoreCase(from)) { if (to.equalsIgnoreCase(from)) {
throw new IllegalArgumentException( throw new IAE("Redundant command to move segment [%s] from [%s] to [%s]", segmentName, from, to);
String.format("Redundant command to move segment [%s] from [%s] to [%s]", segmentName, from, to) }
final DataSegment segment = fromServer.getSegment(segmentName);
if (segment == null) {
throw new IAE("Unable to find segment [%s] on server [%s]", segmentName, from);
}
final LoadQueuePeon loadPeon = loadManagementPeons.get(to);
if (loadPeon == null) {
throw new IAE("LoadQueuePeon hasn't been created yet for path [%s]", to);
}
final LoadQueuePeon dropPeon = loadManagementPeons.get(from);
if (dropPeon == null) {
throw new IAE("LoadQueuePeon hasn't been created yet for path [%s]", from);
}
final ServerHolder toHolder = new ServerHolder(toServer, loadPeon);
if (toHolder.getAvailableSize() < segment.getSize()) {
throw new IAE(
"Not enough capacity on server [%s] for segment [%s]. Required: %,d, available: %,d.",
to,
segment,
segment.getSize(),
toHolder.getAvailableSize()
);
}
final String toLoadQueueSegPath = ZKPaths.makePath(ZKPaths.makePath(zkPaths.getLoadQueuePath(), to), segmentName);
final String toServedSegPath = ZKPaths.makePath(
ZKPaths.makePath(zkPaths.getServedSegmentsPath(), to), segmentName
); );
}
final DataSegment segment = fromServer.getSegment(segmentName); loadPeon.loadSegment(
if (segment == null) { segment,
throw new IllegalArgumentException( new LoadPeonCallback()
String.format("Unable to find segment [%s] on server [%s]", segmentName, from)
);
}
final LoadQueuePeon loadPeon = loadManagementPeons.get(to);
if (loadPeon == null) {
throw new IllegalArgumentException(String.format("LoadQueuePeon hasn't been created yet for path [%s]", to));
}
final LoadQueuePeon dropPeon = loadManagementPeons.get(from);
if (dropPeon == null) {
throw new IllegalArgumentException(String.format("LoadQueuePeon hasn't been created yet for path [%s]", from));
}
final ServerHolder toHolder = new ServerHolder(toServer, loadPeon);
if (toHolder.getAvailableSize() < segment.getSize()) {
throw new IllegalArgumentException(
String.format(
"Not enough capacity on server [%s] for segment [%s]. Required: %,d, available: %,d.",
to,
segment,
segment.getSize(),
toHolder.getAvailableSize()
)
);
}
final String toLoadQueueSegPath = ZKPaths.makePath(ZKPaths.makePath(zkPaths.getLoadQueuePath(), to), segmentName);
final String toServedSegPath = ZKPaths.makePath(
ZKPaths.makePath(zkPaths.getServedSegmentsPath(), to), segmentName
);
loadPeon.loadSegment(
segment,
new LoadPeonCallback()
{
@Override
protected void execute()
{ {
try { @Override
if (curator.checkExists().forPath(toServedSegPath) != null && protected void execute()
curator.checkExists().forPath(toLoadQueueSegPath) == null && {
!dropPeon.getSegmentsToDrop().contains(segment)) { try {
dropPeon.dropSegment(segment, callback); if (curator.checkExists().forPath(toServedSegPath) != null &&
} else if (callback != null) { curator.checkExists().forPath(toLoadQueueSegPath) == null &&
callback.execute(); !dropPeon.getSegmentsToDrop().contains(segment)) {
dropPeon.dropSegment(segment, callback);
} else if (callback != null) {
callback.execute();
}
}
catch (Exception e) {
throw Throwables.propagate(e);
} }
} }
catch (Exception e) {
throw Throwables.propagate(e);
}
} }
}
);
}
public void cloneSegment(String from, String to, String segmentName, LoadPeonCallback callback)
{
final DruidServer fromServer = serverInventoryView.getInventoryValue(from);
if (fromServer == null) {
throw new IllegalArgumentException(String.format("Unable to find server [%s]", from));
}
final DruidServer toServer = serverInventoryView.getInventoryValue(to);
if (toServer == null) {
throw new IllegalArgumentException(String.format("Unable to find server [%s]", to));
}
final DataSegment segment = fromServer.getSegment(segmentName);
if (segment == null) {
throw new IllegalArgumentException(
String.format("Unable to find segment [%s] on server [%s]", segmentName, from)
); );
} }
catch (Exception e) {
final LoadQueuePeon loadPeon = loadManagementPeons.get(to); log.makeAlert(e, "Exception moving segment %s", segmentName).emit();
if (loadPeon == null) { callback.execute();
throw new IllegalArgumentException(String.format("LoadQueuePeon hasn't been created yet for path [%s]", to));
}
final ServerHolder toHolder = new ServerHolder(toServer, loadPeon);
if (toHolder.getAvailableSize() < segment.getSize()) {
throw new IllegalArgumentException(
String.format(
"Not enough capacity on server [%s] for segment [%s]. Required: %,d, available: %,d.",
to,
segment,
segment.getSize(),
toHolder.getAvailableSize()
)
);
}
if (!loadPeon.getSegmentsToLoad().contains(segment)) {
loadPeon.loadSegment(segment, callback);
} }
} }
public void dropSegment(String from, String segmentName, final LoadPeonCallback callback) public void dropSegment(String from, String segmentName, final LoadPeonCallback callback)
{ {
final DruidServer fromServer = serverInventoryView.getInventoryValue(from); try {
if (fromServer == null) { final DruidServer fromServer = serverInventoryView.getInventoryValue(from);
throw new IllegalArgumentException(String.format("Unable to find server [%s]", from)); if (fromServer == null) {
} throw new IAE("Unable to find server [%s]", from);
}
final DataSegment segment = fromServer.getSegment(segmentName); final DataSegment segment = fromServer.getSegment(segmentName);
if (segment == null) { if (segment == null) {
throw new IllegalArgumentException( throw new IAE("Unable to find segment [%s] on server [%s]", segmentName, from);
String.format("Unable to find segment [%s] on server [%s]", segmentName, from) }
);
}
final LoadQueuePeon dropPeon = loadManagementPeons.get(from); final LoadQueuePeon dropPeon = loadManagementPeons.get(from);
if (dropPeon == null) { if (dropPeon == null) {
throw new IllegalArgumentException(String.format("LoadQueuePeon hasn't been created yet for path [%s]", from)); throw new IAE("LoadQueuePeon hasn't been created yet for path [%s]", from);
} }
if (!dropPeon.getSegmentsToDrop().contains(segment)) { if (!dropPeon.getSegmentsToDrop().contains(segment)) {
dropPeon.dropSegment(segment, callback); dropPeon.dropSegment(segment, callback);
}
}
catch (Exception e) {
log.makeAlert(e, "Exception dropping segment %s", segmentName).emit();
callback.execute();
} }
} }
@ -543,7 +506,7 @@ public class DruidMaster
} }
catch (Exception e) { catch (Exception e) {
log.makeAlert(e, "Unable to become master") log.makeAlert(e, "Unable to become master")
.emit(); .emit();
final LeaderLatch oldLatch = createNewLeaderLatch(); final LeaderLatch oldLatch = createNewLeaderLatch();
Closeables.closeQuietly(oldLatch); Closeables.closeQuietly(oldLatch);
try { try {