001package org.fusesource.hawtdispatch.jmx; 002 003import org.fusesource.hawtdispatch.Dispatcher; 004import org.fusesource.hawtdispatch.Metrics; 005import org.fusesource.hawtdispatch.internal.HawtDispatcher; 006 007import javax.management.*; 008import javax.management.openmbean.*; 009import java.lang.management.ManagementFactory; 010import java.util.*; 011import java.util.concurrent.TimeUnit; 012 013/** 014 * 015 */ 016public class JmxService { 017 018 public static final String DISPATCHER_OBJECT_NAME = "org.hawtdispatch:type=Dispatcher"; 019 020 static public interface JmxDispatcherMBean { 021 @MBeanInfo("Used to enable or disable profiling") 022 public void setTimeUnit(String unit); 023 024 @MBeanInfo("Is profiling enabled.") 025 public String getTimeUnit(); 026 027 028 @MBeanInfo("Used to enable or disable profiling") 029 public void setProfile(boolean enabled); 030 031 @MBeanInfo("Is profiling enabled.") 032 public boolean getProfile(); 033 034 @MBeanInfo("Get the collected profiling metrics.") 035 public CompositeData[] getMetrics() throws OpenDataException; 036 } 037 038 static public class JmxDispatcher implements JmxDispatcherMBean { 039 040 final Dispatcher dispatcher; 041 TimeUnit timeUnit = TimeUnit.MILLISECONDS; 042 043 public JmxDispatcher(Dispatcher dispatcher) { 044 this.dispatcher = dispatcher; 045 } 046 047 public String getTimeUnit() { 048 return timeUnit.name(); 049 } 050 051 public void setTimeUnit(String unit) { 052 this.timeUnit = TimeUnit.valueOf(unit); 053 } 054 055 public boolean getProfile() { 056 return dispatcher.profile(); 057 } 058 059 public void setProfile(boolean enabled) { 060 dispatcher.profile(enabled); 061 } 062 063 public CompositeData[] getMetrics() throws OpenDataException { 064 ArrayList<CompositeData> rc = new ArrayList<CompositeData>(); 065 066 // lets sort by runtime. 067 ArrayList<Metrics> metrics = new ArrayList<Metrics>(dispatcher.metrics()); 068 Collections.sort(metrics, new Comparator<Metrics>() { 069 public int compare(Metrics l, Metrics r) { 070 if( l.totalRunTimeNS == r.totalRunTimeNS ) 071 return 0; 072 return l.totalRunTimeNS < r.totalRunTimeNS ? 1 : -1; 073 } 074 }); 075 for (Metrics metric : metrics) { 076 rc.add(convert(metric, timeUnit)); 077 } 078 return rc.toArray(new CompositeData[rc.size()]); 079 } 080 } 081 082 static class CompositeTypeFactory { 083 private final List<String> itemNamesList = new ArrayList<String>(); 084 private final List<String> itemDescriptionsList = new ArrayList<String>(); 085 private final List<OpenType> itemTypesList = new ArrayList<OpenType>(); 086 087 protected void addItem(String name, String description, OpenType type) { 088 itemNamesList.add(name); 089 itemDescriptionsList.add(description); 090 itemTypesList.add(type); 091 } 092 093 protected CompositeType create(Class clazz) { 094 return create(clazz.getName(), clazz.getName()); 095 } 096 protected CompositeType create(String name, String description) { 097 try { 098 String[] itemNames = itemNamesList.toArray(new String[itemNamesList.size()]); 099 String[] itemDescriptions = itemDescriptionsList.toArray(new String[itemDescriptionsList.size()]); 100 OpenType[] itemTypes = itemTypesList.toArray(new OpenType[itemTypesList.size()]); 101 return new CompositeType(name, description, itemNames, itemDescriptions, itemTypes); 102 } catch (OpenDataException e) { 103 throw new RuntimeException(e); 104 } 105 } 106 } 107 108 private static CompositeType METRICS_COMPOSITE_TYPE; 109 static { 110 CompositeTypeFactory factory = new CompositeTypeFactory(); 111 factory.addItem("label", "The queue label", SimpleType.STRING); 112 factory.addItem("duration", "The length of time spent gathering metricsN", SimpleType.DOUBLE); 113 114 factory.addItem("enqueued", "The number of tasks enqueued", SimpleType.LONG); 115 factory.addItem("enqueueTimeMean", "The mean amount of time an enqueued tasks waited before it was executed", SimpleType.DOUBLE); 116 factory.addItem("enqueueTimeMax", "The maximum amount of time a single enqueued task waited before it was executed", SimpleType.DOUBLE); 117 factory.addItem("enqueueTimeTotal", "The total amount of time all enqueued tasks spent waiting to be executed", SimpleType.DOUBLE); 118 119 factory.addItem("executed", "The number of tasks executed", SimpleType.LONG); 120 factory.addItem("executeTimeMean", "The mean amount of time tasks took to execute", SimpleType.DOUBLE); 121 factory.addItem("executeTimeMax", "The maximum amount of time a single task took to execute", SimpleType.DOUBLE); 122 factory.addItem("executeTimeTotal", "The total amount of time all tasks spent executing", SimpleType.DOUBLE); 123 METRICS_COMPOSITE_TYPE = factory.create(Metrics.class); 124 } 125 126 public static CompositeData convert(Metrics metric, TimeUnit timeUnit) throws OpenDataException { 127 Map<String, Object> fields = new HashMap<String, Object>(); 128 fields.put("label", metric.queue.getLabel()); 129 fields.put("duration", ((double)metric.durationNS) / timeUnit.toNanos(1)); 130 131 fields.put("enqueued", metric.enqueued); 132 fields.put("enqueueTimeMean", (((double)metric.totalWaitTimeNS) / timeUnit.toNanos(1))/ metric.dequeued); 133 fields.put("enqueueTimeMax", ((double)metric.maxWaitTimeNS) / timeUnit.toNanos(1) ); 134 fields.put("enqueueTimeTotal", ((double)metric.totalWaitTimeNS) / timeUnit.toNanos(1)); 135 136 fields.put("executed", metric.dequeued); 137 fields.put("executeTimeMean", (((double)metric.totalRunTimeNS) / timeUnit.toNanos(1))/ metric.dequeued); 138 fields.put("executeTimeMax", ((double)metric.maxRunTimeNS) / timeUnit.toNanos(1)); 139 fields.put("executeTimeTotal", ((double)metric.totalRunTimeNS) / timeUnit.toNanos(1)); 140 return new CompositeDataSupport(METRICS_COMPOSITE_TYPE, fields); 141 } 142 143 static public ObjectName objectName(HawtDispatcher dispatcher) { 144 try { 145 return new ObjectName(DISPATCHER_OBJECT_NAME+",name="+ObjectName.quote(dispatcher.getLabel())); 146 } catch (MalformedObjectNameException e) { 147 throw new RuntimeException(e); 148 } 149 } 150 151 static public void register(HawtDispatcher dispatcher) throws Exception { 152 MBeanServer server = ManagementFactory.getPlatformMBeanServer(); 153 server.registerMBean(new JmxDispatcher(dispatcher), objectName(dispatcher)); 154 } 155 156 static public void unregister(HawtDispatcher dispatcher) throws Exception { 157 MBeanServer server = ManagementFactory.getPlatformMBeanServer(); 158 server.unregisterMBean(new ObjectName(DISPATCHER_OBJECT_NAME)); 159 } 160}