001package org.fusesource.hawtdispatch.jmx;
002
003import org.fusesource.hawtdispatch.Dispatcher;
004import org.fusesource.hawtdispatch.Metrics;
005import org.fusesource.hawtdispatch.internal.HawtDispatcher;
006
007import javax.management.*;
008import javax.management.openmbean.*;
009import java.lang.management.ManagementFactory;
010import java.util.*;
011import java.util.concurrent.TimeUnit;
012
013/**
014 *
015 */
016public class JmxService {
017
018    public static final String DISPATCHER_OBJECT_NAME = "org.hawtdispatch:type=Dispatcher";
019
020    static public interface JmxDispatcherMBean {
021        @MBeanInfo("Used to enable or disable profiling")
022        public void setTimeUnit(String unit);
023
024        @MBeanInfo("Is profiling enabled.")
025        public String getTimeUnit();
026
027
028        @MBeanInfo("Used to enable or disable profiling")
029        public void setProfile(boolean enabled);
030
031        @MBeanInfo("Is profiling enabled.")
032        public boolean getProfile();
033
034        @MBeanInfo("Get the collected profiling metrics.")
035        public CompositeData[] getMetrics() throws OpenDataException;
036    }
037
038    static public class JmxDispatcher implements JmxDispatcherMBean {
039
040        final Dispatcher dispatcher;
041        TimeUnit timeUnit = TimeUnit.MILLISECONDS;
042
043        public JmxDispatcher(Dispatcher dispatcher) {
044            this.dispatcher = dispatcher;
045        }
046
047        public String getTimeUnit() {
048            return timeUnit.name();
049        }
050
051        public void setTimeUnit(String unit) {
052            this.timeUnit = TimeUnit.valueOf(unit);
053        }
054
055        public boolean getProfile() {
056            return dispatcher.profile();
057        }
058
059        public void setProfile(boolean enabled) {
060            dispatcher.profile(enabled);
061        }
062
063        public CompositeData[] getMetrics() throws OpenDataException {
064            ArrayList<CompositeData> rc = new ArrayList<CompositeData>();
065
066            // lets sort by runtime.
067            ArrayList<Metrics> metrics = new ArrayList<Metrics>(dispatcher.metrics());
068            Collections.sort(metrics, new Comparator<Metrics>() {
069                public int compare(Metrics l, Metrics r) {
070                    if( l.totalRunTimeNS == r.totalRunTimeNS )
071                        return 0;
072                    return l.totalRunTimeNS < r.totalRunTimeNS ? 1 : -1;
073                }
074            });
075            for (Metrics metric : metrics) {
076                rc.add(convert(metric, timeUnit));
077            }
078            return rc.toArray(new CompositeData[rc.size()]);
079        }
080    }
081
082    static class CompositeTypeFactory {
083        private final List<String> itemNamesList = new ArrayList<String>();
084        private final List<String> itemDescriptionsList = new ArrayList<String>();
085        private final List<OpenType> itemTypesList = new ArrayList<OpenType>();
086
087        protected void addItem(String name, String description, OpenType type) {
088            itemNamesList.add(name);
089            itemDescriptionsList.add(description);
090            itemTypesList.add(type);
091        }
092
093        protected CompositeType create(Class clazz) {
094            return create(clazz.getName(), clazz.getName());
095        }
096        protected CompositeType create(String name, String description) {
097            try {
098                String[] itemNames = itemNamesList.toArray(new String[itemNamesList.size()]);
099                String[] itemDescriptions = itemDescriptionsList.toArray(new String[itemDescriptionsList.size()]);
100                OpenType[] itemTypes = itemTypesList.toArray(new OpenType[itemTypesList.size()]);
101                return new CompositeType(name, description, itemNames, itemDescriptions, itemTypes);
102            } catch (OpenDataException e) {
103                throw new RuntimeException(e);
104            }
105        }
106    }
107
108    private static CompositeType METRICS_COMPOSITE_TYPE;
109    static {
110        CompositeTypeFactory factory = new CompositeTypeFactory();
111        factory.addItem("label", "The queue label", SimpleType.STRING);
112        factory.addItem("duration", "The length of time spent gathering metricsN", SimpleType.DOUBLE);
113
114        factory.addItem("enqueued", "The number of tasks enqueued", SimpleType.LONG);
115        factory.addItem("enqueueTimeMean", "The mean amount of time an enqueued tasks waited before it was executed", SimpleType.DOUBLE);
116        factory.addItem("enqueueTimeMax", "The maximum amount of time a single enqueued task waited before it was executed", SimpleType.DOUBLE);
117        factory.addItem("enqueueTimeTotal", "The total amount of time all enqueued tasks spent waiting to be executed", SimpleType.DOUBLE);
118
119        factory.addItem("executed", "The number of tasks executed", SimpleType.LONG);
120        factory.addItem("executeTimeMean", "The mean amount of time tasks took to execute", SimpleType.DOUBLE);
121        factory.addItem("executeTimeMax", "The maximum amount of time a single task took to execute", SimpleType.DOUBLE);
122        factory.addItem("executeTimeTotal", "The total amount of time all tasks spent executing", SimpleType.DOUBLE);
123        METRICS_COMPOSITE_TYPE = factory.create(Metrics.class);
124    }
125
126    public static CompositeData convert(Metrics metric, TimeUnit timeUnit) throws OpenDataException {
127        Map<String, Object> fields = new HashMap<String, Object>();
128        fields.put("label", metric.queue.getLabel());
129        fields.put("duration", ((double)metric.durationNS) / timeUnit.toNanos(1));
130
131        fields.put("enqueued", metric.enqueued);
132        fields.put("enqueueTimeMean", (((double)metric.totalWaitTimeNS) / timeUnit.toNanos(1))/ metric.dequeued);
133        fields.put("enqueueTimeMax", ((double)metric.maxWaitTimeNS) / timeUnit.toNanos(1) );
134        fields.put("enqueueTimeTotal", ((double)metric.totalWaitTimeNS) / timeUnit.toNanos(1));
135
136        fields.put("executed", metric.dequeued);
137        fields.put("executeTimeMean", (((double)metric.totalRunTimeNS) / timeUnit.toNanos(1))/ metric.dequeued);
138        fields.put("executeTimeMax", ((double)metric.maxRunTimeNS) / timeUnit.toNanos(1));
139        fields.put("executeTimeTotal", ((double)metric.totalRunTimeNS) / timeUnit.toNanos(1));
140        return new CompositeDataSupport(METRICS_COMPOSITE_TYPE, fields);
141    }
142
143    static public ObjectName objectName(HawtDispatcher dispatcher) {
144        try {
145            return new ObjectName(DISPATCHER_OBJECT_NAME+",name="+ObjectName.quote(dispatcher.getLabel()));
146        } catch (MalformedObjectNameException e) {
147            throw new RuntimeException(e);
148        }
149    }
150
151    static public void register(HawtDispatcher dispatcher) throws Exception {
152        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
153        server.registerMBean(new JmxDispatcher(dispatcher), objectName(dispatcher));
154    }
155
156    static public void unregister(HawtDispatcher dispatcher) throws Exception {
157        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
158        server.unregisterMBean(new ObjectName(DISPATCHER_OBJECT_NAME));
159    }
160}