001/**
002 * Copyright (C) 2012 FuseSource, Inc.
003 * http://fusesource.com
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *    http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.fusesource.hawtdispatch.transport;
019
020import org.fusesource.hawtdispatch.*;
021import org.fusesource.hawtdispatch.internal.BaseSuspendable;
022
023import java.io.IOException;
024import java.net.*;
025import java.nio.ByteBuffer;
026import java.nio.channels.*;
027import java.util.LinkedList;
028import java.util.concurrent.Executor;
029import java.util.concurrent.TimeUnit;
030
031/**
032 * An implementation of the {@link org.fusesource.hawtdispatch.transport.Transport} interface using raw tcp/ip
033 *
034 * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
035 */
036public class TcpTransport extends ServiceBase implements Transport {
037
038    static InetAddress localhost;
039    synchronized static public InetAddress getLocalHost() throws UnknownHostException {
040        // cache it...
041        if( localhost==null ) {
042            // this can be slow on some systems and we use repeatedly.
043            localhost = InetAddress.getLocalHost();
044        }
045        return localhost;
046    }
047
048    abstract static class SocketState {
049        void onStop(Task onCompleted) {
050        }
051        void onCanceled() {
052        }
053        boolean is(Class<? extends SocketState> clazz) {
054            return getClass()==clazz;
055        }
056    }
057
058    static class DISCONNECTED extends SocketState{}
059
060    class CONNECTING extends SocketState{
061        void onStop(Task onCompleted) {
062            trace("CONNECTING.onStop");
063            CANCELING state = new CANCELING();
064            socketState = state;
065            state.onStop(onCompleted);
066        }
067        void onCanceled() {
068            trace("CONNECTING.onCanceled");
069            CANCELING state = new CANCELING();
070            socketState = state;
071            state.onCanceled();
072        }
073    }
074
075    class CONNECTED extends SocketState {
076
077        public CONNECTED() {
078            localAddress = channel.socket().getLocalSocketAddress();
079            remoteAddress = channel.socket().getRemoteSocketAddress();
080        }
081
082        void onStop(Task onCompleted) {
083            trace("CONNECTED.onStop");
084            CANCELING state = new CANCELING();
085            socketState = state;
086            state.add(createDisconnectTask());
087            state.onStop(onCompleted);
088        }
089        void onCanceled() {
090            trace("CONNECTED.onCanceled");
091            CANCELING state = new CANCELING();
092            socketState = state;
093            state.add(createDisconnectTask());
094            state.onCanceled();
095        }
096        Task createDisconnectTask() {
097            return new Task(){
098                public void run() {
099                    listener.onTransportDisconnected();
100                }
101            };
102        }
103    }
104
105    class CANCELING extends SocketState {
106        private LinkedList<Task> runnables =  new LinkedList<Task>();
107        private int remaining;
108        private boolean dispose;
109
110        public CANCELING() {
111            if( readSource!=null ) {
112                remaining++;
113                readSource.cancel();
114            }
115            if( writeSource!=null ) {
116                remaining++;
117                writeSource.cancel();
118            }
119        }
120        void onStop(Task onCompleted) {
121            trace("CANCELING.onCompleted");
122            add(onCompleted);
123            dispose = true;
124        }
125        void add(Task onCompleted) {
126            if( onCompleted!=null ) {
127                runnables.add(onCompleted);
128            }
129        }
130        void onCanceled() {
131            trace("CANCELING.onCanceled");
132            remaining--;
133            if( remaining!=0 ) {
134                return;
135            }
136            try {
137                if( closeOnCancel ) {
138                    channel.close();
139                }
140            } catch (IOException ignore) {
141            }
142            socketState = new CANCELED(dispose);
143            for (Task runnable : runnables) {
144                runnable.run();
145            }
146            if (dispose) {
147                dispose();
148            }
149        }
150    }
151
152    class CANCELED extends SocketState {
153        private boolean disposed;
154
155        public CANCELED(boolean disposed) {
156            this.disposed=disposed;
157        }
158
159        void onStop(Task onCompleted) {
160            trace("CANCELED.onStop");
161            if( !disposed ) {
162                disposed = true;
163                dispose();
164            }
165            onCompleted.run();
166        }
167    }
168
169    protected URI remoteLocation;
170    protected URI localLocation;
171    protected TransportListener listener;
172    protected ProtocolCodec codec;
173
174    protected SocketChannel channel;
175
176    protected SocketState socketState = new DISCONNECTED();
177
178    protected DispatchQueue dispatchQueue;
179    private DispatchSource readSource;
180    private DispatchSource writeSource;
181    protected CustomDispatchSource<Integer, Integer> drainOutboundSource;
182    protected CustomDispatchSource<Integer, Integer> yieldSource;
183
184    protected boolean useLocalHost = true;
185
186    int maxReadRate;
187    int maxWriteRate;
188    int receiveBufferSize = 1024*64;
189    int sendBufferSize = 1024*64;
190    boolean closeOnCancel = true;
191
192    boolean keepAlive = true;
193
194    public static final int IPTOS_LOWCOST = 0x02;
195    public static final int IPTOS_RELIABILITY = 0x04;
196    public static final int IPTOS_THROUGHPUT = 0x08;
197    public static final int IPTOS_LOWDELAY = 0x10;
198
199    int trafficClass = IPTOS_THROUGHPUT;
200
201    protected RateLimitingChannel rateLimitingChannel;
202    SocketAddress localAddress;
203    SocketAddress remoteAddress;
204    protected Executor blockingExecutor;
205
206    class RateLimitingChannel implements ScatteringByteChannel, GatheringByteChannel {
207
208        int read_allowance = maxReadRate;
209        boolean read_suspended = false;
210//        int read_resume_counter = 0;
211        int write_allowance = maxWriteRate;
212        boolean write_suspended = false;
213
214        public void resetAllowance() {
215            if( read_allowance != maxReadRate || write_allowance != maxWriteRate) {
216                read_allowance = maxReadRate;
217                write_allowance = maxWriteRate;
218                if( write_suspended ) {
219                    write_suspended = false;
220                    resumeWrite();
221                }
222                if( read_suspended ) {
223                    read_suspended = false;
224                    resumeRead();
225                }
226            }
227        }
228
229        public int read(ByteBuffer dst) throws IOException {
230            if( maxReadRate ==0 ) {
231                return channel.read(dst);
232            } else {
233                int rc=0;
234                int reduction = 0;
235                try {
236                    int remaining = dst.remaining();
237                    if( read_allowance ==0 || remaining ==0 ) {
238                        return 0;
239                    }
240                    if( remaining > read_allowance) {
241                        reduction = remaining - read_allowance;
242                        dst.limit(dst.limit() - reduction);
243                    }
244                    rc = channel.read(dst);
245                    read_allowance -= rc;
246                } finally {
247                    if( read_allowance<=0 && !read_suspended ) {
248                        // we need to suspend the read now until we get
249                        // a new allowance..
250                        readSource.suspend();
251                        read_suspended = true;
252                    }
253                    if( reduction!=0 ) {
254                        dst.limit(dst.limit() + reduction);
255                    }
256                }
257                return rc;
258            }
259        }
260
261        public int write(ByteBuffer src) throws IOException {
262            if( maxWriteRate ==0 ) {
263                return channel.write(src);
264            } else {
265                int remaining = src.remaining();
266                if( write_allowance ==0 || remaining ==0 ) {
267                    return 0;
268                }
269
270                int reduction = 0;
271                if( remaining > write_allowance) {
272                    reduction = remaining - write_allowance;
273                    src.limit(src.limit() - reduction);
274                }
275                int rc = 0;
276                try {
277                    rc = channel.write(src);
278                    write_allowance -= rc;
279                } finally {
280                    if( reduction!=0 ) {
281                        if( src.remaining() == 0 ) {
282                            // we need to suspend the read now until we get
283                            // a new allowance..
284                            write_suspended = true;
285                            suspendWrite();
286                        }
287                        src.limit(src.limit() + reduction);
288                    }
289                }
290                return rc;
291            }
292        }
293
294        public boolean isOpen() {
295            return channel.isOpen();
296        }
297
298        public void close() throws IOException {
299            channel.close();
300        }
301
302        public void resumeRead() {
303//            if( read_suspended ) {
304//                read_resume_counter += 1;
305//            } else {
306                _resumeRead();
307//            }
308        }
309
310        public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
311            if(offset+length > dsts.length || length<0 || offset<0) {
312                throw new IndexOutOfBoundsException();
313            }
314            long rc=0;
315            for (int i = 0; i < length; i++) {
316                ByteBuffer dst = dsts[offset+i];
317                if(dst.hasRemaining()) {
318                    rc += read(dst);
319                }
320                if( dst.hasRemaining() ) {
321                    return rc;
322                }
323            }
324            return rc;
325        }
326
327        public long read(ByteBuffer[] dsts) throws IOException {
328            return read(dsts, 0, dsts.length);
329        }
330
331        public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
332            if(offset+length > srcs.length || length<0 || offset<0) {
333                throw new IndexOutOfBoundsException();
334            }
335            long rc=0;
336            for (int i = 0; i < length; i++) {
337                ByteBuffer src = srcs[offset+i];
338                if(src.hasRemaining()) {
339                    rc += write(src);
340                }
341                if( src.hasRemaining() ) {
342                    return rc;
343                }
344            }
345            return rc;
346        }
347
348        public long write(ByteBuffer[] srcs) throws IOException {
349            return write(srcs, 0, srcs.length);
350        }
351
352    }
353
354    private final Task CANCEL_HANDLER = new Task() {
355        public void run() {
356            socketState.onCanceled();
357        }
358    };
359
360    static final class OneWay {
361        final Object command;
362        final Retained retained;
363
364        public OneWay(Object command, Retained retained) {
365            this.command = command;
366            this.retained = retained;
367        }
368    }
369
370    public void connected(SocketChannel channel) throws IOException, Exception {
371        this.channel = channel;
372        initializeChannel();
373        this.socketState = new CONNECTED();
374    }
375
376    protected void initializeChannel() throws Exception {
377        this.channel.configureBlocking(false);
378        Socket socket = channel.socket();
379        try {
380            socket.setReuseAddress(true);
381        } catch (SocketException e) {
382        }
383        try {
384            socket.setSoLinger(true, 0);
385        } catch (SocketException e) {
386        }
387        try {
388            socket.setTrafficClass(trafficClass);
389        } catch (SocketException e) {
390        }
391        try {
392            socket.setKeepAlive(keepAlive);
393        } catch (SocketException e) {
394        }
395        try {
396            socket.setTcpNoDelay(true);
397        } catch (SocketException e) {
398        }
399        try {
400            socket.setReceiveBufferSize(receiveBufferSize);
401        } catch (SocketException e) {
402        }
403        try {
404            socket.setSendBufferSize(sendBufferSize);
405        } catch (SocketException e) {
406        }
407
408        if( channel!=null && codec!=null ) {
409            initializeCodec();
410        }
411    }
412
413    protected void initializeCodec() throws Exception {
414        codec.setTransport(this);
415    }
416
417    private void initRateLimitingChannel() {
418        if( (maxReadRate !=0 || maxWriteRate !=0) && rateLimitingChannel==null ) {
419            rateLimitingChannel = new RateLimitingChannel();
420        }
421    }
422
423    public void connecting(final URI remoteLocation, final URI localLocation) throws Exception {
424        this.channel = SocketChannel.open();
425        initializeChannel();
426        this.remoteLocation = remoteLocation;
427        this.localLocation = localLocation;
428        socketState = new CONNECTING();
429    }
430
431
432    public DispatchQueue getDispatchQueue() {
433        return dispatchQueue;
434    }
435
436    public void setDispatchQueue(DispatchQueue queue) {
437        this.dispatchQueue = queue;
438        if(readSource!=null) readSource.setTargetQueue(queue);
439        if(writeSource!=null) writeSource.setTargetQueue(queue);
440        if(drainOutboundSource!=null) drainOutboundSource.setTargetQueue(queue);
441        if(yieldSource!=null) yieldSource.setTargetQueue(queue);
442    }
443
444    public void _start(Task onCompleted) {
445        try {
446            if (socketState.is(CONNECTING.class)) {
447
448                // Resolving host names might block.. so do it on the blocking executor.
449                this.blockingExecutor.execute(new Runnable() {
450                    public void run() {
451                        try {
452
453                            final InetSocketAddress localAddress = (localLocation != null) ?
454                                    new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), localLocation.getPort())
455                                    : null;
456
457                            String host = resolveHostName(remoteLocation.getHost());
458                            final InetSocketAddress remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
459
460                            // Done resolving.. switch back to the dispatch queue.
461                            dispatchQueue.execute(new Task() {
462                                @Override
463                                public void run() {
464                                    // No need to complete if we have been canceled.
465                                    if( ! socketState.is(CONNECTING.class) ) {
466                                        return;
467                                    }
468                                    try {
469
470                                        if (localAddress != null) {
471                                            channel.socket().bind(localAddress);
472                                        }
473                                        trace("connecting...");
474                                        if (channel.connect(remoteAddress)) {
475                                            socketState = new CONNECTED();
476                                            onConnected();
477                                            return;
478                                        }
479
480                                        // this allows the connect to complete..
481                                        readSource = Dispatch.createSource(channel, SelectionKey.OP_CONNECT, dispatchQueue);
482                                        readSource.setEventHandler(new Task() {
483                                            public void run() {
484                                                if (getServiceState() != STARTED) {
485                                                    return;
486                                                }
487                                                try {
488                                                    trace("connected.");
489                                                    channel.finishConnect();
490                                                    readSource.setCancelHandler(null);
491                                                    readSource.cancel();
492                                                    readSource = null;
493                                                    socketState = new CONNECTED();
494                                                    onConnected();
495                                                } catch (IOException e) {
496                                                    onTransportFailure(e);
497                                                }
498                                            }
499                                        });
500                                        readSource.setCancelHandler(CANCEL_HANDLER);
501                                        readSource.resume();
502
503                                    } catch (Exception e) {
504                                        try {
505                                            channel.close();
506                                        } catch (Exception ignore) {
507                                        }
508                                        socketState = new CANCELED(true);
509                                        if (! (e instanceof IOException)) {
510                                            e = new IOException(e);
511                                        }
512                                        listener.onTransportFailure((IOException)e);
513                                    }
514                                }
515                            });
516
517                        } catch (final IOException e) {
518                            // we're in blockingExecutor thread context here
519                            dispatchQueue.execute(new Task() {
520                                public void run() {
521                                    try {
522                                        channel.close();
523                                    } catch (IOException ignore) {
524                                    }
525                                    socketState = new CANCELED(true);
526                                    listener.onTransportFailure(e);
527                                }
528                            });
529                        }
530                    }
531                });
532            } else if (socketState.is(CONNECTED.class)) {
533                dispatchQueue.execute(new Task() {
534                    public void run() {
535                        try {
536                            trace("was connected.");
537                            onConnected();
538                        } catch (IOException e) {
539                            onTransportFailure(e);
540                        }
541                    }
542                });
543            } else {
544                trace("cannot be started.  socket state is: " + socketState);
545            }
546        } finally {
547            if (onCompleted != null) {
548                onCompleted.run();
549            }
550        }
551    }
552
553    public void _stop(final Task onCompleted) {
554        trace("stopping.. at state: "+socketState);
555        socketState.onStop(onCompleted);
556    }
557
558    protected String resolveHostName(String host) throws UnknownHostException {
559        if (isUseLocalHost()) {
560            String localName = getLocalHost().getHostName();
561            if (localName != null && localName.equals(host)) {
562                return "localhost";
563            }
564        }
565        return host;
566    }
567
568    protected void onConnected() throws IOException {
569        yieldSource = Dispatch.createSource(EventAggregators.INTEGER_ADD, dispatchQueue);
570        yieldSource.setEventHandler(new Task() {
571            public void run() {
572                drainInbound();
573            }
574        });
575        yieldSource.resume();
576        drainOutboundSource = Dispatch.createSource(EventAggregators.INTEGER_ADD, dispatchQueue);
577        drainOutboundSource.setEventHandler(new Task() {
578            public void run() {
579                flush();
580            }
581        });
582        drainOutboundSource.resume();
583
584        readSource = Dispatch.createSource(channel, SelectionKey.OP_READ, dispatchQueue);
585        writeSource = Dispatch.createSource(channel, SelectionKey.OP_WRITE, dispatchQueue);
586
587        readSource.setCancelHandler(CANCEL_HANDLER);
588        writeSource.setCancelHandler(CANCEL_HANDLER);
589
590        readSource.setEventHandler(new Task() {
591            public void run() {
592                drainInbound();
593            }
594        });
595        writeSource.setEventHandler(new Task() {
596            public void run() {
597                flush();
598            }
599        });
600
601        initRateLimitingChannel();
602        if( rateLimitingChannel!=null ) {
603            schedualRateAllowanceReset();
604        }
605        listener.onTransportConnected();
606    }
607
608    private void schedualRateAllowanceReset() {
609        dispatchQueue.executeAfter(1, TimeUnit.SECONDS, new Task(){
610            public void run() {
611                if( !socketState.is(CONNECTED.class) ) {
612                    return;
613                }
614                rateLimitingChannel.resetAllowance();
615                schedualRateAllowanceReset();
616            }
617        });
618    }
619
620    private void dispose() {
621        if( readSource!=null ) {
622            readSource.cancel();
623            readSource=null;
624        }
625
626        if( writeSource!=null ) {
627            writeSource.cancel();
628            writeSource=null;
629        }
630    }
631
632    public void onTransportFailure(IOException error) {
633        listener.onTransportFailure(error);
634        // socketState.onCanceled();
635    }
636
637
638    public boolean full() {
639        return codec==null ||
640               codec.full() ||
641               !socketState.is(CONNECTED.class) ||
642               getServiceState() != STARTED;
643    }
644
645    boolean rejectingOffers;
646
647    public boolean offer(Object command) {
648        dispatchQueue.assertExecuting();
649        if( full() ) {
650            return false;
651        }
652        try {
653            ProtocolCodec.BufferState rc = codec.write(command);
654            rejectingOffers = codec.full();
655            switch (rc ) {
656                case FULL:
657                    return false;
658                default:
659                    drainOutboundSource.merge(1);
660            }
661        } catch (IOException e) {
662            onTransportFailure(e);
663        }
664        return true;
665    }
666
667    boolean writeResumedForCodecFlush = false;
668
669    /**
670     *
671     */
672    public void flush() {
673        dispatchQueue.assertExecuting();
674        if (getServiceState() != STARTED || !socketState.is(CONNECTED.class)) {
675            return;
676        }
677        try {
678            if( codec.flush() == ProtocolCodec.BufferState.EMPTY && transportFlush() ) {
679                if( writeResumedForCodecFlush) {
680                    writeResumedForCodecFlush = false;
681                    suspendWrite();
682                }
683                rejectingOffers = false;
684                listener.onRefill();
685
686            } else {
687                if(!writeResumedForCodecFlush) {
688                    writeResumedForCodecFlush = true;
689                    resumeWrite();
690                }
691            }
692        } catch (IOException e) {
693            onTransportFailure(e);
694        }
695    }
696
697    protected boolean transportFlush() throws IOException {
698        return true;
699    }
700
701    public void drainInbound() {
702        if (!getServiceState().isStarted() || readSource.isSuspended()) {
703            return;
704        }
705        try {
706            long initial = codec.getReadCounter();
707            // Only process upto 2 x the read buffer worth of data at a time so we can give
708            // other connections a chance to process their requests.
709            while( codec.getReadCounter()-initial < codec.getReadBufferSize()<<2 ) {
710                Object command = codec.read();
711                if ( command!=null ) {
712                    try {
713                        listener.onTransportCommand(command);
714                    } catch (Throwable e) {
715                        e.printStackTrace();
716                        onTransportFailure(new IOException("Transport listener failure."));
717                    }
718
719                    // the transport may be suspended after processing a command.
720                    if (getServiceState() == STOPPED || readSource.isSuspended()) {
721                        return;
722                    }
723                } else {
724                    return;
725                }
726            }
727            yieldSource.merge(1);
728        } catch (IOException e) {
729            onTransportFailure(e);
730        }
731    }
732
733    public SocketAddress getLocalAddress() {
734        return localAddress;
735    }
736
737    public SocketAddress getRemoteAddress() {
738        return remoteAddress;
739    }
740
741    private boolean assertConnected() {
742        try {
743            if ( !isConnected() ) {
744                throw new IOException("Not connected.");
745            }
746            return true;
747        } catch (IOException e) {
748            onTransportFailure(e);
749        }
750        return false;
751    }
752
753    public void suspendRead() {
754        if( isConnected() && readSource!=null ) {
755            readSource.suspend();
756        }
757    }
758
759
760    public void resumeRead() {
761        if( isConnected() && readSource!=null ) {
762            if( rateLimitingChannel!=null ) {
763                rateLimitingChannel.resumeRead();
764            } else {
765                _resumeRead();
766            }
767        }
768    }
769
770    private void _resumeRead() {
771        readSource.resume();
772        dispatchQueue.execute(new Task(){
773            public void run() {
774                drainInbound();
775            }
776        });
777    }
778
779    protected void suspendWrite() {
780        if( isConnected() && writeSource!=null ) {
781            writeSource.suspend();
782        }
783    }
784
785    protected void resumeWrite() {
786        if( isConnected() && writeSource!=null ) {
787            writeSource.resume();
788        }
789    }
790
791    public TransportListener getTransportListener() {
792        return listener;
793    }
794
795    public void setTransportListener(TransportListener transportListener) {
796        this.listener = transportListener;
797    }
798
799    public ProtocolCodec getProtocolCodec() {
800        return codec;
801    }
802
803    public void setProtocolCodec(ProtocolCodec protocolCodec) throws Exception {
804        this.codec = protocolCodec;
805        if( channel!=null && codec!=null ) {
806            initializeCodec();
807        }
808    }
809
810    public boolean isConnected() {
811        return socketState.is(CONNECTED.class);
812    }
813
814    public boolean isClosed() {
815        return getServiceState() == STOPPED;
816    }
817
818    public boolean isUseLocalHost() {
819        return useLocalHost;
820    }
821
822    /**
823     * Sets whether 'localhost' or the actual local host name should be used to
824     * make local connections. On some operating systems such as Macs its not
825     * possible to connect as the local host name so localhost is better.
826     */
827    public void setUseLocalHost(boolean useLocalHost) {
828        this.useLocalHost = useLocalHost;
829    }
830
831    private void trace(String message) {
832        // TODO:
833    }
834
835    public SocketChannel getSocketChannel() {
836        return channel;
837    }
838
839    public ReadableByteChannel getReadChannel() {
840        initRateLimitingChannel();
841        if(rateLimitingChannel!=null) {
842            return rateLimitingChannel;
843        } else {
844            return channel;
845        }
846    }
847
848    public WritableByteChannel getWriteChannel() {
849        initRateLimitingChannel();
850        if(rateLimitingChannel!=null) {
851            return rateLimitingChannel;
852        } else {
853            return channel;
854        }
855    }
856
857    public int getMaxReadRate() {
858        return maxReadRate;
859    }
860
861    public void setMaxReadRate(int maxReadRate) {
862        this.maxReadRate = maxReadRate;
863    }
864
865    public int getMaxWriteRate() {
866        return maxWriteRate;
867    }
868
869    public void setMaxWriteRate(int maxWriteRate) {
870        this.maxWriteRate = maxWriteRate;
871    }
872
873    public int getTrafficClass() {
874        return trafficClass;
875    }
876
877    public void setTrafficClass(int trafficClass) {
878        this.trafficClass = trafficClass;
879    }
880
881    public int getReceiveBufferSize() {
882        return receiveBufferSize;
883    }
884
885    public void setReceiveBufferSize(int receiveBufferSize) {
886        this.receiveBufferSize = receiveBufferSize;
887        if( channel!=null ) {
888            try {
889                channel.socket().setReceiveBufferSize(receiveBufferSize);
890            } catch (SocketException ignore) {
891            }
892        }
893    }
894
895    public int getSendBufferSize() {
896        return sendBufferSize;
897    }
898
899    public void setSendBufferSize(int sendBufferSize) {
900        this.sendBufferSize = sendBufferSize;
901        if( channel!=null ) {
902            try {
903                channel.socket().setReceiveBufferSize(sendBufferSize);
904            } catch (SocketException ignore) {
905            }
906        }
907    }
908
909    public boolean isKeepAlive() {
910        return keepAlive;
911    }
912
913    public void setKeepAlive(boolean keepAlive) {
914        this.keepAlive = keepAlive;
915    }
916
917    public Executor getBlockingExecutor() {
918        return blockingExecutor;
919    }
920
921    public void setBlockingExecutor(Executor blockingExecutor) {
922        this.blockingExecutor = blockingExecutor;
923    }
924
925    public boolean isCloseOnCancel() {
926        return closeOnCancel;
927    }
928
929    public void setCloseOnCancel(boolean closeOnCancel) {
930        this.closeOnCancel = closeOnCancel;
931    }
932}