1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 from __future__ import absolute_import
21
22 import errno
23 import logging
24 import socket
25 import time
26 import weakref
27
28 from ._condition import Condition
29 from ._delivery import Delivery
30 from ._endpoints import Endpoint
31 from ._events import Event, Handler, _dispatch
32 from ._exceptions import ProtonException
33 from ._io import IO
34 from ._message import Message
35 from ._selectable import Selectable
36 from ._transport import Transport
37 from ._url import Url
38
39 log = logging.getLogger("proton")
43 """
44 A utility for simpler and more intuitive handling of delivery
45 events related to outgoing i.e. sent messages.
46 """
47
def __init__(self, auto_settle=True, delegate=None):
    """
    Record the configuration for this outgoing-message handler.

    :param auto_settle: Stored unchanged as ``self.auto_settle``.
        NOTE(review): the code that consumes this flag is not visible
        here; presumably it controls automatic settlement -- confirm.
    :param delegate: Stored unchanged as ``self.delegate``. May be
        ``None`` (the default).
    """
    self.delegate = delegate
    self.auto_settle = auto_settle
51
57
71
73 """
74 Called when the sender link has credit and messages can
75 therefore be transferred.
76 """
77 if self.delegate is not None:
78 _dispatch(self.delegate, 'on_sendable', event)
79
81 """
82 Called when the remote peer accepts an outgoing message.
83 """
84 if self.delegate is not None:
85 _dispatch(self.delegate, 'on_accepted', event)
86
88 """
89 Called when the remote peer rejects an outgoing message.
90 """
91 if self.delegate is not None:
92 _dispatch(self.delegate, 'on_rejected', event)
93
95 """
96 Called when the remote peer releases an outgoing message. Note
97 that this may be in response to either the RELEASE or MODIFIED
98 state as defined by the AMQP specification.
99 """
100 if self.delegate is not None:
101 _dispatch(self.delegate, 'on_released', event)
102
104 """
105 Called when the remote peer has settled the outgoing
106 message. This is the point at which it should never be
107 retransmitted.
108 """
109 if self.delegate is not None:
110 _dispatch(self.delegate, 'on_settled', event)
111
118
119
class Reject(ProtonException):
    """
    An exception that indicates a message should be rejected.
    """
125
126
class Release(ProtonException):
    """
    An exception that indicates a message should be released.

    This is the counterpart of :class:`Reject`: it signals release of
    the message rather than rejection.
    """
132
136 """
137 Accepts a received message.
138
139 Note that this method cannot currently be used in combination
140 with transactions.
141 """
142 self.settle(delivery, Delivery.ACCEPTED)
143
150
def release(self, delivery, delivered=True):
    """
    Release a received message, making it available at the source
    for any (other) interested receiver.

    :param delivery: The delivery to release; handed to
        ``self.settle`` together with the chosen outcome.
    :param delivered: Whether this should be considered a delivery
        attempt (and the delivery count updated). When true (the
        default) the delivery is settled as ``Delivery.MODIFIED``;
        otherwise it is settled as ``Delivery.RELEASED``.
    """
    outcome = Delivery.MODIFIED if delivered else Delivery.RELEASED
    self.settle(delivery, outcome)
162
163 - def settle(self, delivery, state=None):
167
170 """
171 A utility for simpler and more intuitive handling of delivery
172 events related to incoming i.e. received messages.
173 """
174
def __init__(self, auto_accept=True, delegate=None):
    """
    Record the configuration for this incoming-message handler.

    :param auto_accept: Stored unchanged as ``self.auto_accept``.
        NOTE(review): the code that consumes this flag is not visible
        here; presumably it controls automatic acceptance of received
        messages -- confirm.
    :param delegate: Stored unchanged as ``self.delegate``. May be
        ``None`` (the default).
    """
    self.auto_accept = auto_accept
    self.delegate = delegate
178
205
207 """
208 Called when a message is received. The message itself can be
209 obtained as a property on the event. For the purpose of
210 referring to this message in further actions (e.g. if
211 explicitly accepting it), the ``delivery`` should be used, also
212 obtainable via a property on the event.
213 """
214 if self.delegate is not None:
215 _dispatch(self.delegate, 'on_message', event)
216
218 if self.delegate is not None:
219 _dispatch(self.delegate, 'on_settled', event)
220
222 if self.delegate is not None:
223 _dispatch(self.delegate, 'on_aborted', event)
224
227 """
228 A utility that exposes 'endpoint' events i.e. the open/close for
229 links, sessions and connections in a more intuitive manner. An
230 XXX_opened method will be called when both local and remote peers
231 have opened the link, session or connection. This can be used to
232 confirm a locally initiated action for example. An XXX_opening
233 method will be called when the remote peer has requested an open
234 that was not initiated locally. By default this will simply open
235 locally, which then triggers the XXX_opened call. The same applies
236 to close.
237 """
238
def __init__(self, peer_close_is_error=False, delegate=None):
    """
    Record the configuration for this endpoint-state handler.

    :param peer_close_is_error: Stored unchanged as
        ``self.peer_close_is_error``.
        NOTE(review): presumably makes a close initiated by the remote
        peer be reported as an error -- confirm against the closing
        handlers.
    :param delegate: Stored unchanged as ``self.delegate``. May be
        ``None`` (the default).
    """
    self.peer_close_is_error = peer_close_is_error
    self.delegate = delegate
242
243 @classmethod
246
247 @classmethod
250
251 @classmethod
254
255 @classmethod
258
259 @classmethod
262
263 @classmethod
269
278
287
300
304
311
315
322
326
333
335 if self.delegate is not None:
336 _dispatch(self.delegate, 'on_connection_opened', event)
337
339 if self.delegate is not None:
340 _dispatch(self.delegate, 'on_session_opened', event)
341
343 if self.delegate is not None:
344 _dispatch(self.delegate, 'on_link_opened', event)
345
347 if self.delegate is not None:
348 _dispatch(self.delegate, 'on_connection_opening', event)
349
351 if self.delegate is not None:
352 _dispatch(self.delegate, 'on_session_opening', event)
353
355 if self.delegate is not None:
356 _dispatch(self.delegate, 'on_link_opening', event)
357
359 if self.delegate is not None:
360 _dispatch(self.delegate, 'on_connection_error', event)
361 else:
362 self.log_error(event.connection, "connection")
363
365 if self.delegate is not None:
366 _dispatch(self.delegate, 'on_session_error', event)
367 else:
368 self.log_error(event.session, "session")
369 event.connection.close()
370
372 if self.delegate is not None:
373 _dispatch(self.delegate, 'on_link_error', event)
374 else:
375 self.log_error(event.link, "link")
376 event.connection.close()
377
379 if self.delegate is not None:
380 _dispatch(self.delegate, 'on_connection_closed', event)
381
383 if self.delegate is not None:
384 _dispatch(self.delegate, 'on_session_closed', event)
385
387 if self.delegate is not None:
388 _dispatch(self.delegate, 'on_link_closed', event)
389
391 if self.delegate is not None:
392 _dispatch(self.delegate, 'on_connection_closing', event)
393 elif self.peer_close_is_error:
394 self.on_connection_error(event)
395
397 if self.delegate is not None:
398 _dispatch(self.delegate, 'on_session_closing', event)
399 elif self.peer_close_is_error:
400 self.on_session_error(event)
401
403 if self.delegate is not None:
404 _dispatch(self.delegate, 'on_link_closing', event)
405 elif self.peer_close_is_error:
406 self.on_link_error(event)
407
410
414
417 """
418 A general purpose handler that makes the proton-c events somewhat
419 simpler to deal with and/or avoids repetitive tasks for common use
420 cases.
421 """
422
423 - def __init__(self, prefetch=10, auto_accept=True, auto_settle=True, peer_close_is_error=False):
431
449
455
462
469
471 """
472 Called when the event loop - the reactor - starts.
473 """
474 if hasattr(event.reactor, 'subclass'):
475 setattr(event, event.reactor.subclass.__name__.lower(), event.reactor)
476 self.on_start(event)
477
479 """
480 Called when the event loop starts. (Just an alias for on_reactor_init)
481 """
482 pass
483
485 """
486 Called when the connection is closed.
487 """
488 pass
489
491 """
492 Called when the session is closed.
493 """
494 pass
495
497 """
498 Called when the link is closed.
499 """
500 pass
501
503 """
504 Called when the peer initiates the closing of the connection.
505 """
506 pass
507
509 """
510 Called when the peer initiates the closing of the session.
511 """
512 pass
513
515 """
516 Called when the peer initiates the closing of the link.
517 """
518 pass
519
521 """
522 Called when the socket is disconnected.
523 """
524 pass
525
527 """
528 Called when the sender link has credit and messages can
529 therefore be transferred.
530 """
531 pass
532
534 """
535 Called when the remote peer accepts an outgoing message.
536 """
537 pass
538
540 """
541 Called when the remote peer rejects an outgoing message.
542 """
543 pass
544
546 """
547 Called when the remote peer releases an outgoing message. Note
548 that this may be in response to either the RELEASE or MODIFIED
549 state as defined by the AMQP specification.
550 """
551 pass
552
554 """
555 Called when the remote peer has settled the outgoing
556 message. This is the point at which it should never be
557 retransmitted.
558 """
559 pass
560
562 """
563 Called when a message is received. The message itself can be
564 obtained as a property on the event. For the purpose of
565 referring to this message in further actions (e.g. if
566 explicitly accepting it), the ``delivery`` should be used, also
567 obtainable via a property on the event.
568 """
569 pass
570
573 """
574 The interface for transaction handlers, i.e. objects that want to
575 be notified of state changes related to a transaction.
576 """
577
580
583
586
589
592
595 """
596 An extension to the MessagingHandler for applications using
597 transactions.
598 """
599
600 - def __init__(self, prefetch=10, auto_accept=False, auto_settle=True, peer_close_is_error=False):
602
603 - def accept(self, delivery, transaction=None):
608
612 self._window = window
613 self._drained = 0
614
616 self._flow(event.link)
617
619 self._flow(event.link)
620
622 self._flow(event.link)
623
625 self._flow(event.link)
626
633
636
637 @staticmethod
642
643 @staticmethod
648
649 @staticmethod
656
657 @staticmethod
662
663 @staticmethod
668
669 @staticmethod
674
675
676
# C-prefixed names bound to the same classes as Handshaker and
# FlowController. NOTE(review): presumably kept so code written against
# the older C-prefixed names keeps working -- confirm before removing.
CHandshaker = Handshaker
CFlowController = FlowController
741
745
748
753
757
763
783
811
836
845
852
863
906
907 @staticmethod
908 - def update(transport, selectable, now):
923
930
939
942 - def __init__(self, sock, reactor, addrs, transport, iohandler):
947
950
952 e = self._delegate.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
953 t = self._transport
954 if e == 0:
955 log.debug("Connection succeeded")
956 s = self._reactor.selectable(delegate=self._delegate)
957 s._transport = t
958 t._selectable = s
959 self._iohandler.update(t, s, t._reactor.now)
960
961
962 self._delegate = None
963 self.terminate()
964 self.update()
965 return
966 elif e == errno.ECONNREFUSED:
967 if len(self._addrs) > 0:
968 log.debug("Connection refused: trying next transport address: %s", self._addrs[0])
969 sock = IO.connect(self._addrs[0])
970 self._addrs = self._addrs[1:]
971 self._delegate.close()
972 self._delegate = sock
973 return
974 else:
975 log.debug("Connection refused, but tried all transport addresses")
976 t.condition = Condition("proton.pythonio", "Connection refused to all addresses")
977 else:
978 log.error("Couldn't connect: %s", e)
979 t.condition = Condition("proton.pythonio", "Connection error: %s" % e)
980
981 t.close_tail()
982 t.close_head()
983 self.terminate()
984 self.update()
985