001/** 002 * Copyright (C) 2012 FuseSource, Inc. 003 * http://fusesource.com 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018package org.fusesource.hawtdispatch.transport; 019 020import org.fusesource.hawtdispatch.*; 021 022import java.io.IOException; 023import java.net.*; 024import java.nio.channels.SelectionKey; 025import java.nio.channels.ServerSocketChannel; 026import java.nio.channels.SocketChannel; 027import java.util.concurrent.Executor; 028 029/** 030 * A TCP based implementation of {@link TransportServer} 031 * 032 * @author <a href="http://hiramchirino.com">Hiram Chirino</a> 033 */ 034 035public class TcpTransportServer implements TransportServer { 036 037 protected final String bindScheme; 038 protected final InetSocketAddress bindAddress; 039 protected int backlog = 100; 040 protected ServerSocketChannel channel; 041 protected TransportServerListener listener; 042 protected DispatchQueue dispatchQueue; 043 protected DispatchSource acceptSource; 044 protected int receiveBufferSize = 64*1024; 045 protected int sendBufferSize = 64*1024; 046 protected Executor blockingExecutor; 047 048 public TcpTransportServer(URI location) throws UnknownHostException { 049 bindScheme = location.getScheme(); 050 String host = location.getHost(); 051 host = (host == null || host.length() == 0) ? "::" : host; 052 bindAddress = new InetSocketAddress(InetAddress.getByName(host), location.getPort()); 053 } 054 055 public void setTransportServerListener(TransportServerListener listener) { 056 this.listener = listener; 057 } 058 059 public InetSocketAddress getSocketAddress() { 060 return (InetSocketAddress) channel.socket().getLocalSocketAddress(); 061 } 062 063 public DispatchQueue getDispatchQueue() { 064 return dispatchQueue; 065 } 066 067 public void setDispatchQueue(DispatchQueue dispatchQueue) { 068 this.dispatchQueue = dispatchQueue; 069 } 070 071 public void suspend() { 072 acceptSource.suspend(); 073 } 074 075 public void resume() { 076 acceptSource.resume(); 077 } 078 079 @Deprecated 080 public void start(Runnable onCompleted) throws Exception { 081 start(new TaskWrapper(onCompleted)); 082 } 083 @Deprecated 084 public void stop(Runnable onCompleted) throws Exception { 085 stop(new TaskWrapper(onCompleted)); 086 } 087 088 public void start(Task onCompleted) throws Exception { 089 090 try { 091 channel = ServerSocketChannel.open(); 092 channel.configureBlocking(false); 093 try { 094 channel.socket().setReceiveBufferSize(receiveBufferSize); 095 } catch (SocketException ignore) { 096 } 097 try { 098 channel.socket().setReceiveBufferSize(sendBufferSize); 099 } catch (SocketException ignore) { 100 } 101 channel.socket().bind(bindAddress, backlog); 102 } catch (IOException e) { 103 throw new IOException("Failed to bind to server socket: " + bindAddress + " due to: " + e); 104 } 105 106 acceptSource = Dispatch.createSource(channel, SelectionKey.OP_ACCEPT, dispatchQueue); 107 acceptSource.setEventHandler(new Task() { 108 public void run() { 109 try { 110 SocketChannel client = channel.accept(); 111 while( client!=null ) { 112 handleSocket(client); 113 client = channel.accept(); 114 } 115 } catch (Exception e) { 116 listener.onAcceptError(e); 117 } 118 } 119 }); 120 acceptSource.setCancelHandler(new Task() { 121 public void run() { 122 try { 123 channel.close(); 124 } catch (IOException e) { 125 } 126 } 127 }); 128 acceptSource.resume(); 129 if( onCompleted!=null ) { 130 dispatchQueue.execute(onCompleted); 131 } 132 } 133 134 public String getBoundAddress() { 135 try { 136 return new URI(bindScheme, null, bindAddress.getAddress().getHostAddress(), channel.socket().getLocalPort(), null, null, null).toString(); 137 } catch (URISyntaxException e) { 138 throw new RuntimeException(e); 139 } 140 } 141 142 public void stop(final Task onCompleted) throws Exception { 143 if( acceptSource.isCanceled() ) { 144 onCompleted.run(); 145 } else { 146 acceptSource.setCancelHandler(new Task() { 147 public void run() { 148 try { 149 channel.close(); 150 } catch (IOException e) { 151 } 152 onCompleted.run(); 153 } 154 }); 155 acceptSource.cancel(); 156 } 157 } 158 159 public int getBacklog() { 160 return backlog; 161 } 162 163 public void setBacklog(int backlog) { 164 this.backlog = backlog; 165 } 166 167 protected final void handleSocket(SocketChannel socket) throws Exception { 168 TcpTransport transport = createTransport(); 169 transport.connected(socket); 170 listener.onAccept(transport); 171 } 172 173 protected TcpTransport createTransport() { 174 final TcpTransport rc = new TcpTransport(); 175 rc.setBlockingExecutor(blockingExecutor); 176 rc.setDispatchQueue(dispatchQueue); 177 return rc; 178 } 179 180 /** 181 * @return pretty print of this 182 */ 183 public String toString() { 184 return getBoundAddress(); 185 } 186 187 public int getReceiveBufferSize() { 188 return receiveBufferSize; 189 } 190 191 public void setReceiveBufferSize(int receiveBufferSize) { 192 this.receiveBufferSize = receiveBufferSize; 193 if( channel!=null ) { 194 try { 195 channel.socket().setReceiveBufferSize(receiveBufferSize); 196 } catch (SocketException ignore) { 197 } 198 } 199 } 200 201 public int getSendBufferSize() { 202 return sendBufferSize; 203 } 204 205 public void setSendBufferSize(int sendBufferSize) { 206 this.sendBufferSize = sendBufferSize; 207 if( channel!=null ) { 208 try { 209 channel.socket().setReceiveBufferSize(sendBufferSize); 210 } catch (SocketException ignore) { 211 } 212 } 213 } 214 215 public Executor getBlockingExecutor() { 216 return blockingExecutor; 217 } 218 219 public void setBlockingExecutor(Executor blockingExecutor) { 220 this.blockingExecutor = blockingExecutor; 221 } 222 223}