001/** 002 * Copyright (C) 2012 FuseSource, Inc. 003 * http://fusesource.com 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018package org.fusesource.hawtdispatch.transport; 019 020import org.fusesource.hawtdispatch.*; 021import org.fusesource.hawtdispatch.internal.BaseSuspendable; 022 023import java.io.IOException; 024import java.net.*; 025import java.nio.ByteBuffer; 026import java.nio.channels.*; 027import java.util.LinkedList; 028import java.util.concurrent.Executor; 029import java.util.concurrent.TimeUnit; 030 031/** 032 * An implementation of the {@link org.fusesource.hawtdispatch.transport.Transport} interface using raw tcp/ip 033 * 034 * @author <a href="http://hiramchirino.com">Hiram Chirino</a> 035 */ 036public class TcpTransport extends ServiceBase implements Transport { 037 038 static InetAddress localhost; 039 synchronized static public InetAddress getLocalHost() throws UnknownHostException { 040 // cache it... 041 if( localhost==null ) { 042 // this can be slow on some systems and we use repeatedly. 043 localhost = InetAddress.getLocalHost(); 044 } 045 return localhost; 046 } 047 048 abstract static class SocketState { 049 void onStop(Task onCompleted) { 050 } 051 void onCanceled() { 052 } 053 boolean is(Class<? extends SocketState> clazz) { 054 return getClass()==clazz; 055 } 056 } 057 058 static class DISCONNECTED extends SocketState{} 059 060 class CONNECTING extends SocketState{ 061 void onStop(Task onCompleted) { 062 trace("CONNECTING.onStop"); 063 CANCELING state = new CANCELING(); 064 socketState = state; 065 state.onStop(onCompleted); 066 } 067 void onCanceled() { 068 trace("CONNECTING.onCanceled"); 069 CANCELING state = new CANCELING(); 070 socketState = state; 071 state.onCanceled(); 072 } 073 } 074 075 class CONNECTED extends SocketState { 076 077 public CONNECTED() { 078 localAddress = channel.socket().getLocalSocketAddress(); 079 remoteAddress = channel.socket().getRemoteSocketAddress(); 080 } 081 082 void onStop(Task onCompleted) { 083 trace("CONNECTED.onStop"); 084 CANCELING state = new CANCELING(); 085 socketState = state; 086 state.add(createDisconnectTask()); 087 state.onStop(onCompleted); 088 } 089 void onCanceled() { 090 trace("CONNECTED.onCanceled"); 091 CANCELING state = new CANCELING(); 092 socketState = state; 093 state.add(createDisconnectTask()); 094 state.onCanceled(); 095 } 096 Task createDisconnectTask() { 097 return new Task(){ 098 public void run() { 099 listener.onTransportDisconnected(); 100 } 101 }; 102 } 103 } 104 105 class CANCELING extends SocketState { 106 private LinkedList<Task> runnables = new LinkedList<Task>(); 107 private int remaining; 108 private boolean dispose; 109 110 public CANCELING() { 111 if( readSource!=null ) { 112 remaining++; 113 readSource.cancel(); 114 } 115 if( writeSource!=null ) { 116 remaining++; 117 writeSource.cancel(); 118 } 119 } 120 void onStop(Task onCompleted) { 121 trace("CANCELING.onCompleted"); 122 add(onCompleted); 123 dispose = true; 124 } 125 void add(Task onCompleted) { 126 if( onCompleted!=null ) { 127 runnables.add(onCompleted); 128 } 129 } 130 void onCanceled() { 131 trace("CANCELING.onCanceled"); 132 remaining--; 133 if( remaining!=0 ) { 134 return; 135 } 136 try { 137 if( closeOnCancel ) { 138 channel.close(); 139 } 140 } catch (IOException ignore) { 141 } 142 socketState = new CANCELED(dispose); 143 for (Task runnable : runnables) { 144 runnable.run(); 145 } 146 if (dispose) { 147 dispose(); 148 } 149 } 150 } 151 152 class CANCELED extends SocketState { 153 private boolean disposed; 154 155 public CANCELED(boolean disposed) { 156 this.disposed=disposed; 157 } 158 159 void onStop(Task onCompleted) { 160 trace("CANCELED.onStop"); 161 if( !disposed ) { 162 disposed = true; 163 dispose(); 164 } 165 onCompleted.run(); 166 } 167 } 168 169 protected URI remoteLocation; 170 protected URI localLocation; 171 protected TransportListener listener; 172 protected ProtocolCodec codec; 173 174 protected SocketChannel channel; 175 176 protected SocketState socketState = new DISCONNECTED(); 177 178 protected DispatchQueue dispatchQueue; 179 private DispatchSource readSource; 180 private DispatchSource writeSource; 181 protected CustomDispatchSource<Integer, Integer> drainOutboundSource; 182 protected CustomDispatchSource<Integer, Integer> yieldSource; 183 184 protected boolean useLocalHost = true; 185 186 int maxReadRate; 187 int maxWriteRate; 188 int receiveBufferSize = 1024*64; 189 int sendBufferSize = 1024*64; 190 boolean closeOnCancel = true; 191 192 boolean keepAlive = true; 193 194 public static final int IPTOS_LOWCOST = 0x02; 195 public static final int IPTOS_RELIABILITY = 0x04; 196 public static final int IPTOS_THROUGHPUT = 0x08; 197 public static final int IPTOS_LOWDELAY = 0x10; 198 199 int trafficClass = IPTOS_THROUGHPUT; 200 201 protected RateLimitingChannel rateLimitingChannel; 202 SocketAddress localAddress; 203 SocketAddress remoteAddress; 204 protected Executor blockingExecutor; 205 206 class RateLimitingChannel implements ScatteringByteChannel, GatheringByteChannel { 207 208 int read_allowance = maxReadRate; 209 boolean read_suspended = false; 210// int read_resume_counter = 0; 211 int write_allowance = maxWriteRate; 212 boolean write_suspended = false; 213 214 public void resetAllowance() { 215 if( read_allowance != maxReadRate || write_allowance != maxWriteRate) { 216 read_allowance = maxReadRate; 217 write_allowance = maxWriteRate; 218 if( write_suspended ) { 219 write_suspended = false; 220 resumeWrite(); 221 } 222 if( read_suspended ) { 223 read_suspended = false; 224 resumeRead(); 225 } 226 } 227 } 228 229 public int read(ByteBuffer dst) throws IOException { 230 if( maxReadRate ==0 ) { 231 return channel.read(dst); 232 } else { 233 int rc=0; 234 int reduction = 0; 235 try { 236 int remaining = dst.remaining(); 237 if( read_allowance ==0 || remaining ==0 ) { 238 return 0; 239 } 240 if( remaining > read_allowance) { 241 reduction = remaining - read_allowance; 242 dst.limit(dst.limit() - reduction); 243 } 244 rc = channel.read(dst); 245 read_allowance -= rc; 246 } finally { 247 if( read_allowance<=0 && !read_suspended ) { 248 // we need to suspend the read now until we get 249 // a new allowance.. 250 readSource.suspend(); 251 read_suspended = true; 252 } 253 if( reduction!=0 ) { 254 dst.limit(dst.limit() + reduction); 255 } 256 } 257 return rc; 258 } 259 } 260 261 public int write(ByteBuffer src) throws IOException { 262 if( maxWriteRate ==0 ) { 263 return channel.write(src); 264 } else { 265 int remaining = src.remaining(); 266 if( write_allowance ==0 || remaining ==0 ) { 267 return 0; 268 } 269 270 int reduction = 0; 271 if( remaining > write_allowance) { 272 reduction = remaining - write_allowance; 273 src.limit(src.limit() - reduction); 274 } 275 int rc = 0; 276 try { 277 rc = channel.write(src); 278 write_allowance -= rc; 279 } finally { 280 if( reduction!=0 ) { 281 if( src.remaining() == 0 ) { 282 // we need to suspend the read now until we get 283 // a new allowance.. 284 write_suspended = true; 285 suspendWrite(); 286 } 287 src.limit(src.limit() + reduction); 288 } 289 } 290 return rc; 291 } 292 } 293 294 public boolean isOpen() { 295 return channel.isOpen(); 296 } 297 298 public void close() throws IOException { 299 channel.close(); 300 } 301 302 public void resumeRead() { 303// if( read_suspended ) { 304// read_resume_counter += 1; 305// } else { 306 _resumeRead(); 307// } 308 } 309 310 public long read(ByteBuffer[] dsts, int offset, int length) throws IOException { 311 if(offset+length > dsts.length || length<0 || offset<0) { 312 throw new IndexOutOfBoundsException(); 313 } 314 long rc=0; 315 for (int i = 0; i < length; i++) { 316 ByteBuffer dst = dsts[offset+i]; 317 if(dst.hasRemaining()) { 318 rc += read(dst); 319 } 320 if( dst.hasRemaining() ) { 321 return rc; 322 } 323 } 324 return rc; 325 } 326 327 public long read(ByteBuffer[] dsts) throws IOException { 328 return read(dsts, 0, dsts.length); 329 } 330 331 public long write(ByteBuffer[] srcs, int offset, int length) throws IOException { 332 if(offset+length > srcs.length || length<0 || offset<0) { 333 throw new IndexOutOfBoundsException(); 334 } 335 long rc=0; 336 for (int i = 0; i < length; i++) { 337 ByteBuffer src = srcs[offset+i]; 338 if(src.hasRemaining()) { 339 rc += write(src); 340 } 341 if( src.hasRemaining() ) { 342 return rc; 343 } 344 } 345 return rc; 346 } 347 348 public long write(ByteBuffer[] srcs) throws IOException { 349 return write(srcs, 0, srcs.length); 350 } 351 352 } 353 354 private final Task CANCEL_HANDLER = new Task() { 355 public void run() { 356 socketState.onCanceled(); 357 } 358 }; 359 360 static final class OneWay { 361 final Object command; 362 final Retained retained; 363 364 public OneWay(Object command, Retained retained) { 365 this.command = command; 366 this.retained = retained; 367 } 368 } 369 370 public void connected(SocketChannel channel) throws IOException, Exception { 371 this.channel = channel; 372 initializeChannel(); 373 this.socketState = new CONNECTED(); 374 } 375 376 protected void initializeChannel() throws Exception { 377 this.channel.configureBlocking(false); 378 Socket socket = channel.socket(); 379 try { 380 socket.setReuseAddress(true); 381 } catch (SocketException e) { 382 } 383 try { 384 socket.setSoLinger(true, 0); 385 } catch (SocketException e) { 386 } 387 try { 388 socket.setTrafficClass(trafficClass); 389 } catch (SocketException e) { 390 } 391 try { 392 socket.setKeepAlive(keepAlive); 393 } catch (SocketException e) { 394 } 395 try { 396 socket.setTcpNoDelay(true); 397 } catch (SocketException e) { 398 } 399 try { 400 socket.setReceiveBufferSize(receiveBufferSize); 401 } catch (SocketException e) { 402 } 403 try { 404 socket.setSendBufferSize(sendBufferSize); 405 } catch (SocketException e) { 406 } 407 408 if( channel!=null && codec!=null ) { 409 initializeCodec(); 410 } 411 } 412 413 protected void initializeCodec() throws Exception { 414 codec.setTransport(this); 415 } 416 417 private void initRateLimitingChannel() { 418 if( (maxReadRate !=0 || maxWriteRate !=0) && rateLimitingChannel==null ) { 419 rateLimitingChannel = new RateLimitingChannel(); 420 } 421 } 422 423 public void connecting(final URI remoteLocation, final URI localLocation) throws Exception { 424 this.channel = SocketChannel.open(); 425 initializeChannel(); 426 this.remoteLocation = remoteLocation; 427 this.localLocation = localLocation; 428 socketState = new CONNECTING(); 429 } 430 431 432 public DispatchQueue getDispatchQueue() { 433 return dispatchQueue; 434 } 435 436 public void setDispatchQueue(DispatchQueue queue) { 437 this.dispatchQueue = queue; 438 if(readSource!=null) readSource.setTargetQueue(queue); 439 if(writeSource!=null) writeSource.setTargetQueue(queue); 440 if(drainOutboundSource!=null) drainOutboundSource.setTargetQueue(queue); 441 if(yieldSource!=null) yieldSource.setTargetQueue(queue); 442 } 443 444 public void _start(Task onCompleted) { 445 try { 446 if (socketState.is(CONNECTING.class)) { 447 448 // Resolving host names might block.. so do it on the blocking executor. 449 this.blockingExecutor.execute(new Runnable() { 450 public void run() { 451 try { 452 453 final InetSocketAddress localAddress = (localLocation != null) ? 454 new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), localLocation.getPort()) 455 : null; 456 457 String host = resolveHostName(remoteLocation.getHost()); 458 final InetSocketAddress remoteAddress = new InetSocketAddress(host, remoteLocation.getPort()); 459 460 // Done resolving.. switch back to the dispatch queue. 461 dispatchQueue.execute(new Task() { 462 @Override 463 public void run() { 464 // No need to complete if we have been canceled. 465 if( ! socketState.is(CONNECTING.class) ) { 466 return; 467 } 468 try { 469 470 if (localAddress != null) { 471 channel.socket().bind(localAddress); 472 } 473 trace("connecting..."); 474 if (channel.connect(remoteAddress)) { 475 socketState = new CONNECTED(); 476 onConnected(); 477 return; 478 } 479 480 // this allows the connect to complete.. 481 readSource = Dispatch.createSource(channel, SelectionKey.OP_CONNECT, dispatchQueue); 482 readSource.setEventHandler(new Task() { 483 public void run() { 484 if (getServiceState() != STARTED) { 485 return; 486 } 487 try { 488 trace("connected."); 489 channel.finishConnect(); 490 readSource.setCancelHandler(null); 491 readSource.cancel(); 492 readSource = null; 493 socketState = new CONNECTED(); 494 onConnected(); 495 } catch (IOException e) { 496 onTransportFailure(e); 497 } 498 } 499 }); 500 readSource.setCancelHandler(CANCEL_HANDLER); 501 readSource.resume(); 502 503 } catch (Exception e) { 504 try { 505 channel.close(); 506 } catch (Exception ignore) { 507 } 508 socketState = new CANCELED(true); 509 if (! (e instanceof IOException)) { 510 e = new IOException(e); 511 } 512 listener.onTransportFailure((IOException)e); 513 } 514 } 515 }); 516 517 } catch (final IOException e) { 518 // we're in blockingExecutor thread context here 519 dispatchQueue.execute(new Task() { 520 public void run() { 521 try { 522 channel.close(); 523 } catch (IOException ignore) { 524 } 525 socketState = new CANCELED(true); 526 listener.onTransportFailure(e); 527 } 528 }); 529 } 530 } 531 }); 532 } else if (socketState.is(CONNECTED.class)) { 533 dispatchQueue.execute(new Task() { 534 public void run() { 535 try { 536 trace("was connected."); 537 onConnected(); 538 } catch (IOException e) { 539 onTransportFailure(e); 540 } 541 } 542 }); 543 } else { 544 trace("cannot be started. socket state is: " + socketState); 545 } 546 } finally { 547 if (onCompleted != null) { 548 onCompleted.run(); 549 } 550 } 551 } 552 553 public void _stop(final Task onCompleted) { 554 trace("stopping.. at state: "+socketState); 555 socketState.onStop(onCompleted); 556 } 557 558 protected String resolveHostName(String host) throws UnknownHostException { 559 if (isUseLocalHost()) { 560 String localName = getLocalHost().getHostName(); 561 if (localName != null && localName.equals(host)) { 562 return "localhost"; 563 } 564 } 565 return host; 566 } 567 568 protected void onConnected() throws IOException { 569 yieldSource = Dispatch.createSource(EventAggregators.INTEGER_ADD, dispatchQueue); 570 yieldSource.setEventHandler(new Task() { 571 public void run() { 572 drainInbound(); 573 } 574 }); 575 yieldSource.resume(); 576 drainOutboundSource = Dispatch.createSource(EventAggregators.INTEGER_ADD, dispatchQueue); 577 drainOutboundSource.setEventHandler(new Task() { 578 public void run() { 579 flush(); 580 } 581 }); 582 drainOutboundSource.resume(); 583 584 readSource = Dispatch.createSource(channel, SelectionKey.OP_READ, dispatchQueue); 585 writeSource = Dispatch.createSource(channel, SelectionKey.OP_WRITE, dispatchQueue); 586 587 readSource.setCancelHandler(CANCEL_HANDLER); 588 writeSource.setCancelHandler(CANCEL_HANDLER); 589 590 readSource.setEventHandler(new Task() { 591 public void run() { 592 drainInbound(); 593 } 594 }); 595 writeSource.setEventHandler(new Task() { 596 public void run() { 597 flush(); 598 } 599 }); 600 601 initRateLimitingChannel(); 602 if( rateLimitingChannel!=null ) { 603 schedualRateAllowanceReset(); 604 } 605 listener.onTransportConnected(); 606 } 607 608 private void schedualRateAllowanceReset() { 609 dispatchQueue.executeAfter(1, TimeUnit.SECONDS, new Task(){ 610 public void run() { 611 if( !socketState.is(CONNECTED.class) ) { 612 return; 613 } 614 rateLimitingChannel.resetAllowance(); 615 schedualRateAllowanceReset(); 616 } 617 }); 618 } 619 620 private void dispose() { 621 if( readSource!=null ) { 622 readSource.cancel(); 623 readSource=null; 624 } 625 626 if( writeSource!=null ) { 627 writeSource.cancel(); 628 writeSource=null; 629 } 630 } 631 632 public void onTransportFailure(IOException error) { 633 listener.onTransportFailure(error); 634 // socketState.onCanceled(); 635 } 636 637 638 public boolean full() { 639 return codec==null || 640 codec.full() || 641 !socketState.is(CONNECTED.class) || 642 getServiceState() != STARTED; 643 } 644 645 boolean rejectingOffers; 646 647 public boolean offer(Object command) { 648 dispatchQueue.assertExecuting(); 649 if( full() ) { 650 return false; 651 } 652 try { 653 ProtocolCodec.BufferState rc = codec.write(command); 654 rejectingOffers = codec.full(); 655 switch (rc ) { 656 case FULL: 657 return false; 658 default: 659 drainOutboundSource.merge(1); 660 } 661 } catch (IOException e) { 662 onTransportFailure(e); 663 } 664 return true; 665 } 666 667 boolean writeResumedForCodecFlush = false; 668 669 /** 670 * 671 */ 672 public void flush() { 673 dispatchQueue.assertExecuting(); 674 if (getServiceState() != STARTED || !socketState.is(CONNECTED.class)) { 675 return; 676 } 677 try { 678 if( codec.flush() == ProtocolCodec.BufferState.EMPTY && transportFlush() ) { 679 if( writeResumedForCodecFlush) { 680 writeResumedForCodecFlush = false; 681 suspendWrite(); 682 } 683 rejectingOffers = false; 684 listener.onRefill(); 685 686 } else { 687 if(!writeResumedForCodecFlush) { 688 writeResumedForCodecFlush = true; 689 resumeWrite(); 690 } 691 } 692 } catch (IOException e) { 693 onTransportFailure(e); 694 } 695 } 696 697 protected boolean transportFlush() throws IOException { 698 return true; 699 } 700 701 public void drainInbound() { 702 if (!getServiceState().isStarted() || readSource.isSuspended()) { 703 return; 704 } 705 try { 706 long initial = codec.getReadCounter(); 707 // Only process upto 2 x the read buffer worth of data at a time so we can give 708 // other connections a chance to process their requests. 709 while( codec.getReadCounter()-initial < codec.getReadBufferSize()<<2 ) { 710 Object command = codec.read(); 711 if ( command!=null ) { 712 try { 713 listener.onTransportCommand(command); 714 } catch (Throwable e) { 715 e.printStackTrace(); 716 onTransportFailure(new IOException("Transport listener failure.")); 717 } 718 719 // the transport may be suspended after processing a command. 720 if (getServiceState() == STOPPED || readSource.isSuspended()) { 721 return; 722 } 723 } else { 724 return; 725 } 726 } 727 yieldSource.merge(1); 728 } catch (IOException e) { 729 onTransportFailure(e); 730 } 731 } 732 733 public SocketAddress getLocalAddress() { 734 return localAddress; 735 } 736 737 public SocketAddress getRemoteAddress() { 738 return remoteAddress; 739 } 740 741 private boolean assertConnected() { 742 try { 743 if ( !isConnected() ) { 744 throw new IOException("Not connected."); 745 } 746 return true; 747 } catch (IOException e) { 748 onTransportFailure(e); 749 } 750 return false; 751 } 752 753 public void suspendRead() { 754 if( isConnected() && readSource!=null ) { 755 readSource.suspend(); 756 } 757 } 758 759 760 public void resumeRead() { 761 if( isConnected() && readSource!=null ) { 762 if( rateLimitingChannel!=null ) { 763 rateLimitingChannel.resumeRead(); 764 } else { 765 _resumeRead(); 766 } 767 } 768 } 769 770 private void _resumeRead() { 771 readSource.resume(); 772 dispatchQueue.execute(new Task(){ 773 public void run() { 774 drainInbound(); 775 } 776 }); 777 } 778 779 protected void suspendWrite() { 780 if( isConnected() && writeSource!=null ) { 781 writeSource.suspend(); 782 } 783 } 784 785 protected void resumeWrite() { 786 if( isConnected() && writeSource!=null ) { 787 writeSource.resume(); 788 } 789 } 790 791 public TransportListener getTransportListener() { 792 return listener; 793 } 794 795 public void setTransportListener(TransportListener transportListener) { 796 this.listener = transportListener; 797 } 798 799 public ProtocolCodec getProtocolCodec() { 800 return codec; 801 } 802 803 public void setProtocolCodec(ProtocolCodec protocolCodec) throws Exception { 804 this.codec = protocolCodec; 805 if( channel!=null && codec!=null ) { 806 initializeCodec(); 807 } 808 } 809 810 public boolean isConnected() { 811 return socketState.is(CONNECTED.class); 812 } 813 814 public boolean isClosed() { 815 return getServiceState() == STOPPED; 816 } 817 818 public boolean isUseLocalHost() { 819 return useLocalHost; 820 } 821 822 /** 823 * Sets whether 'localhost' or the actual local host name should be used to 824 * make local connections. On some operating systems such as Macs its not 825 * possible to connect as the local host name so localhost is better. 826 */ 827 public void setUseLocalHost(boolean useLocalHost) { 828 this.useLocalHost = useLocalHost; 829 } 830 831 private void trace(String message) { 832 // TODO: 833 } 834 835 public SocketChannel getSocketChannel() { 836 return channel; 837 } 838 839 public ReadableByteChannel getReadChannel() { 840 initRateLimitingChannel(); 841 if(rateLimitingChannel!=null) { 842 return rateLimitingChannel; 843 } else { 844 return channel; 845 } 846 } 847 848 public WritableByteChannel getWriteChannel() { 849 initRateLimitingChannel(); 850 if(rateLimitingChannel!=null) { 851 return rateLimitingChannel; 852 } else { 853 return channel; 854 } 855 } 856 857 public int getMaxReadRate() { 858 return maxReadRate; 859 } 860 861 public void setMaxReadRate(int maxReadRate) { 862 this.maxReadRate = maxReadRate; 863 } 864 865 public int getMaxWriteRate() { 866 return maxWriteRate; 867 } 868 869 public void setMaxWriteRate(int maxWriteRate) { 870 this.maxWriteRate = maxWriteRate; 871 } 872 873 public int getTrafficClass() { 874 return trafficClass; 875 } 876 877 public void setTrafficClass(int trafficClass) { 878 this.trafficClass = trafficClass; 879 } 880 881 public int getReceiveBufferSize() { 882 return receiveBufferSize; 883 } 884 885 public void setReceiveBufferSize(int receiveBufferSize) { 886 this.receiveBufferSize = receiveBufferSize; 887 if( channel!=null ) { 888 try { 889 channel.socket().setReceiveBufferSize(receiveBufferSize); 890 } catch (SocketException ignore) { 891 } 892 } 893 } 894 895 public int getSendBufferSize() { 896 return sendBufferSize; 897 } 898 899 public void setSendBufferSize(int sendBufferSize) { 900 this.sendBufferSize = sendBufferSize; 901 if( channel!=null ) { 902 try { 903 channel.socket().setReceiveBufferSize(sendBufferSize); 904 } catch (SocketException ignore) { 905 } 906 } 907 } 908 909 public boolean isKeepAlive() { 910 return keepAlive; 911 } 912 913 public void setKeepAlive(boolean keepAlive) { 914 this.keepAlive = keepAlive; 915 } 916 917 public Executor getBlockingExecutor() { 918 return blockingExecutor; 919 } 920 921 public void setBlockingExecutor(Executor blockingExecutor) { 922 this.blockingExecutor = blockingExecutor; 923 } 924 925 public boolean isCloseOnCancel() { 926 return closeOnCancel; 927 } 928 929 public void setCloseOnCancel(boolean closeOnCancel) { 930 this.closeOnCancel = closeOnCancel; 931 } 932}