001/**
002 * Copyright (C) 2012 FuseSource, Inc.
003 * http://fusesource.com
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *    http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.fusesource.hawtdispatch.transport;
018
019import org.fusesource.hawtdispatch.DispatchQueue;
020import org.fusesource.hawtdispatch.Task;
021
022import java.net.*;
023import java.nio.channels.DatagramChannel;
024import java.util.concurrent.Executor;
025
026/**
027 * <p>
028 * </p>
029 *
030 * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
031 */
032public class UdpTransportServer extends ServiceBase implements TransportServer {
033
034    private final String bindScheme;
035    private final InetSocketAddress bindAddress;
036
037    private DatagramChannel channel;
038    private TransportServerListener listener;
039    private DispatchQueue dispatchQueue;
040    private Executor blockingExecutor;
041
042    public UdpTransportServer(URI location) throws UnknownHostException {
043        bindScheme = location.getScheme();
044        String host = location.getHost();
045        host = (host == null || host.length() == 0) ? "::" : host;
046        bindAddress = new InetSocketAddress(InetAddress.getByName(host), location.getPort());
047    }
048
049    private  UdpTransport transport;
050
051    public void setTransportServerListener(TransportServerListener listener) {
052        this.listener = listener;
053    }
054
055    public InetSocketAddress getSocketAddress() {
056        return (InetSocketAddress) channel.socket().getLocalSocketAddress();
057    }
058
059    public DispatchQueue getDispatchQueue() {
060        return dispatchQueue;
061    }
062
063    public void setDispatchQueue(DispatchQueue dispatchQueue) {
064        this.dispatchQueue = dispatchQueue;
065    }
066
067    @Override
068    protected void _start(Task onCompleted) {
069        accept();
070        if( onCompleted!=null ) {
071            dispatchQueue.execute(onCompleted);
072        }
073    }
074
075    private void queueAccept() {
076        dispatchQueue.execute(new Task() {
077            public void run() {
078                accept();
079            }
080        });
081    }
082
083    private void accept() {
084        if (getServiceState().isStarted() || getServiceState().isStarting()) {
085            try {
086                UdpTransport udpTransport = createTransport();
087                transport = udpTransport;
088                transport.onDispose = new Task() {
089                    public void run() {
090                        queueAccept();
091                    }
092                };
093                channel = DatagramChannel.open();
094                channel.socket().bind(bindAddress);
095                transport.connected(channel);
096                listener.onAccept(transport);
097            } catch (Exception e) {
098                listener.onAcceptError(e);
099            }
100        }
101    }
102
103    protected UdpTransport createTransport() {
104        final UdpTransport transport = new UdpTransport();
105        transport.setBlockingExecutor(blockingExecutor);
106        transport.setDispatchQueue(dispatchQueue);
107        return transport;
108    }
109
110    @Override
111    protected void _stop(Task onCompleted) {
112        transport.stop(onCompleted);
113    }
114
115    public void suspend() {
116        dispatchQueue.suspend();
117    }
118
119    public void resume() {
120        dispatchQueue.resume();
121    }
122
123    public String getBoundAddress() {
124        try {
125            String host = bindAddress.getAddress().getHostAddress();
126            int port = channel.socket().getLocalPort();
127            return new URI(bindScheme, null, host, port, null, null, null).toString();
128        } catch (URISyntaxException e) {
129            throw new RuntimeException(e);
130        }
131    }
132
133    /**
134     * @return pretty print of this
135     */
136    public String toString() {
137        return getBoundAddress();
138    }
139
140    public Executor getBlockingExecutor() {
141        return blockingExecutor;
142    }
143
144    public void setBlockingExecutor(Executor blockingExecutor) {
145        this.blockingExecutor = blockingExecutor;
146    }
147}