001/** 002 * Copyright (C) 2012 FuseSource, Inc. 003 * http://fusesource.com 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018package org.fusesource.hawtdispatch.transport; 019 020import org.fusesource.hawtdispatch.*; 021 022import java.io.EOFException; 023import java.io.IOException; 024import java.net.SocketAddress; 025import java.net.URI; 026import java.nio.channels.ReadableByteChannel; 027import java.nio.channels.WritableByteChannel; 028import java.util.LinkedList; 029import java.util.concurrent.Executor; 030import java.util.concurrent.atomic.AtomicBoolean; 031 032/** 033 * 034 * @author <a href="http://hiramchirino.com">Hiram Chirino</a> 035 */ 036public class PipeTransport implements Transport { 037 static private final Object EOF_TOKEN = new Object(); 038 039 final private PipeTransportServer server; 040 PipeTransport peer; 041 private TransportListener listener; 042 private SocketAddress remoteAddress; 043 private AtomicBoolean stopping = new AtomicBoolean(); 044 private String name; 045 private boolean marshal; 046 private boolean trace; 047 048 private DispatchQueue dispatchQueue; 049 private CustomDispatchSource<Object,LinkedList<Object>> dispatchSource; 050 private boolean connected; 051 052 private long writeCounter = 0; 053 private long readCounter = 0; 054 private ProtocolCodec protocolCodec; 055 056 public PipeTransport(PipeTransportServer server) { 057 this.server = server; 058 } 059 060 public DispatchQueue getDispatchQueue() { 061 return dispatchQueue; 062 } 063 public void setDispatchQueue(DispatchQueue queue) { 064 this.dispatchQueue = queue; 065 } 066 067 @Deprecated 068 public void start(final Runnable onCompleted) { 069 start(new TaskWrapper(onCompleted)); 070 } 071 public void start(final Task onCompleted) { 072 if (dispatchQueue == null) { 073 throw new IllegalArgumentException("dispatchQueue is not set"); 074 } 075 server.dispatchQueue.execute(new Task(){ 076 public void run() { 077 dispatchSource = Dispatch.createSource(EventAggregators.linkedList(), dispatchQueue); 078 dispatchSource.setEventHandler(new Task() { 079 public void run() { 080 try { 081 final LinkedList<Object> commands = dispatchSource.getData(); 082 for (Object o : commands) { 083 084 if (o == EOF_TOKEN) { 085 throw new EOFException(); 086 } 087 readCounter++; 088 listener.onTransportCommand(o); 089 } 090 091 // let the peer know that they have been processed. 092 peer.dispatchQueue.execute(new Task() { 093 public void run() { 094 outbound -= commands.size(); 095 drainInbound(); 096 } 097 }); 098 } catch (IOException e) { 099 listener.onTransportFailure(e); 100 } 101 102 } 103 }); 104 if( peer.dispatchSource != null ) { 105 fireConnected(); 106 peer.fireConnected(); 107 } 108 if( onCompleted!=null ) { 109 onCompleted.run(); 110 } 111 112 } 113 }); 114 } 115 116 private void fireConnected() { 117 dispatchQueue.execute(new Task() { 118 public void run() { 119 connected = true; 120 dispatchSource.resume(); 121 listener.onTransportConnected(); 122 drainInbound(); 123 } 124 }); 125 } 126 127 public void flush() { 128 listener.onRefill(); 129 } 130 131 @Deprecated 132 public void stop(final Runnable onCompleted) { 133 stop(new TaskWrapper(onCompleted)); 134 } 135 public void stop(Task onCompleted) { 136 if( connected ) { 137 peer.dispatchSource.merge(EOF_TOKEN); 138 } 139 if( dispatchSource!=null ) { 140 dispatchSource.setCancelHandler(onCompleted); 141 dispatchSource.cancel(); 142 } 143 setDispatchQueue(null); 144 } 145 146 static final class OneWay { 147 final Object command; 148 final Retained retained; 149 150 public OneWay(Object command, Retained retained) { 151 this.command = command; 152 this.retained = retained; 153 } 154 } 155 156 int outbound = 0; 157 int maxOutbound = 100; 158 159 public boolean full() { 160 return outbound >= maxOutbound; 161 } 162 163 public boolean offer(Object command) { 164 if( !connected ) { 165 return false; 166 } 167 if( full() ) { 168 return false; 169 } else { 170 transmit(command); 171 return true; 172 } 173 } 174 175 public void drainInbound() { 176 if( !full() ) { 177 listener.onRefill(); 178 } 179 } 180 181 private void transmit(Object command) { 182 writeCounter++; 183 outbound++; 184 peer.dispatchSource.merge(command); 185 } 186 187 /** 188 * @return The number of objects sent by the transport. 189 */ 190 public long getWriteCounter() { 191 return writeCounter; 192 } 193 194 /** 195 * @return The number of objects received by the transport. 196 */ 197 public long getReadCounter() { 198 return readCounter; 199 } 200 201 public SocketAddress getLocalAddress() { 202 return remoteAddress; 203 } 204 205 public SocketAddress getRemoteAddress() { 206 return remoteAddress; 207 } 208 209 public void suspendRead() { 210 dispatchSource.suspend(); 211 } 212 213 public void resumeRead() { 214 dispatchSource.resume(); 215 } 216 217 public void setRemoteAddress(final String remoteAddress) { 218 this.remoteAddress = new SocketAddress() { 219 @Override 220 public String toString() { 221 return remoteAddress; 222 } 223 }; 224 if (name == null) { 225 name = remoteAddress; 226 } 227 } 228 229 public void setName(String name) { 230 this.name = name; 231 } 232 233 public TransportListener getTransportListener() { 234 return listener; 235 } 236 public void setTransportListener(TransportListener transportListener) { 237 this.listener = transportListener; 238 } 239 240 public ProtocolCodec getProtocolCodec() { 241 return protocolCodec; 242 } 243 public void setProtocolCodec(ProtocolCodec protocolCodec) { 244 this.protocolCodec = protocolCodec; 245 } 246 247 248 public boolean isTrace() { 249 return trace; 250 } 251 252 public void setTrace(boolean trace) { 253 this.trace = trace; 254 } 255 256 public boolean isMarshal() { 257 return marshal; 258 } 259 public void setMarshal(boolean marshall) { 260 this.marshal = marshall; 261 } 262 263 public boolean isConnected() { 264 return !stopping.get(); 265 } 266 public boolean isClosed() { 267 return false; 268 } 269 270 public Executor getBlockingExecutor() { 271 return null; 272 } 273 public void setBlockingExecutor(Executor blockingExecutor) { 274 } 275 276 public ReadableByteChannel getReadChannel() { 277 return null; 278 } 279 280 public WritableByteChannel getWriteChannel() { 281 return null; 282 } 283}