001/**
002 * Copyright (C) 2012 FuseSource, Inc.
003 * http://fusesource.com
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *    http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.fusesource.hawtdispatch.transport;
019
020import org.fusesource.hawtdispatch.*;
021
022import java.io.EOFException;
023import java.io.IOException;
024import java.net.SocketAddress;
025import java.net.URI;
026import java.nio.channels.ReadableByteChannel;
027import java.nio.channels.WritableByteChannel;
028import java.util.LinkedList;
029import java.util.concurrent.Executor;
030import java.util.concurrent.atomic.AtomicBoolean;
031
032/**
033 *
034 * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
035 */
036public class PipeTransport implements Transport {
037    static private final Object EOF_TOKEN = new Object();
038
039    final private PipeTransportServer server;
040    PipeTransport peer;
041    private TransportListener listener;
042    private SocketAddress remoteAddress;
043    private AtomicBoolean stopping = new AtomicBoolean();
044    private String name;
045    private boolean marshal;
046    private boolean trace;
047
048    private DispatchQueue dispatchQueue;
049    private CustomDispatchSource<Object,LinkedList<Object>> dispatchSource;
050    private boolean connected;
051
052    private long writeCounter = 0;
053    private long readCounter = 0;
054    private ProtocolCodec protocolCodec;
055
056    public PipeTransport(PipeTransportServer server) {
057        this.server = server;
058    }
059
060    public DispatchQueue getDispatchQueue() {
061        return dispatchQueue;
062    }
063    public void setDispatchQueue(DispatchQueue queue) {
064        this.dispatchQueue = queue;
065    }
066
067    @Deprecated
068    public void start(final Runnable onCompleted) {
069        start(new TaskWrapper(onCompleted));
070    }
071    public void start(final Task onCompleted) {
072        if (dispatchQueue == null) {
073            throw new IllegalArgumentException("dispatchQueue is not set");
074        }
075        server.dispatchQueue.execute(new Task(){
076            public void run() {
077                dispatchSource = Dispatch.createSource(EventAggregators.linkedList(), dispatchQueue);
078                dispatchSource.setEventHandler(new Task() {
079                    public void run() {
080                        try {
081                            final LinkedList<Object> commands = dispatchSource.getData();
082                            for (Object o : commands) {
083
084                                if (o == EOF_TOKEN) {
085                                    throw new EOFException();
086                                }
087                                readCounter++;
088                                listener.onTransportCommand(o);
089                            }
090
091                            // let the peer know that they have been processed.
092                            peer.dispatchQueue.execute(new Task() {
093                                public void run() {
094                                    outbound -= commands.size();
095                                    drainInbound();
096                                }
097                            });
098                        } catch (IOException e) {
099                            listener.onTransportFailure(e);
100                        }
101
102                    }
103                });
104                if( peer.dispatchSource != null ) {
105                    fireConnected();
106                    peer.fireConnected();
107                }
108                if( onCompleted!=null ) {
109                    onCompleted.run();
110                }
111
112            }
113        });
114    }
115
116    private void fireConnected() {
117        dispatchQueue.execute(new Task() {
118            public void run() {
119                connected = true;
120                dispatchSource.resume();
121                listener.onTransportConnected();
122                drainInbound();
123            }
124        });
125    }
126
127    public void flush() {
128        listener.onRefill();
129    }
130
131    @Deprecated
132    public void stop(final Runnable onCompleted) {
133        stop(new TaskWrapper(onCompleted));
134    }
135    public void stop(Task onCompleted)  {
136        if( connected ) {
137            peer.dispatchSource.merge(EOF_TOKEN);
138        }
139        if( dispatchSource!=null ) {
140            dispatchSource.setCancelHandler(onCompleted);
141            dispatchSource.cancel();
142        }
143        setDispatchQueue(null);
144    }
145
146    static final class OneWay {
147        final Object command;
148        final Retained retained;
149
150        public OneWay(Object command, Retained retained) {
151            this.command = command;
152            this.retained = retained;
153        }
154    }
155
156    int outbound = 0;
157    int maxOutbound = 100;
158
159    public boolean full() {
160        return outbound >= maxOutbound;
161    }
162
163    public boolean offer(Object command) {
164        if( !connected ) {
165            return false;
166        }
167        if( full() ) {
168            return false;
169        } else {
170            transmit(command);
171            return true;
172        }
173    }
174
175    public void drainInbound() {
176        if( !full() ) {
177            listener.onRefill();
178        }
179    }
180
181    private void transmit(Object command) {
182        writeCounter++;
183        outbound++;
184        peer.dispatchSource.merge(command);
185    }
186
187    /**
188     * @return The number of objects sent by the transport.
189     */
190    public long getWriteCounter() {
191        return writeCounter;
192    }
193
194    /**
195     * @return The number of objects received by the transport.
196     */
197    public long getReadCounter() {
198        return readCounter;
199    }
200
201    public SocketAddress getLocalAddress() {
202        return remoteAddress;
203    }
204
205    public SocketAddress getRemoteAddress() {
206        return remoteAddress;
207    }
208
209    public void suspendRead() {
210        dispatchSource.suspend();
211    }
212
213    public void resumeRead() {
214        dispatchSource.resume();
215    }
216
217    public void setRemoteAddress(final String remoteAddress) {
218        this.remoteAddress = new SocketAddress() {
219            @Override
220            public String toString() {
221                return remoteAddress;
222            }
223        };
224        if (name == null) {
225            name = remoteAddress;
226        }
227    }
228
229    public void setName(String name) {
230        this.name = name;
231    }
232
233    public TransportListener getTransportListener() {
234        return listener;
235    }
236    public void setTransportListener(TransportListener transportListener) {
237        this.listener = transportListener;
238    }
239
240    public ProtocolCodec getProtocolCodec() {
241        return protocolCodec;
242    }
243    public void setProtocolCodec(ProtocolCodec protocolCodec) {
244        this.protocolCodec = protocolCodec;
245    }
246
247
248    public boolean isTrace() {
249        return trace;
250    }
251
252    public void setTrace(boolean trace) {
253        this.trace = trace;
254    }
255
256    public boolean isMarshal() {
257        return marshal;
258    }
259    public void setMarshal(boolean marshall) {
260        this.marshal = marshall;
261    }
262
263    public boolean isConnected() {
264        return !stopping.get();
265    }
266    public boolean isClosed() {
267        return false;
268    }
269
270    public Executor getBlockingExecutor() {
271        return null;
272    }
273    public void setBlockingExecutor(Executor blockingExecutor) {
274    }
275
276    public ReadableByteChannel getReadChannel() {
277        return null;
278    }
279
280    public WritableByteChannel getWriteChannel() {
281        return null;
282    }
283}