001/** 002 * Copyright (C) 2012 FuseSource, Inc. 003 * http://fusesource.com 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.fusesource.hawtdispatch.transport; 018 019import org.fusesource.hawtdispatch.DispatchQueue; 020import org.fusesource.hawtdispatch.Task; 021 022import java.net.*; 023import java.nio.channels.DatagramChannel; 024import java.util.concurrent.Executor; 025 026/** 027 * <p> 028 * </p> 029 * 030 * @author <a href="http://hiramchirino.com">Hiram Chirino</a> 031 */ 032public class UdpTransportServer extends ServiceBase implements TransportServer { 033 034 private final String bindScheme; 035 private final InetSocketAddress bindAddress; 036 037 private DatagramChannel channel; 038 private TransportServerListener listener; 039 private DispatchQueue dispatchQueue; 040 private Executor blockingExecutor; 041 042 public UdpTransportServer(URI location) throws UnknownHostException { 043 bindScheme = location.getScheme(); 044 String host = location.getHost(); 045 host = (host == null || host.length() == 0) ? "::" : host; 046 bindAddress = new InetSocketAddress(InetAddress.getByName(host), location.getPort()); 047 } 048 049 private UdpTransport transport; 050 051 public void setTransportServerListener(TransportServerListener listener) { 052 this.listener = listener; 053 } 054 055 public InetSocketAddress getSocketAddress() { 056 return (InetSocketAddress) channel.socket().getLocalSocketAddress(); 057 } 058 059 public DispatchQueue getDispatchQueue() { 060 return dispatchQueue; 061 } 062 063 public void setDispatchQueue(DispatchQueue dispatchQueue) { 064 this.dispatchQueue = dispatchQueue; 065 } 066 067 @Override 068 protected void _start(Task onCompleted) { 069 accept(); 070 if( onCompleted!=null ) { 071 dispatchQueue.execute(onCompleted); 072 } 073 } 074 075 private void queueAccept() { 076 dispatchQueue.execute(new Task() { 077 public void run() { 078 accept(); 079 } 080 }); 081 } 082 083 private void accept() { 084 if (getServiceState().isStarted() || getServiceState().isStarting()) { 085 try { 086 UdpTransport udpTransport = createTransport(); 087 transport = udpTransport; 088 transport.onDispose = new Task() { 089 public void run() { 090 queueAccept(); 091 } 092 }; 093 channel = DatagramChannel.open(); 094 channel.socket().bind(bindAddress); 095 transport.connected(channel); 096 listener.onAccept(transport); 097 } catch (Exception e) { 098 listener.onAcceptError(e); 099 } 100 } 101 } 102 103 protected UdpTransport createTransport() { 104 final UdpTransport transport = new UdpTransport(); 105 transport.setBlockingExecutor(blockingExecutor); 106 transport.setDispatchQueue(dispatchQueue); 107 return transport; 108 } 109 110 @Override 111 protected void _stop(Task onCompleted) { 112 transport.stop(onCompleted); 113 } 114 115 public void suspend() { 116 dispatchQueue.suspend(); 117 } 118 119 public void resume() { 120 dispatchQueue.resume(); 121 } 122 123 public String getBoundAddress() { 124 try { 125 String host = bindAddress.getAddress().getHostAddress(); 126 int port = channel.socket().getLocalPort(); 127 return new URI(bindScheme, null, host, port, null, null, null).toString(); 128 } catch (URISyntaxException e) { 129 throw new RuntimeException(e); 130 } 131 } 132 133 /** 134 * @return pretty print of this 135 */ 136 public String toString() { 137 return getBoundAddress(); 138 } 139 140 public Executor getBlockingExecutor() { 141 return blockingExecutor; 142 } 143 144 public void setBlockingExecutor(Executor blockingExecutor) { 145 this.blockingExecutor = blockingExecutor; 146 } 147}