001/**
002 * Copyright (C) 2012 FuseSource, Inc.
003 * http://fusesource.com
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *    http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.fusesource.hawtdispatch.transport;
019
020import org.fusesource.hawtdispatch.*;
021
022import java.io.IOException;
023import java.net.*;
024import java.nio.channels.SelectionKey;
025import java.nio.channels.ServerSocketChannel;
026import java.nio.channels.SocketChannel;
027import java.util.concurrent.Executor;
028
029/**
030 * A TCP based implementation of {@link TransportServer}
031 *
032 * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
033 */
034
035public class TcpTransportServer implements TransportServer {
036
037    protected final String bindScheme;
038    protected final InetSocketAddress bindAddress;
039    protected int backlog = 100;
040    protected ServerSocketChannel channel;
041    protected TransportServerListener listener;
042    protected DispatchQueue dispatchQueue;
043    protected DispatchSource acceptSource;
044    protected int receiveBufferSize = 64*1024;
045    protected int sendBufferSize = 64*1024;
046    protected Executor blockingExecutor;
047
048    public TcpTransportServer(URI location) throws UnknownHostException {
049        bindScheme = location.getScheme();
050        String host = location.getHost();
051        host = (host == null || host.length() == 0) ? "::" : host;
052        bindAddress = new InetSocketAddress(InetAddress.getByName(host), location.getPort());
053    }
054
055    public void setTransportServerListener(TransportServerListener listener) {
056        this.listener = listener;
057    }
058
059    public InetSocketAddress getSocketAddress() {
060        return (InetSocketAddress) channel.socket().getLocalSocketAddress();
061    }
062
063    public DispatchQueue getDispatchQueue() {
064        return dispatchQueue;
065    }
066
067    public void setDispatchQueue(DispatchQueue dispatchQueue) {
068        this.dispatchQueue = dispatchQueue;
069    }
070
071    public void suspend() {
072        acceptSource.suspend();
073    }
074
075    public void resume() {
076        acceptSource.resume();
077    }
078
079    @Deprecated
080    public void start(Runnable onCompleted) throws Exception {
081        start(new TaskWrapper(onCompleted));
082    }
083    @Deprecated
084    public void stop(Runnable onCompleted) throws Exception {
085        stop(new TaskWrapper(onCompleted));
086    }
087
088    public void start(Task onCompleted) throws Exception {
089
090        try {
091            channel = ServerSocketChannel.open();
092            channel.configureBlocking(false);
093            try {
094                channel.socket().setReceiveBufferSize(receiveBufferSize);
095            } catch (SocketException ignore) {
096            }
097            try {
098                channel.socket().setReceiveBufferSize(sendBufferSize);
099            } catch (SocketException ignore) {
100            }
101            channel.socket().bind(bindAddress, backlog);
102        } catch (IOException e) {
103            throw new IOException("Failed to bind to server socket: " + bindAddress + " due to: " + e);
104        }
105
106        acceptSource = Dispatch.createSource(channel, SelectionKey.OP_ACCEPT, dispatchQueue);
107        acceptSource.setEventHandler(new Task() {
108            public void run() {
109                try {
110                    SocketChannel client = channel.accept();
111                    while( client!=null ) {
112                        handleSocket(client);
113                        client = channel.accept();
114                    }
115                } catch (Exception e) {
116                    listener.onAcceptError(e);
117                }
118            }
119        });
120        acceptSource.setCancelHandler(new Task() {
121            public void run() {
122                try {
123                    channel.close();
124                } catch (IOException e) {
125                }
126            }
127        });
128        acceptSource.resume();
129        if( onCompleted!=null ) {
130            dispatchQueue.execute(onCompleted);
131        }
132    }
133
134    public String getBoundAddress() {
135        try {
136            return new URI(bindScheme, null, bindAddress.getAddress().getHostAddress(), channel.socket().getLocalPort(), null, null, null).toString();
137        } catch (URISyntaxException e) {
138            throw new RuntimeException(e);
139        }
140    }
141
142    public void stop(final Task onCompleted) throws Exception {
143        if( acceptSource.isCanceled() ) {
144            onCompleted.run();
145        } else {
146            acceptSource.setCancelHandler(new Task() {
147                public void run() {
148                    try {
149                        channel.close();
150                    } catch (IOException e) {
151                    }
152                    onCompleted.run();
153                }
154            });
155            acceptSource.cancel();
156        }
157    }
158
159    public int getBacklog() {
160        return backlog;
161    }
162
163    public void setBacklog(int backlog) {
164        this.backlog = backlog;
165    }
166
167    protected final void handleSocket(SocketChannel socket) throws Exception {
168        TcpTransport transport = createTransport();
169        transport.connected(socket);
170        listener.onAccept(transport);
171    }
172
173    protected TcpTransport createTransport() {
174        final TcpTransport rc = new TcpTransport();
175        rc.setBlockingExecutor(blockingExecutor);
176        rc.setDispatchQueue(dispatchQueue);
177        return rc;
178    }
179
180    /**
181     * @return pretty print of this
182     */
183    public String toString() {
184        return getBoundAddress();
185    }
186
187    public int getReceiveBufferSize() {
188        return receiveBufferSize;
189    }
190
191    public void setReceiveBufferSize(int receiveBufferSize) {
192        this.receiveBufferSize = receiveBufferSize;
193        if( channel!=null ) {
194            try {
195                channel.socket().setReceiveBufferSize(receiveBufferSize);
196            } catch (SocketException ignore) {
197            }
198        }
199    }
200
201    public int getSendBufferSize() {
202        return sendBufferSize;
203    }
204
205    public void setSendBufferSize(int sendBufferSize) {
206        this.sendBufferSize = sendBufferSize;
207        if( channel!=null ) {
208            try {
209                channel.socket().setReceiveBufferSize(sendBufferSize);
210            } catch (SocketException ignore) {
211            }
212        }
213    }
214
215    public Executor getBlockingExecutor() {
216        return blockingExecutor;
217    }
218
219    public void setBlockingExecutor(Executor blockingExecutor) {
220        this.blockingExecutor = blockingExecutor;
221    }
222
223}