001/** 002 * Copyright (C) 2012 FuseSource, Inc. 003 * http://fusesource.com 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018package org.fusesource.hawtdispatch.transport; 019 020import org.fusesource.hawtdispatch.*; 021 022import java.net.InetSocketAddress; 023import java.net.URI; 024import java.util.LinkedList; 025import java.util.concurrent.Executor; 026import java.util.concurrent.atomic.AtomicInteger; 027 028/** 029 * 030 * @author <a href="http://hiramchirino.com">Hiram Chirino</a> 031 */ 032public class PipeTransportServer implements TransportServer { 033 034 protected String connectURI; 035 protected TransportServerListener listener; 036 protected String name; 037 protected boolean marshal; 038 protected final AtomicInteger connectionCounter = new AtomicInteger(); 039 DispatchQueue dispatchQueue; 040 041 private CustomDispatchSource<PipeTransport,LinkedList<PipeTransport>> acceptSource; 042 043 044 public String getBoundAddress() { 045 return connectURI; 046 } 047 048 public InetSocketAddress getSocketAddress() { 049 return null; 050 } 051 052 public DispatchQueue getDispatchQueue() { 053 return dispatchQueue; 054 } 055 056 public void setDispatchQueue(DispatchQueue queue) { 057 dispatchQueue = queue; 058 } 059 060 public void suspend() { 061 acceptSource.suspend(); 062 } 063 064 public void resume() { 065 acceptSource.resume(); 066 } 067 068 public void setTransportServerListener(TransportServerListener listener) { 069 this.listener = listener; 070 } 071 072 @Deprecated 073 public void start(Runnable onCompleted) throws Exception { 074 start(new TaskWrapper(onCompleted)); 075 } 076 @Deprecated 077 public void stop(Runnable onCompleted) throws Exception { 078 stop(new TaskWrapper(onCompleted)); 079 } 080 081 public void start(Task onCompleted) throws Exception { 082 acceptSource = Dispatch.createSource(EventAggregators.<PipeTransport>linkedList(), dispatchQueue); 083 acceptSource.setEventHandler(new Task() { 084 public void run() { 085 LinkedList<PipeTransport> transports = acceptSource.getData(); 086 for (PipeTransport transport : transports) { 087 try { 088 listener.onAccept(transport); 089 } catch (Exception e) { 090 listener.onAcceptError(e); 091 } 092 } 093 } 094 }); 095 acceptSource.resume(); 096 if( onCompleted!=null ) { 097 dispatchQueue.execute(onCompleted); 098 } 099 } 100 101 public void stop(Task onCompleted) throws Exception { 102 PipeTransportRegistry.unbind(this); 103 acceptSource.setCancelHandler(onCompleted); 104 acceptSource.cancel(); 105 } 106 107 public void setConnectURI(String connectURI) { 108 this.connectURI = connectURI; 109 } 110 111 public void setName(String name) { 112 this.name = name; 113 } 114 115 public String getName() { 116 return name; 117 } 118 119 public PipeTransport connect() { 120 int connectionId = connectionCounter.incrementAndGet(); 121 String remoteAddress = connectURI.toString() + "#" + connectionId; 122 assert this.listener != null : "Server does not have an accept listener"; 123 124 PipeTransport clientTransport = createClientTransport(); 125 PipeTransport serverTransport = createServerTransport(); 126 clientTransport.peer = serverTransport; 127 serverTransport.peer = clientTransport; 128 129 clientTransport.setRemoteAddress(remoteAddress); 130 serverTransport.setRemoteAddress(remoteAddress); 131 132 serverTransport.setMarshal(marshal); 133 this.acceptSource.merge(serverTransport); 134 return clientTransport; 135 } 136 137 protected PipeTransport createClientTransport() { 138 return new PipeTransport(this); 139 } 140 141 protected PipeTransport createServerTransport() { 142 return new PipeTransport(this); 143 } 144 145 public boolean isMarshal() { 146 return marshal; 147 } 148 149 public void setMarshal(boolean marshal) { 150 this.marshal = marshal; 151 } 152 153 public Executor getBlockingExecutor() { 154 return null; 155 } 156 157 public void setBlockingExecutor(Executor blockingExecutor) { 158 } 159}