001/**
002 * Copyright (C) 2012 FuseSource, Inc.
003 * http://fusesource.com
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *    http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.fusesource.hawtdispatch.transport;
019
020import org.fusesource.hawtdispatch.*;
021
022import java.net.InetSocketAddress;
023import java.net.URI;
024import java.util.LinkedList;
025import java.util.concurrent.Executor;
026import java.util.concurrent.atomic.AtomicInteger;
027
028/**
029 *
030 * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
031 */
032public class PipeTransportServer implements TransportServer {
033
034    protected String connectURI;
035    protected TransportServerListener listener;
036    protected String name;
037    protected boolean marshal;
038    protected final AtomicInteger connectionCounter = new AtomicInteger();
039    DispatchQueue dispatchQueue;
040
041    private CustomDispatchSource<PipeTransport,LinkedList<PipeTransport>> acceptSource;
042
043
044    public String getBoundAddress() {
045        return connectURI;
046    }
047
048    public InetSocketAddress getSocketAddress() {
049        return null;
050    }
051
052    public DispatchQueue getDispatchQueue() {
053        return dispatchQueue;
054    }
055
056    public void setDispatchQueue(DispatchQueue queue) {
057        dispatchQueue = queue;
058    }
059
060    public void suspend() {
061        acceptSource.suspend();
062    }
063
064    public void resume() {
065        acceptSource.resume();
066    }
067
068    public void setTransportServerListener(TransportServerListener listener) {
069        this.listener = listener;
070    }
071
072    @Deprecated
073    public void start(Runnable onCompleted) throws Exception {
074        start(new TaskWrapper(onCompleted));
075    }
076    @Deprecated
077    public void stop(Runnable onCompleted) throws Exception {
078        stop(new TaskWrapper(onCompleted));
079    }
080
081    public void start(Task onCompleted) throws Exception {
082        acceptSource = Dispatch.createSource(EventAggregators.<PipeTransport>linkedList(), dispatchQueue);
083        acceptSource.setEventHandler(new Task() {
084            public void run() {
085                LinkedList<PipeTransport> transports = acceptSource.getData();
086                for (PipeTransport transport : transports) {
087                    try {
088                        listener.onAccept(transport);
089                    } catch (Exception e) {
090                        listener.onAcceptError(e);
091                    }
092                }
093            }
094        });
095        acceptSource.resume();
096        if( onCompleted!=null ) {
097            dispatchQueue.execute(onCompleted);
098        }
099    }
100
101    public void stop(Task onCompleted) throws Exception {
102        PipeTransportRegistry.unbind(this);
103        acceptSource.setCancelHandler(onCompleted);
104        acceptSource.cancel();
105    }
106
107    public void setConnectURI(String connectURI) {
108        this.connectURI = connectURI;
109    }
110
111    public void setName(String name) {
112        this.name = name;
113    }
114
115    public String getName() {
116        return name;
117    }
118
119    public PipeTransport connect() {
120        int connectionId = connectionCounter.incrementAndGet();
121        String remoteAddress = connectURI.toString() + "#" + connectionId;
122        assert this.listener != null : "Server does not have an accept listener";
123
124        PipeTransport clientTransport = createClientTransport();
125        PipeTransport serverTransport = createServerTransport();
126        clientTransport.peer = serverTransport;
127        serverTransport.peer = clientTransport;
128
129        clientTransport.setRemoteAddress(remoteAddress);
130        serverTransport.setRemoteAddress(remoteAddress);
131
132        serverTransport.setMarshal(marshal);
133        this.acceptSource.merge(serverTransport);
134        return clientTransport;
135    }
136
137    protected PipeTransport createClientTransport() {
138        return new PipeTransport(this);
139    }
140    
141    protected PipeTransport createServerTransport() {
142        return new PipeTransport(this);
143    }
144
145    public boolean isMarshal() {
146        return marshal;
147    }
148
149    public void setMarshal(boolean marshal) {
150        this.marshal = marshal;
151    }
152
153    public Executor getBlockingExecutor() {
154        return null;
155    }
156
157    public void setBlockingExecutor(Executor blockingExecutor) {
158    }
159}