1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 import heapq, logging, os, re, socket, time, types, weakref
20
21 from proton import dispatch, generate_uuid, PN_ACCEPTED, SASL, symbol, ulong, Url
22 from proton import Collector, Connection, Delivery, Described, Endpoint, Event, Link, Terminus, Timeout
23 from proton import Message, Handler, ProtonException, Transport, TransportException, ConnectionException
24 from select import select
25
26 log = logging.getLogger("proton")
29 """
30 A utility for simpler and more intuitive handling of delivery
31 events related to outgoing i.e. sent messages.
32 """
def __init__(self, auto_settle=True, delegate=None):
    """
    Record the handler configuration on the instance.

    Both arguments are stored unchanged as attributes of the same
    name (``auto_settle`` and ``delegate``).
    """
    self.delegate = delegate
    self.auto_settle = auto_settle
36
42
56
58 """
59 Called when the sender link has credit and messages can
60 therefore be transferred.
61 """
62 if self.delegate != None:
63 dispatch(self.delegate, 'on_sendable', event)
64
66 """
67 Called when the remote peer accepts an outgoing message.
68 """
69 if self.delegate != None:
70 dispatch(self.delegate, 'on_accepted', event)
71
73 """
74 Called when the remote peer rejects an outgoing message.
75 """
76 if self.delegate != None:
77 dispatch(self.delegate, 'on_rejected', event)
78
80 """
81 Called when the remote peer releases an outgoing message. Note
82 that this may be in response to either the RELEASE or MODIFIED
83 state as defined by the AMQP specification.
84 """
85 if self.delegate != None:
86 dispatch(self.delegate, 'on_released', event)
87
89 """
90 Called when the remote peer has settled the outgoing
91 message. This is the point at which it should never be
92 retransmitted.
93 """
94 if self.delegate != None:
95 dispatch(self.delegate, 'on_settled', event)
96
102
class Reject(ProtonException):
    """
    An exception that indicates a message should be rejected.
    """
108
110 """
111 An exception that indicates a message should be rejected
112 """
113 pass
114
121
128
def release(self, delivery, delivered=True):
    """
    Release a received message so that the source can make it
    available to any (other) interested receiver.

    When ``delivered`` is true the release counts as a delivery
    attempt and the delivery is settled with ``Delivery.MODIFIED``;
    otherwise it is settled with ``Delivery.RELEASED``.
    """
    # Pick the outcome first, then settle once with it.
    outcome = Delivery.MODIFIED if delivered else Delivery.RELEASED
    self.settle(delivery, outcome)
140
141 - def settle(self, delivery, state=None):
145
147 """
148 A utility for simpler and more intuitive handling of delivery
149 events related to incoming i.e. received messages.
150 """
151
def __init__(self, auto_accept=True, delegate=None):
    """
    Record the handler configuration on the instance.

    Both arguments are stored unchanged as attributes of the same
    name (``auto_accept`` and ``delegate``).
    """
    self.auto_accept = auto_accept
    self.delegate = delegate
155
182
184 """
185 Called when a message is received. The message itself can be
186 obtained as a property on the event. For the purpose of
187 referring to this message in further actions (e.g. if
188 explicitly accepting it), the ``delivery`` should be used, also
189 obtainable via a property on the event.
190 """
191 if self.delegate != None:
192 dispatch(self.delegate, 'on_message', event)
193
195 if self.delegate != None:
196 dispatch(self.delegate, 'on_settled', event)
197
199 if self.delegate != None:
200 dispatch(self.delegate, 'on_aborted', event)
201
203 """
204 A utility that exposes 'endpoint' events i.e. the open/close for
205 links, sessions and connections in a more intuitive manner. A
206 XXX_opened method will be called when both local and remote peers
207 have opened the link, session or connection. This can be used to
208 confirm a locally initiated action for example. A XXX_opening
209 method will be called when the remote peer has requested an open
210 that was not initiated locally. By default this will simply open
211 locally, which then triggers the XXX_opened call. The same applies
212 to close.
213 """
214
def __init__(self, peer_close_is_error=False, delegate=None):
    """
    Record the handler configuration on the instance.

    Both arguments are stored unchanged as attributes of the same
    name (``peer_close_is_error`` and ``delegate``).
    """
    self.peer_close_is_error = peer_close_is_error
    self.delegate = delegate
218
219 @classmethod
222
223 @classmethod
226
227 @classmethod
230
231 @classmethod
234
235 @classmethod
238
239 @classmethod
245
254
263
276
280
287
291
298
302
309
311 if self.delegate != None:
312 dispatch(self.delegate, 'on_connection_opened', event)
313
315 if self.delegate != None:
316 dispatch(self.delegate, 'on_session_opened', event)
317
319 if self.delegate != None:
320 dispatch(self.delegate, 'on_link_opened', event)
321
323 if self.delegate != None:
324 dispatch(self.delegate, 'on_connection_opening', event)
325
327 if self.delegate != None:
328 dispatch(self.delegate, 'on_session_opening', event)
329
331 if self.delegate != None:
332 dispatch(self.delegate, 'on_link_opening', event)
333
335 if self.delegate != None:
336 dispatch(self.delegate, 'on_connection_error', event)
337 else:
338 self.log_error(event.connection, "connection")
339
341 if self.delegate != None:
342 dispatch(self.delegate, 'on_session_error', event)
343 else:
344 self.log_error(event.session, "session")
345 event.connection.close()
346
348 if self.delegate != None:
349 dispatch(self.delegate, 'on_link_error', event)
350 else:
351 self.log_error(event.link, "link")
352 event.connection.close()
353
355 if self.delegate != None:
356 dispatch(self.delegate, 'on_connection_closed', event)
357
359 if self.delegate != None:
360 dispatch(self.delegate, 'on_session_closed', event)
361
363 if self.delegate != None:
364 dispatch(self.delegate, 'on_link_closed', event)
365
367 if self.delegate != None:
368 dispatch(self.delegate, 'on_connection_closing', event)
369 elif self.peer_close_is_error:
370 self.on_connection_error(event)
371
373 if self.delegate != None:
374 dispatch(self.delegate, 'on_session_closing', event)
375 elif self.peer_close_is_error:
376 self.on_session_error(event)
377
379 if self.delegate != None:
380 dispatch(self.delegate, 'on_link_closing', event)
381 elif self.peer_close_is_error:
382 self.on_link_error(event)
383
386
390
392 """
393 A general purpose handler that makes the proton-c events somewhat
394 simpler to deal with and/or avoids repetitive tasks for common use
395 cases.
396 """
397 - def __init__(self, prefetch=10, auto_accept=True, auto_settle=True, peer_close_is_error=False):
405
421
427
434
441
443 """
444 Called when the event loop - the reactor - starts.
445 """
446 if hasattr(event.reactor, 'subclass'):
447 setattr(event, event.reactor.subclass.__name__.lower(), event.reactor)
448 self.on_start(event)
449
451 """
452 Called when the event loop starts. (Just an alias for on_reactor_init)
453 """
454 pass
456 """
457 Called when the connection is closed.
458 """
459 pass
461 """
462 Called when the session is closed.
463 """
464 pass
466 """
467 Called when the link is closed.
468 """
469 pass
471 """
472 Called when the peer initiates the closing of the connection.
473 """
474 pass
476 """
477 Called when the peer initiates the closing of the session.
478 """
479 pass
481 """
482 Called when the peer initiates the closing of the link.
483 """
484 pass
486 """
487 Called when the socket is disconnected.
488 """
489 pass
490
492 """
493 Called when the sender link has credit and messages can
494 therefore be transferred.
495 """
496 pass
497
499 """
500 Called when the remote peer accepts an outgoing message.
501 """
502 pass
503
505 """
506 Called when the remote peer rejects an outgoing message.
507 """
508 pass
509
511 """
512 Called when the remote peer releases an outgoing message. Note
513 that this may be in response to either the RELEASE or MODIFIED
514 state as defined by the AMQP specification.
515 """
516 pass
517
519 """
520 Called when the remote peer has settled the outgoing
521 message. This is the point at which it should never be
522 retransmitted.
523 """
524 pass
526 """
527 Called when a message is received. The message itself can be
528 obtained as a property on the event. For the purpose of
529 referring to this message in further actions (e.g. if
530 explicitly accepting it), the ``delivery`` should be used, also
531 obtainable via a property on the event.
532 """
533 pass
534
536 """
537 The interface for transaction handlers, i.e. objects that want to
538 be notified of state changes related to a transaction.
539 """
542
545
548
551
554
556 """
557 An extension to the MessagingHandler for applications using
558 transactions.
559 """
560
561 - def __init__(self, prefetch=10, auto_accept=False, auto_settle=True, peer_close_is_error=False):
563
564 - def accept(self, delivery, transaction=None):
569
570 from proton import WrappedHandler
571 from cproton import pn_flowcontroller, pn_handshaker, pn_iohandler
574
576 WrappedHandler.__init__(self, lambda: pn_flowcontroller(window))
577
579
581 WrappedHandler.__init__(self, pn_handshaker)
582
584
586 WrappedHandler.__init__(self, pn_iohandler)
587
589
591 self.selectables = []
592 self.delegate = IOHandler()
593
596
598 self.selectables.append(event.context)
599
602
604 sel = event.context
605 if sel.is_terminal:
606 self.selectables.remove(sel)
607 sel.release()
608
650