1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 """
21 The proton module defines a suite of APIs that implement the AMQP 1.0
22 protocol.
23
24 The proton APIs consist of the following classes:
25
26 - L{Messenger} -- A messaging endpoint.
27 - L{Message} -- A class for creating and/or accessing AMQP message content.
28 - L{Data} -- A class for creating and/or accessing arbitrary AMQP encoded
29 data.
30
31 """
32
33 from cproton import *
34 import weakref
35 try:
36 import uuid
37 except ImportError:
38 """
39 No 'native' UUID support. Provide a very basic UUID type that is a compatible subset of the uuid type provided by more modern python releases.
40 """
41 import struct
44 - def __init__(self, hex=None, bytes=None):
45 if [hex, bytes].count(None) != 1:
46 raise TypeError("need one of hex or bytes")
47 if bytes is not None:
48 self.bytes = bytes
49 elif hex is not None:
50 fields=hex.split("-")
51 fields[4:5] = [fields[4][:4], fields[4][4:]]
52 self.bytes = struct.pack("!LHHHHL", *[int(x,16) for x in fields])
53
55 if isinstance(other, uuid.UUID):
56 return cmp(self.bytes, other.bytes)
57 else:
58 return -1
59
61 return "%08x-%04x-%04x-%04x-%04x%08x" % struct.unpack("!LHHHHL", self.bytes)
62
64 return "UUID(%r)" % str(self)
65
67 return self.bytes.__hash__()
68
69 import os, random, socket, time
70 rand = random.Random()
71 rand.seed((os.getpid(), time.time(), socket.gethostname()))
73 bytes = [rand.randint(0, 255) for i in xrange(16)]
74
75
76 bytes[7] &= 0x0F
77 bytes[7] |= 0x40
78
79
80 bytes[8] &= 0x3F
81 bytes[8] |= 0x80
82 return "".join(map(chr, bytes))
83
85 return uuid.UUID(bytes=random_uuid())
86
87 try:
88 bytes()
89 except NameError:
90 bytes = str
91
92 API_LANGUAGE = "C"
93 IMPLEMENTATION_LANGUAGE = "C"
102
104 """
105 The root of the proton exception hierarchy. All proton exception
106 classes derive from this exception.
107 """
108 pass
109
111 """
112 A timeout exception indicates that a blocking operation has timed
113 out.
114 """
115 pass
116
118 """
119 An interrupt exception indicaes that a blocking operation was interrupted.
120 """
121 pass
122
124 """
125 The root of the messenger exception hierarchy. All exceptions
126 generated by the messenger class derive from this exception.
127 """
128 pass
129
131 """
132 The MessageException class is the root of the message exception
133 hierarhcy. All exceptions generated by the Message class derive from
134 this exception.
135 """
136 pass
137
138 EXCEPTIONS = {
139 PN_TIMEOUT: Timeout,
140 PN_INTR: Interrupt
141 }
142
143 PENDING = Constant("PENDING")
144 ACCEPTED = Constant("ACCEPTED")
145 REJECTED = Constant("REJECTED")
146 RELEASED = Constant("RELEASED")
147 ABORTED = Constant("ABORTED")
148 SETTLED = Constant("SETTLED")
149
150 STATUSES = {
151 PN_STATUS_ABORTED: ABORTED,
152 PN_STATUS_ACCEPTED: ACCEPTED,
153 PN_STATUS_REJECTED: REJECTED,
154 PN_STATUS_RELEASED: RELEASED,
155 PN_STATUS_PENDING: PENDING,
156 PN_STATUS_SETTLED: SETTLED,
157 PN_STATUS_UNKNOWN: None
158 }
159
160 AUTOMATIC = Constant("AUTOMATIC")
161 MANUAL = Constant("MANUAL")
164 """
165 The L{Messenger} class defines a high level interface for sending
166 and receiving L{Messages<Message>}. Every L{Messenger} contains a
167 single logical queue of incoming messages and a single logical queue
168 of outgoing messages. These messages in these queues may be destined
169 for, or originate from, a variety of addresses.
170
171 The messenger interface is single-threaded. All methods
172 except one (L{interrupt}) are intended to be used from within
173 the messenger thread.
174
175
176 Address Syntax
177 ==============
178
179 An address has the following form::
180
181 [ amqp[s]:// ] [user[:password]@] domain [/[name]]
182
183 Where domain can be one of::
184
185 host | host:port | ip | ip:port | name
186
187 The following are valid examples of addresses:
188
189 - example.org
190 - example.org:1234
191 - amqp://example.org
192 - amqps://example.org
193 - example.org/incoming
194 - amqps://example.org/outgoing
195 - amqps://fred:trustno1@example.org
196 - 127.0.0.1:1234
197 - amqps://127.0.0.1:1234
198
199 Sending & Receiving Messages
200 ============================
201
202 The L{Messenger} class works in conjuction with the L{Message} class. The
203 L{Message} class is a mutable holder of message content.
204
205 The L{put} method copies its L{Message} to the outgoing queue, and may
206 send queued messages if it can do so without blocking. The L{send}
207 method blocks until it has sent the requested number of messages,
208 or until a timeout interrupts the attempt.
209
210
211 >>> message = Message()
212 >>> for i in range(3):
213 ... message.address = "amqp://host/queue"
214 ... message.subject = "Hello World %i" % i
215 ... messenger.put(message)
216 >>> messenger.send()
217
218 Similarly, the L{recv} method receives messages into the incoming
219 queue, and may block as it attempts to receive the requested number
220 of messages, or until timeout is reached. It may receive fewer
221 than the requested number. The L{get} method pops the
222 eldest L{Message} off the incoming queue and copies it into the L{Message}
223 object that you supply. It will not block.
224
225
226 >>> message = Message()
227 >>> messenger.recv(10):
228 >>> while messenger.incoming > 0:
229 ... messenger.get(message)
230 ... print message.subject
231 Hello World 0
232 Hello World 1
233 Hello World 2
234
235 The blocking flag allows you to turn off blocking behavior entirely,
236 in which case L{send} and L{recv} will do whatever they can without
237 blocking, and then return. You can then look at the number
238 of incoming and outgoing messages to see how much outstanding work
239 still remains.
240 """
241
243 """
244 Construct a new L{Messenger} with the given name. The name has
245 global scope. If a NULL name is supplied, a UUID based name will
246 be chosen.
247
248 @type name: string
249 @param name: the name of the messenger or None
250
251 """
252 self._mng = pn_messenger(name)
253 self._selectables = {}
254
256 """
257 Destroy the L{Messenger}. This will close all connections that
258 are managed by the L{Messenger}. Call the L{stop} method before
259 destroying the L{Messenger}.
260 """
261 if hasattr(self, "_mng"):
262 pn_messenger_free(self._mng)
263 del self._mng
264
266 if err < 0:
267 if (err == PN_INPROGRESS):
268 return
269 exc = EXCEPTIONS.get(err, MessengerException)
270 raise exc("[%s]: %s" % (err, pn_error_text(pn_messenger_error(self._mng))))
271 else:
272 return err
273
274 @property
276 """
277 The name of the L{Messenger}.
278 """
279 return pn_messenger_name(self._mng)
280
282 return pn_messenger_get_certificate(self._mng)
283
285 self._check(pn_messenger_set_certificate(self._mng, value))
286
287 certificate = property(_get_certificate, _set_certificate,
288 doc="""
289 Path to a certificate file for the L{Messenger}. This certificate is
290 used when the L{Messenger} accepts or establishes SSL/TLS connections.
291 This property must be specified for the L{Messenger} to accept
292 incoming SSL/TLS connections and to establish client authenticated
293 outgoing SSL/TLS connection. Non client authenticated outgoing SSL/TLS
294 connections do not require this property.
295 """)
296
298 return pn_messenger_get_private_key(self._mng)
299
301 self._check(pn_messenger_set_private_key(self._mng, value))
302
303 private_key = property(_get_private_key, _set_private_key,
304 doc="""
305 Path to a private key file for the L{Messenger's<Messenger>}
306 certificate. This property must be specified for the L{Messenger} to
307 accept incoming SSL/TLS connections and to establish client
308 authenticated outgoing SSL/TLS connection. Non client authenticated
309 SSL/TLS connections do not require this property.
310 """)
311
313 return pn_messenger_get_password(self._mng)
314
316 self._check(pn_messenger_set_password(self._mng, value))
317
318 password = property(_get_password, _set_password,
319 doc="""
320 This property contains the password for the L{Messenger.private_key}
321 file, or None if the file is not encrypted.
322 """)
323
325 return pn_messenger_get_trusted_certificates(self._mng)
326
328 self._check(pn_messenger_set_trusted_certificates(self._mng, value))
329
330 trusted_certificates = property(_get_trusted_certificates,
331 _set_trusted_certificates,
332 doc="""
333 A path to a database of trusted certificates for use in verifying the
334 peer on an SSL/TLS connection. If this property is None, then the peer
335 will not be verified.
336 """)
337
339 t = pn_messenger_get_timeout(self._mng)
340 if t == -1:
341 return None
342 else:
343 return float(t)/1000
344
346 if value is None:
347 t = -1
348 else:
349 t = long(1000*value)
350 self._check(pn_messenger_set_timeout(self._mng, t))
351
352 timeout = property(_get_timeout, _set_timeout,
353 doc="""
354 The timeout property contains the default timeout for blocking
355 operations performed by the L{Messenger}.
356 """)
357
359 return pn_messenger_is_blocking(self._mng)
360
362 self._check(pn_messenger_set_blocking(self._mng, b))
363
364 blocking = property(_is_blocking, _set_blocking,
365 doc="""
366 Enable or disable blocking behavior during L{Message} sending
367 and receiving. This affects every blocking call, with the
368 exception of L{work}. Currently, the affected calls are
369 L{send}, L{recv}, and L{stop}.
370 """)
371
373 return pn_messenger_is_passive(self._mng)
374
376 self._check(pn_messenger_set_passive(self._mng, b))
377
378 passive = property(_is_passive, _set_passive,
379 doc="""
380 When passive is set to true, Messenger will not attempt to perform I/O
381 internally. In this mode it is necessary to use the selectables API to
382 drive any I/O needed to perform requested actions. In this mode
383 Messenger will never block.
384 """)
385
387 return pn_messenger_get_incoming_window(self._mng)
388
390 self._check(pn_messenger_set_incoming_window(self._mng, window))
391
392 incoming_window = property(_get_incoming_window, _set_incoming_window,
393 doc="""
394 The incoming tracking window for the messenger. The messenger will
395 track the remote status of this many incoming deliveries after they
396 have been accepted or rejected. Defaults to zero.
397
398 L{Messages<Message>} enter this window only when you take them into your application
399 using L{get}. If your incoming window size is I{n}, and you get I{n}+1 L{messages<Message>}
400 without explicitly accepting or rejecting the oldest message, then the
401 message that passes beyond the edge of the incoming window will be assigned
402 the default disposition of its link.
403 """)
404
406 return pn_messenger_get_outgoing_window(self._mng)
407
409 self._check(pn_messenger_set_outgoing_window(self._mng, window))
410
411 outgoing_window = property(_get_outgoing_window, _set_outgoing_window,
412 doc="""
413 The outgoing tracking window for the messenger. The messenger will
414 track the remote status of this many outgoing deliveries after calling
415 send. Defaults to zero.
416
417 A L{Message} enters this window when you call the put() method with the
418 message. If your outgoing window size is I{n}, and you call L{put} I{n}+1
419 times, status information will no longer be available for the
420 first message.
421 """)
422
424 """
425 Currently a no-op placeholder.
426 For future compatibility, do not L{send} or L{recv} messages
427 before starting the L{Messenger}.
428 """
429 self._check(pn_messenger_start(self._mng))
430
432 """
433 Transitions the L{Messenger} to an inactive state. An inactive
434 L{Messenger} will not send or receive messages from its internal
435 queues. A L{Messenger} should be stopped before being discarded to
436 ensure a clean shutdown handshake occurs on any internally managed
437 connections.
438 """
439 self._check(pn_messenger_stop(self._mng))
440
441 @property
443 """
444 Returns true iff a L{Messenger} is in the stopped state.
445 This function does not block.
446 """
447 return pn_messenger_stopped(self._mng)
448
450 """
451 Subscribes the L{Messenger} to messages originating from the
452 specified source. The source is an address as specified in the
453 L{Messenger} introduction with the following addition. If the
454 domain portion of the address begins with the '~' character, the
455 L{Messenger} will interpret the domain as host/port, bind to it,
456 and listen for incoming messages. For example "~0.0.0.0",
457 "amqp://~0.0.0.0", and "amqps://~0.0.0.0" will all bind to any
458 local interface and listen for incoming messages with the last
459 variant only permitting incoming SSL connections.
460
461 @type source: string
462 @param source: the source of messages to subscribe to
463 """
464 sub_impl = pn_messenger_subscribe(self._mng, source)
465 if not sub_impl:
466 self._check(pn_error_code(pn_messenger_error(self._mng)))
467 return Subscription(sub_impl)
468
469 - def put(self, message):
470 """
471 Places the content contained in the message onto the outgoing
472 queue of the L{Messenger}. This method will never block, however
473 it will send any unblocked L{Messages<Message>} in the outgoing
474 queue immediately and leave any blocked L{Messages<Message>}
475 remaining in the outgoing queue. The L{send} call may be used to
476 block until the outgoing queue is empty. The L{outgoing} property
477 may be used to check the depth of the outgoing queue.
478
479 When the content in a given L{Message} object is copied to the outgoing
480 message queue, you may then modify or discard the L{Message} object
481 without having any impact on the content in the outgoing queue.
482
483 This method returns an outgoing tracker for the L{Message}. The tracker
484 can be used to determine the delivery status of the L{Message}.
485
486 @type message: Message
487 @param message: the message to place in the outgoing queue
488 @return: a tracker
489 """
490 message._pre_encode()
491 self._check(pn_messenger_put(self._mng, message._msg))
492 return pn_messenger_outgoing_tracker(self._mng)
493
495 """
496 Gets the last known remote state of the delivery associated with
497 the given tracker.
498
499 @type tracker: tracker
500 @param tracker: the tracker whose status is to be retrieved
501
502 @return: one of None, PENDING, REJECTED, or ACCEPTED
503 """
504 disp = pn_messenger_status(self._mng, tracker);
505 return STATUSES.get(disp, disp)
506
508 """
509 Checks if the delivery associated with the given tracker is still
510 waiting to be sent.
511
512 @type tracker: tracker
513 @param tracker: the tracker whose status is to be retrieved
514
515 @return: true if delivery is still buffered
516 """
517 return pn_messenger_buffered(self._mng, tracker);
518
519 - def settle(self, tracker=None):
520 """
521 Frees a L{Messenger} from tracking the status associated with a given
522 tracker. If you don't supply a tracker, all outgoing L{messages<Message>} up
523 to the most recent will be settled.
524 """
525 if tracker is None:
526 tracker = pn_messenger_outgoing_tracker(self._mng)
527 flags = PN_CUMULATIVE
528 else:
529 flags = 0
530 self._check(pn_messenger_settle(self._mng, tracker, flags))
531
532 - def send(self, n=-1):
533 """
534 This call will block until the indicated number of L{messages<Message>}
535 have been sent, or until the operation times out. If n is -1 this call will
536 block until all outgoing L{messages<Message>} have been sent. If n is 0 then
537 this call will send whatever it can without blocking.
538 """
539 self._check(pn_messenger_send(self._mng, n))
540
541 - def recv(self, n=None):
542 """
543 Receives up to I{n} L{messages<Message>} into the incoming queue. If no value
544 for I{n} is supplied, this call will receive as many L{messages<Message>} as it
545 can buffer internally. If the L{Messenger} is in blocking mode, this
546 call will block until at least one L{Message} is available in the
547 incoming queue.
548 """
549 if n is None:
550 n = -1
551 self._check(pn_messenger_recv(self._mng, n))
552
553 - def work(self, timeout=None):
554 """
555 Sends or receives any outstanding L{messages<Message>} queued for a L{Messenger}.
556 This will block for the indicated timeout.
557 This method may also do I/O work other than sending and receiving
558 L{messages<Message>}. For example, closing connections after messenger.L{stop}()
559 has been called.
560 """
561 if timeout is None:
562 t = -1
563 else:
564 t = long(1000*timeout)
565 err = pn_messenger_work(self._mng, t)
566 if (err == PN_TIMEOUT):
567 return False
568 else:
569 self._check(err)
570 return True
571
572 @property
574 return pn_messenger_receiving(self._mng)
575
577 """
578 The L{Messenger} interface is single-threaded.
579 This is the only L{Messenger} function intended to be called
580 from outside of the L{Messenger} thread.
581 Call this from a non-messenger thread to interrupt
582 a L{Messenger} that is blocking.
583 This will cause any in-progress blocking call to throw
584 the L{Interrupt} exception. If there is no currently blocking
585 call, then the next blocking call will be affected, even if it
586 is within the same thread that interrupt was called from.
587 """
588 self._check(pn_messenger_interrupt(self._mng))
589
590 - def get(self, message=None):
591 """
592 Moves the message from the head of the incoming message queue into
593 the supplied message object. Any content in the message will be
594 overwritten.
595
596 A tracker for the incoming L{Message} is returned. The tracker can
597 later be used to communicate your acceptance or rejection of the
598 L{Message}.
599
600 If None is passed in for the L{Message} object, the L{Message}
601 popped from the head of the queue is discarded.
602
603 @type message: Message
604 @param message: the destination message object
605 @return: a tracker
606 """
607 if message is None:
608 impl = None
609 else:
610 impl = message._msg
611 self._check(pn_messenger_get(self._mng, impl))
612 if message is not None:
613 message._post_decode()
614 return pn_messenger_incoming_tracker(self._mng)
615
616 - def accept(self, tracker=None):
617 """
618 Signal the sender that you have acted on the L{Message}
619 pointed to by the tracker. If no tracker is supplied,
620 then all messages that have been returned by the L{get}
621 method are accepted, except those that have already been
622 auto-settled by passing beyond your incoming window size.
623
624 @type tracker: tracker
625 @param tracker: a tracker as returned by get
626 """
627 if tracker is None:
628 tracker = pn_messenger_incoming_tracker(self._mng)
629 flags = PN_CUMULATIVE
630 else:
631 flags = 0
632 self._check(pn_messenger_accept(self._mng, tracker, flags))
633
634 - def reject(self, tracker=None):
635 """
636 Rejects the L{Message} indicated by the tracker. If no tracker
637 is supplied, all messages that have been returned by the L{get}
638 method are rejected, except those that have already been auto-settled
639 by passing beyond your outgoing window size.
640
641 @type tracker: tracker
642 @param tracker: a tracker as returned by get
643 """
644 if tracker is None:
645 tracker = pn_messenger_incoming_tracker(self._mng)
646 flags = PN_CUMULATIVE
647 else:
648 flags = 0
649 self._check(pn_messenger_reject(self._mng, tracker, flags))
650
651 @property
653 """
654 The outgoing queue depth.
655 """
656 return pn_messenger_outgoing(self._mng)
657
658 @property
660 """
661 The incoming queue depth.
662 """
663 return pn_messenger_incoming(self._mng)
664
665 - def route(self, pattern, address):
666 """
667 Adds a routing rule to a L{Messenger's<Messenger>} internal routing table.
668
669 The route procedure may be used to influence how a L{Messenger} will
670 internally treat a given address or class of addresses. Every call
671 to the route procedure will result in L{Messenger} appending a routing
672 rule to its internal routing table.
673
674 Whenever a L{Message} is presented to a L{Messenger} for delivery, it
675 will match the address of this message against the set of routing
676 rules in order. The first rule to match will be triggered, and
677 instead of routing based on the address presented in the message,
678 the L{Messenger} will route based on the address supplied in the rule.
679
680 The pattern matching syntax supports two types of matches, a '%'
681 will match any character except a '/', and a '*' will match any
682 character including a '/'.
683
684 A routing address is specified as a normal AMQP address, however it
685 may additionally use substitution variables from the pattern match
686 that triggered the rule.
687
688 Any message sent to "foo" will be routed to "amqp://foo.com":
689
690 >>> messenger.route("foo", "amqp://foo.com");
691
692 Any message sent to "foobar" will be routed to
693 "amqp://foo.com/bar":
694
695 >>> messenger.route("foobar", "amqp://foo.com/bar");
696
697 Any message sent to bar/<path> will be routed to the corresponding
698 path within the amqp://bar.com domain:
699
700 >>> messenger.route("bar/*", "amqp://bar.com/$1");
701
702 Route all L{messages<Message>} over TLS:
703
704 >>> messenger.route("amqp:*", "amqps:$1")
705
706 Supply credentials for foo.com:
707
708 >>> messenger.route("amqp://foo.com/*", "amqp://user:password@foo.com/$1");
709
710 Supply credentials for all domains:
711
712 >>> messenger.route("amqp://*", "amqp://user:password@$1");
713
714 Route all addresses through a single proxy while preserving the
715 original destination:
716
717 >>> messenger.route("amqp://%/*", "amqp://user:password@proxy/$1/$2");
718
719 Route any address through a single broker:
720
721 >>> messenger.route("*", "amqp://user:password@broker/$1");
722 """
723 self._check(pn_messenger_route(self._mng, pattern, address))
724
725 - def rewrite(self, pattern, address):
726 """
727 Similar to route(), except that the destination of
728 the L{Message} is determined before the message address is rewritten.
729
730 The outgoing address is only rewritten after routing has been
731 finalized. If a message has an outgoing address of
732 "amqp://0.0.0.0:5678", and a rewriting rule that changes its
733 outgoing address to "foo", it will still arrive at the peer that
734 is listening on "amqp://0.0.0.0:5678", but when it arrives there,
735 the receiver will see its outgoing address as "foo".
736
737 The default rewrite rule removes username and password from addresses
738 before they are transmitted.
739 """
740 self._check(pn_messenger_rewrite(self._mng, pattern, address))
741
743 impl = pn_messenger_selectable(self._mng)
744 if impl:
745 fd = pn_selectable_fd(impl)
746 sel = self._selectables.get(fd, None)
747 if sel is None:
748 sel = Selectable(self, impl)
749 self._selectables[fd] = sel
750 return sel
751 else:
752 return None
753
754 @property
756 tstamp = pn_messenger_deadline(self._mng)
757 if tstamp:
758 return float(tstamp)/1000
759 else:
760 return None
761
763 """
764 The L{Message} class is a mutable holder of message content.
765
766 @ivar instructions: delivery instructions for the message
767 @type instructions: dict
768 @ivar annotations: infrastructure defined message annotations
769 @type annotations: dict
770 @ivar properties: application defined message properties
771 @type properties: dict
772 @ivar body: message body
773 @type body: bytes | unicode | dict | list | int | long | float | UUID
774 """
775
776 DATA = PN_DATA
777 TEXT = PN_TEXT
778 AMQP = PN_AMQP
779 JSON = PN_JSON
780
781 DEFAULT_PRIORITY = PN_DEFAULT_PRIORITY
782
784 self._msg = pn_message()
785 self._id = Data(pn_message_id(self._msg))
786 self._correlation_id = Data(pn_message_correlation_id(self._msg))
787 self.instructions = None
788 self.annotations = None
789 self.properties = None
790 self.body = None
791
793 if hasattr(self, "_msg"):
794 pn_message_free(self._msg)
795 del self._msg
796
798 if err < 0:
799 exc = EXCEPTIONS.get(err, MessageException)
800 raise exc("[%s]: %s" % (err, pn_message_error(self._msg)))
801 else:
802 return err
803
822
823 - def _post_decode(self):
824 inst = Data(pn_message_instructions(self._msg))
825 ann = Data(pn_message_annotations(self._msg))
826 props = Data(pn_message_properties(self._msg))
827 body = Data(pn_message_body(self._msg))
828
829 if inst.next():
830 self.instructions = inst.get_object()
831 else:
832 self.instructions = None
833 if ann.next():
834 self.annotations = ann.get_object()
835 else:
836 self.annotations = None
837 if props.next():
838 self.properties = props.get_object()
839 else:
840 self.properties = None
841 if body.next():
842 self.body = body.get_object()
843 else:
844 self.body = None
845
847 """
848 Clears the contents of the L{Message}. All fields will be reset to
849 their default values.
850 """
851 pn_message_clear(self._msg)
852 self.instructions = None
853 self.annotations = None
854 self.properties = None
855 self.body = None
856
858 return pn_message_is_inferred(self._msg)
859
861 self._check(pn_message_set_inferred(self._msg, bool(value)))
862
863 inferred = property(_is_inferred, _set_inferred)
864
866 return pn_message_is_durable(self._msg)
867
869 self._check(pn_message_set_durable(self._msg, bool(value)))
870
871 durable = property(_is_durable, _set_durable,
872 doc="""
873 The durable property indicates that the message should be held durably
874 by any intermediaries taking responsibility for the message.
875 """)
876
878 return pn_message_get_priority(self._msg)
879
881 self._check(pn_message_set_priority(self._msg, value))
882
883 priority = property(_get_priority, _set_priority,
884 doc="""
885 The priority of the message.
886 """)
887
889 return pn_message_get_ttl(self._msg)
890
892 self._check(pn_message_set_ttl(self._msg, value))
893
894 ttl = property(_get_ttl, _set_ttl,
895 doc="""
896 The time to live of the message measured in milliseconds. Expired
897 messages may be dropped.
898 """)
899
901 return pn_message_is_first_acquirer(self._msg)
902
904 self._check(pn_message_set_first_acquirer(self._msg, bool(value)))
905
906 first_acquirer = property(_is_first_acquirer, _set_first_acquirer,
907 doc="""
908 True iff the recipient is the first to acquire the message.
909 """)
910
912 return pn_message_get_delivery_count(self._msg)
913
915 self._check(pn_message_set_delivery_count(self._msg, value))
916
917 delivery_count = property(_get_delivery_count, _set_delivery_count,
918 doc="""
919 The number of delivery attempts made for this message.
920 """)
921
922
930 id = property(_get_id, _set_id,
931 doc="""
932 The id of the message.
933 """)
934
936 return pn_message_get_user_id(self._msg)
937
939 self._check(pn_message_set_user_id(self._msg, value))
940
941 user_id = property(_get_user_id, _set_user_id,
942 doc="""
943 The user id of the message creator.
944 """)
945
947 return pn_message_get_address(self._msg)
948
950 self._check(pn_message_set_address(self._msg, value))
951
952 address = property(_get_address, _set_address,
953 doc="""
954 The address of the message.
955 """)
956
958 return pn_message_get_subject(self._msg)
959
961 self._check(pn_message_set_subject(self._msg, value))
962
963 subject = property(_get_subject, _set_subject,
964 doc="""
965 The subject of the message.
966 """)
967
969 return pn_message_get_reply_to(self._msg)
970
972 self._check(pn_message_set_reply_to(self._msg, value))
973
974 reply_to = property(_get_reply_to, _set_reply_to,
975 doc="""
976 The reply-to address for the message.
977 """)
978
982 if type(value) in (int, long):
983 value = ulong(value)
984 self._correlation_id.rewind()
985 self._correlation_id.put_object(value)
986
987 correlation_id = property(_get_correlation_id, _set_correlation_id,
988 doc="""
989 The correlation-id for the message.
990 """)
991
993 return pn_message_get_content_type(self._msg)
994
995 - def _set_content_type(self, value):
996 self._check(pn_message_set_content_type(self._msg, value))
997
998 content_type = property(_get_content_type, _set_content_type,
999 doc="""
1000 The content-type of the message.
1001 """)
1002
1004 return pn_message_get_content_encoding(self._msg)
1005
1006 - def _set_content_encoding(self, value):
1007 self._check(pn_message_set_content_encoding(self._msg, value))
1008
1009 content_encoding = property(_get_content_encoding, _set_content_encoding,
1010 doc="""
1011 The content-encoding of the message.
1012 """)
1013
1015 return pn_message_get_expiry_time(self._msg)
1016
1018 self._check(pn_message_set_expiry_time(self._msg, value))
1019
1020 expiry_time = property(_get_expiry_time, _set_expiry_time,
1021 doc="""
1022 The expiry time of the message.
1023 """)
1024
1026 return pn_message_get_creation_time(self._msg)
1027
1029 self._check(pn_message_set_creation_time(self._msg, value))
1030
1031 creation_time = property(_get_creation_time, _set_creation_time,
1032 doc="""
1033 The creation time of the message.
1034 """)
1035
1037 return pn_message_get_group_id(self._msg)
1038
1040 self._check(pn_message_set_group_id(self._msg, value))
1041
1042 group_id = property(_get_group_id, _set_group_id,
1043 doc="""
1044 The group id of the message.
1045 """)
1046
1048 return pn_message_get_group_sequence(self._msg)
1049
1051 self._check(pn_message_set_group_sequence(self._msg, value))
1052
1053 group_sequence = property(_get_group_sequence, _set_group_sequence,
1054 doc="""
1055 The sequence of the message within its group.
1056 """)
1057
1059 return pn_message_get_reply_to_group_id(self._msg)
1060
1062 self._check(pn_message_set_reply_to_group_id(self._msg, value))
1063
1064 reply_to_group_id = property(_get_reply_to_group_id, _set_reply_to_group_id,
1065 doc="""
1066 The group-id for any replies.
1067 """)
1068
1069
1072
1075
1076 format = property(_get_format, _set_format,
1077 doc="""
1078 The format of the message.
1079 """)
1080
1082 self._pre_encode()
1083 sz = 16
1084 while True:
1085 err, data = pn_message_encode(self._msg, sz)
1086 if err == PN_OVERFLOW:
1087 sz *= 2
1088 continue
1089 else:
1090 self._check(err)
1091 return data
1092
1094 self._check(pn_message_decode(self._msg, data, len(data)))
1095 self._post_decode()
1096
1097 - def load(self, data):
1098 self._check(pn_message_load(self._msg, data))
1099
1101 sz = 16
1102 while True:
1103 err, data = pn_message_save(self._msg, sz)
1104 if err == PN_OVERFLOW:
1105 sz *= 2
1106 continue
1107 else:
1108 self._check(err)
1109 return data
1110
1112 props = []
1113 for attr in ("inferred", "address", "reply_to", "durable", "ttl",
1114 "priority", "first_acquirer", "delivery_count", "id",
1115 "correlation_id", "user_id", "group_id", "group_sequence",
1116 "reply_to_group_id", "instructions", "annotations",
1117 "properties", "body"):
1118 value = getattr(self, attr)
1119 if value: props.append("%s=%r" % (attr, value))
1120 return "Message(%s)" % ", ".join(props)
1121
1123 tmp = pn_string(None)
1124 err = pn_inspect(self._msg, tmp)
1125 result = pn_string_get(tmp)
1126 pn_free(tmp)
1127 self._check(err)
1128 return result
1129
1131
1134
1135 @property
1137 return pn_subscription_address(self._impl)
1138
1140
1142 self.messenger = messenger
1143 self._impl = impl
1144
1146 if not self._impl: raise ValueError("selectable freed")
1147 return pn_selectable_fd(self._impl)
1148
1149 @property
1151 if not self._impl: raise ValueError("selectable freed")
1152 return pn_selectable_capacity(self._impl)
1153
1154 @property
1156 if not self._impl: raise ValueError("selectable freed")
1157 return pn_selectable_pending(self._impl)
1158
1159 @property
1161 if not self._impl: raise ValueError("selectable freed")
1162 tstamp = pn_selectable_deadline(self._impl)
1163 if tstamp:
1164 return float(tstamp)/1000
1165 else:
1166 return None
1167
1169 if not self._impl: raise ValueError("selectable freed")
1170 pn_selectable_readable(self._impl)
1171
1173 if not self._impl: raise ValueError("selectable freed")
1174 pn_selectable_writable(self._impl)
1175
1177 if not self._impl: raise ValueError("selectable freed")
1178 pn_selectable_expired(self._impl)
1179
1181 if not self._impl: raise ValueError("selectable freed")
1182 return pn_selectable_is_registered(self._impl)
1183
1185 if not self._impl: raise ValueError("selectable freed")
1186 pn_selectable_set_registered(self._impl, registered)
1187
1188 registered = property(_is_registered, _set_registered,
1189 doc="""
1190 The registered property may be get/set by an I/O polling system to
1191 indicate whether the fd has been registered or not.
1192 """)
1193
1194 @property
1196 if not self._impl: return True
1197 return pn_selectable_is_terminal(self._impl)
1198
1200 if self._impl:
1201 del self.messenger._selectables[self.fileno()]
1202 pn_selectable_free(self._impl)
1203 self._impl = None
1204
1207
1209 """
1210 The DataException class is the root of the Data exception hierarchy.
1211 All exceptions raised by the Data class extend this exception.
1212 """
1213 pass
1214
1216
1219
1221 return "UnmappedType(%s)" % self.msg
1222
1224
1226 return "ulong(%s)" % long.__repr__(self)
1227
1229
1231 return "timestamp(%s)" % long.__repr__(self)
1232
1234
1236 return "symbol(%s)" % unicode.__repr__(self)
1237
1238 -class char(unicode):
1239
1241 return "char(%s)" % unicode.__repr__(self)
1242
1244
1245 - def __init__(self, descriptor, value):
1246 self.descriptor = descriptor
1247 self.value = value
1248
1250 return "Described(%r, %r)" % (self.descriptor, self.value)
1251
1253 if isinstance(o, Described):
1254 return self.descriptor == o.descriptor and self.value == o.value
1255 else:
1256 return False
1257
1258 UNDESCRIBED = Constant("UNDESCRIBED")
1259
1260 -class Array(object):
1261
1262 - def __init__(self, descriptor, type, *elements):
1263 self.descriptor = descriptor
1264 self.type = type
1265 self.elements = elements
1266
1268 if self.elements:
1269 els = ", %s" % (", ".join(map(repr, self.elements)))
1270 else:
1271 els = ""
1272 return "Array(%r, %r%s)" % (self.descriptor, self.type, els)
1273
1275 if isinstance(o, Array):
1276 return self.descriptor == o.descriptor and \
1277 self.type == o.type and self.elements == o.elements
1278 else:
1279 return False
1280
1282 """
1283 The L{Data} class provides an interface for decoding, extracting,
1284 creating, and encoding arbitrary AMQP data. A L{Data} object
1285 contains a tree of AMQP values. Leaf nodes in this tree correspond
1286 to scalars in the AMQP type system such as L{ints<INT>} or
1287 L{strings<STRING>}. Non-leaf nodes in this tree correspond to
1288 compound values in the AMQP type system such as L{lists<LIST>},
1289 L{maps<MAP>}, L{arrays<ARRAY>}, or L{described values<DESCRIBED>}.
1290 The root node of the tree is the L{Data} object itself and can have
1291 an arbitrary number of children.
1292
1293 A L{Data} object maintains the notion of the current sibling node
1294 and a current parent node. Siblings are ordered within their parent.
1295 Values are accessed and/or added by using the L{next}, L{prev},
1296 L{enter}, and L{exit} methods to navigate to the desired location in
1297 the tree and using the supplied variety of put_*/get_* methods to
1298 access or add a value of the desired type.
1299
1300 The put_* methods will always add a value I{after} the current node
1301 in the tree. If the current node has a next sibling the put_* method
1302 will overwrite the value on this node. If there is no current node
1303 or the current node has no next sibling then one will be added. The
1304 put_* methods always set the added/modified node to the current
1305 node. The get_* methods read the value of the current node and do
1306 not change which node is current.
1307
1308 The following types of scalar values are supported:
1309
1310 - L{NULL}
1311 - L{BOOL}
1312 - L{UBYTE}
1313 - L{USHORT}
1314 - L{SHORT}
1315 - L{UINT}
1316 - L{INT}
1317 - L{ULONG}
1318 - L{LONG}
1319 - L{FLOAT}
1320 - L{DOUBLE}
1321 - L{BINARY}
1322 - L{STRING}
1323 - L{SYMBOL}
1324
1325 The following types of compound values are supported:
1326
1327 - L{DESCRIBED}
1328 - L{ARRAY}
1329 - L{LIST}
1330 - L{MAP}
1331 """
1332
1333 NULL = PN_NULL; "A null value."
1334 BOOL = PN_BOOL; "A boolean value."
1335 UBYTE = PN_UBYTE; "An unsigned byte value."
1336 BYTE = PN_BYTE; "A signed byte value."
1337 USHORT = PN_USHORT; "An unsigned short value."
1338 SHORT = PN_SHORT; "A short value."
1339 UINT = PN_UINT; "An unsigned int value."
1340 INT = PN_INT; "A signed int value."
1341 CHAR = PN_CHAR; "A character value."
1342 ULONG = PN_ULONG; "An unsigned long value."
1343 LONG = PN_LONG; "A signed long value."
1344 TIMESTAMP = PN_TIMESTAMP; "A timestamp value."
1345 FLOAT = PN_FLOAT; "A float value."
1346 DOUBLE = PN_DOUBLE; "A double value."
1347 DECIMAL32 = PN_DECIMAL32; "A DECIMAL32 value."
1348 DECIMAL64 = PN_DECIMAL64; "A DECIMAL64 value."
1349 DECIMAL128 = PN_DECIMAL128; "A DECIMAL128 value."
1350 UUID = PN_UUID; "A UUID value."
1351 BINARY = PN_BINARY; "A binary string."
1352 STRING = PN_STRING; "A unicode string."
1353 SYMBOL = PN_SYMBOL; "A symbolic string."
1354 DESCRIBED = PN_DESCRIBED; "A described value."
1355 ARRAY = PN_ARRAY; "An array value."
1356 LIST = PN_LIST; "A list value."
1357 MAP = PN_MAP; "A map value."
1358
1359 type_names = {
1360 NULL: "null",
1361 BOOL: "bool",
1362 BYTE: "byte",
1363 UBYTE: "ubyte",
1364 SHORT: "short",
1365 USHORT: "ushort",
1366 INT: "int",
1367 UINT: "uint",
1368 CHAR: "char",
1369 LONG: "long",
1370 ULONG: "ulong",
1371 TIMESTAMP: "timestamp",
1372 FLOAT: "float",
1373 DOUBLE: "double",
1374 DECIMAL32: "decimal32",
1375 DECIMAL64: "decimal64",
1376 DECIMAL128: "decimal128",
1377 UUID: "uuid",
1378 BINARY: "binary",
1379 STRING: "string",
1380 SYMBOL: "symbol",
1381 DESCRIBED: "described",
1382 ARRAY: "array",
1383 LIST: "list",
1384 MAP: "map"
1385 }
1386
1387 @classmethod
1389
1391 if type(capacity) in (int, long):
1392 self._data = pn_data(capacity)
1393 self._free = True
1394 else:
1395 self._data = capacity
1396 self._free = False
1397
1399 if self._free and hasattr(self, "_data"):
1400 pn_data_free(self._data)
1401 del self._data
1402
1404 if err < 0:
1405 exc = EXCEPTIONS.get(err, DataException)
1406 raise exc("[%s]: %s" % (err, pn_error_text(pn_data_error(self._data))))
1407 else:
1408 return err
1409
1411 """
1412 Clears the data object.
1413 """
1414 pn_data_clear(self._data)
1415
1417 """
1418 Clears current node and sets the parent to the root node. Clearing the
1419 current node sets it _before_ the first node, calling next() will advance to
1420 the first node.
1421 """
1422 pn_data_rewind(self._data)
1423
1425 """
1426 Advances the current node to its next sibling and returns its
1427 type. If there is no next sibling the current node remains
1428 unchanged and None is returned.
1429 """
1430 found = pn_data_next(self._data)
1431 if found:
1432 return self.type()
1433 else:
1434 return None
1435
1437 """
1438 Advances the current node to its previous sibling and returns its
1439 type. If there is no previous sibling the current node remains
1440 unchanged and None is returned.
1441 """
1442 found = pn_data_prev(self._data)
1443 if found:
1444 return self.type()
1445 else:
1446 return None
1447
1449 """
1450 Sets the parent node to the current node and clears the current node.
1451 Clearing the current node sets it _before_ the first child,
1452 call next() advances to the first child.
1453 """
1454 return pn_data_enter(self._data)
1455
1457 """
1458 Sets the current node to the parent node and the parent node to
1459 its own parent.
1460 """
1461 return pn_data_exit(self._data)
1462
1464 return pn_data_lookup(self._data, name)
1465
1467 pn_data_narrow(self._data)
1468
1470 pn_data_widen(self._data)
1471
1473 """
1474 Returns the type of the current node.
1475 """
1476 dtype = pn_data_type(self._data)
1477 if dtype == -1:
1478 return None
1479 else:
1480 return dtype
1481
1483 """
1484 Returns a representation of the data encoded in AMQP format.
1485 """
1486 size = 1024
1487 while True:
1488 cd, enc = pn_data_encode(self._data, size)
1489 if cd == PN_OVERFLOW:
1490 size *= 2
1491 elif cd >= 0:
1492 return enc
1493 else:
1494 self._check(cd)
1495
1497 """
1498 Decodes the first value from supplied AMQP data and returns the
1499 number of bytes consumed.
1500
1501 @type encoded: binary
1502 @param encoded: AMQP encoded binary data
1503 """
1504 return self._check(pn_data_decode(self._data, encoded))
1505
1507 """
1508 Puts a list value. Elements may be filled by entering the list
1509 node and putting element values.
1510
1511 >>> data = Data()
1512 >>> data.put_list()
1513 >>> data.enter()
1514 >>> data.put_int(1)
1515 >>> data.put_int(2)
1516 >>> data.put_int(3)
1517 >>> data.exit()
1518 """
1519 self._check(pn_data_put_list(self._data))
1520
1522 """
1523 Puts a map value. Elements may be filled by entering the map node
1524 and putting alternating key value pairs.
1525
1526 >>> data = Data()
1527 >>> data.put_map()
1528 >>> data.enter()
1529 >>> data.put_string("key")
1530 >>> data.put_string("value")
1531 >>> data.exit()
1532 """
1533 self._check(pn_data_put_map(self._data))
1534
1535 - def put_array(self, described, element_type):
1536 """
1537 Puts an array value. Elements may be filled by entering the array
1538 node and putting the element values. The values must all be of the
1539 specified array element type. If an array is described then the
1540 first child value of the array is the descriptor and may be of any
1541 type.
1542
1543 >>> data = Data()
1544 >>>
1545 >>> data.put_array(False, Data.INT)
1546 >>> data.enter()
1547 >>> data.put_int(1)
1548 >>> data.put_int(2)
1549 >>> data.put_int(3)
1550 >>> data.exit()
1551 >>>
1552 >>> data.put_array(True, Data.DOUBLE)
1553 >>> data.enter()
1554 >>> data.put_symbol("array-descriptor")
1555 >>> data.put_double(1.1)
1556 >>> data.put_double(1.2)
1557 >>> data.put_double(1.3)
1558 >>> data.exit()
1559
1560 @type described: bool
1561 @param described: specifies whether the array is described
1562 @type element_type: int
1563 @param element_type: the type of the array elements
1564 """
1565 self._check(pn_data_put_array(self._data, described, element_type))
1566
1568 """
1569 Puts a described value. A described node has two children, the
1570 descriptor and the value. These are specified by entering the node
1571 and putting the desired values.
1572
1573 >>> data = Data()
1574 >>> data.put_described()
1575 >>> data.enter()
1576 >>> data.put_symbol("value-descriptor")
1577 >>> data.put_string("the value")
1578 >>> data.exit()
1579 """
1580 self._check(pn_data_put_described(self._data))
1581
1583 """
1584 Puts a null value.
1585 """
1586 self._check(pn_data_put_null(self._data))
1587
1589 """
1590 Puts a boolean value.
1591
1592 @param b: a boolean value
1593 """
1594 self._check(pn_data_put_bool(self._data, b))
1595
1597 """
1598 Puts an unsigned byte value.
1599
1600 @param ub: an integral value
1601 """
1602 self._check(pn_data_put_ubyte(self._data, ub))
1603
1605 """
1606 Puts a signed byte value.
1607
1608 @param b: an integral value
1609 """
1610 self._check(pn_data_put_byte(self._data, b))
1611
1613 """
1614 Puts an unsigned short value.
1615
1616 @param us: an integral value.
1617 """
1618 self._check(pn_data_put_ushort(self._data, us))
1619
1621 """
1622 Puts a signed short value.
1623
1624 @param s: an integral value
1625 """
1626 self._check(pn_data_put_short(self._data, s))
1627
1629 """
1630 Puts an unsigned int value.
1631
1632 @param ui: an integral value
1633 """
1634 self._check(pn_data_put_uint(self._data, ui))
1635
1637 """
1638 Puts a signed int value.
1639
1640 @param i: an integral value
1641 """
1642 self._check(pn_data_put_int(self._data, i))
1643
1645 """
1646 Puts a char value.
1647
1648 @param c: a single character
1649 """
1650 self._check(pn_data_put_char(self._data, ord(c)))
1651
1653 """
1654 Puts an unsigned long value.
1655
1656 @param ul: an integral value
1657 """
1658 self._check(pn_data_put_ulong(self._data, ul))
1659
1661 """
1662 Puts a signed long value.
1663
1664 @param l: an integral value
1665 """
1666 self._check(pn_data_put_long(self._data, l))
1667
1669 """
1670 Puts a timestamp value.
1671
1672 @param t: an integral value
1673 """
1674 self._check(pn_data_put_timestamp(self._data, t))
1675
1677 """
1678 Puts a float value.
1679
1680 @param f: a floating point value
1681 """
1682 self._check(pn_data_put_float(self._data, f))
1683
1685 """
1686 Puts a double value.
1687
1688 @param d: a floating point value.
1689 """
1690 self._check(pn_data_put_double(self._data, d))
1691
1693 """
1694 Puts a decimal32 value.
1695
1696 @param d: a decimal32 value
1697 """
1698 self._check(pn_data_put_decimal32(self._data, d))
1699
1701 """
1702 Puts a decimal64 value.
1703
1704 @param d: a decimal64 value
1705 """
1706 self._check(pn_data_put_decimal64(self._data, d))
1707
1709 """
1710 Puts a decimal128 value.
1711
1712 @param d: a decimal128 value
1713 """
1714 self._check(pn_data_put_decimal128(self._data, d))
1715
1717 """
1718 Puts a UUID value.
1719
1720 @param u: a uuid value
1721 """
1722 self._check(pn_data_put_uuid(self._data, u.bytes))
1723
1725 """
1726 Puts a binary value.
1727
1728 @type b: binary
1729 @param b: a binary value
1730 """
1731 self._check(pn_data_put_binary(self._data, b))
1732
1734 """
1735 Puts a unicode value.
1736
1737 @type s: unicode
1738 @param s: a unicode value
1739 """
1740 self._check(pn_data_put_string(self._data, s.encode("utf8")))
1741
1743 """
1744 Puts a symbolic value.
1745
1746 @type s: string
1747 @param s: the symbol name
1748 """
1749 self._check(pn_data_put_symbol(self._data, s))
1750
1752 """
1753 If the current node is a list, return the number of elements,
1754 otherwise return zero. List elements can be accessed by entering
1755 the list.
1756
1757 >>> count = data.get_list()
1758 >>> data.enter()
1759 >>> for i in range(count):
1760 ... type = data.next()
1761 ... if type == Data.STRING:
1762 ... print data.get_string()
1763 ... elif type == ...:
1764 ... ...
1765 >>> data.exit()
1766 """
1767 return pn_data_get_list(self._data)
1768
1770 """
1771 If the current node is a map, return the number of child elements,
1772 otherwise return zero. Key value pairs can be accessed by entering
1773 the map.
1774
1775 >>> count = data.get_map()
1776 >>> data.enter()
1777 >>> for i in range(count/2):
1778 ... type = data.next()
1779 ... if type == Data.STRING:
1780 ... print data.get_string()
1781 ... elif type == ...:
1782 ... ...
1783 >>> data.exit()
1784 """
1785 return pn_data_get_map(self._data)
1786
1788 """
1789 If the current node is an array, return a tuple of the element
1790 count, a boolean indicating whether the array is described, and
1791 the type of each element, otherwise return (0, False, None). Array
1792 data can be accessed by entering the array.
1793
1794 >>> # read an array of strings with a symbolic descriptor
1795 >>> count, described, type = data.get_array()
1796 >>> data.enter()
1797 >>> data.next()
1798 >>> print "Descriptor:", data.get_symbol()
1799 >>> for i in range(count):
1800 ... data.next()
1801 ... print "Element:", data.get_string()
1802 >>> data.exit()
1803 """
1804 count = pn_data_get_array(self._data)
1805 described = pn_data_is_array_described(self._data)
1806 type = pn_data_get_array_type(self._data)
1807 if type == -1:
1808 type = None
1809 return count, described, type
1810
1812 """
1813 Checks if the current node is a described value. The descriptor
1814 and value may be accessed by entering the described value.
1815
1816 >>> # read a symbolically described string
1817 >>> assert data.is_described() # will error if the current node is not described
1818 >>> data.enter()
1819 >>> print data.get_symbol()
1820 >>> print data.get_string()
1821 >>> data.exit()
1822 """
1823 return pn_data_is_described(self._data)
1824
1826 """
1827 Checks if the current node is a null.
1828 """
1829 return pn_data_is_null(self._data)
1830
1832 """
1833 If the current node is a boolean, returns its value, returns False
1834 otherwise.
1835 """
1836 return pn_data_get_bool(self._data)
1837
1839 """
1840 If the current node is an unsigned byte, returns its value,
1841 returns 0 otherwise.
1842 """
1843 return pn_data_get_ubyte(self._data)
1844
1846 """
1847 If the current node is a signed byte, returns its value, returns 0
1848 otherwise.
1849 """
1850 return pn_data_get_byte(self._data)
1851
1853 """
1854 If the current node is an unsigned short, returns its value,
1855 returns 0 otherwise.
1856 """
1857 return pn_data_get_ushort(self._data)
1858
1860 """
1861 If the current node is a signed short, returns its value, returns
1862 0 otherwise.
1863 """
1864 return pn_data_get_short(self._data)
1865
1867 """
1868 If the current node is an unsigned int, returns its value, returns
1869 0 otherwise.
1870 """
1871 return pn_data_get_uint(self._data)
1872
1874 """
1875 If the current node is a signed int, returns its value, returns 0
1876 otherwise.
1877 """
1878 return pn_data_get_int(self._data)
1879
1881 """
1882 If the current node is a char, returns its value, returns 0
1883 otherwise.
1884 """
1885 return char(unichr(pn_data_get_char(self._data)))
1886
1888 """
1889 If the current node is an unsigned long, returns its value,
1890 returns 0 otherwise.
1891 """
1892 return ulong(pn_data_get_ulong(self._data))
1893
1895 """
1896 If the current node is an signed long, returns its value, returns
1897 0 otherwise.
1898 """
1899 return pn_data_get_long(self._data)
1900
1902 """
1903 If the current node is a timestamp, returns its value, returns 0
1904 otherwise.
1905 """
1906 return timestamp(pn_data_get_timestamp(self._data))
1907
1909 """
1910 If the current node is a float, returns its value, raises 0
1911 otherwise.
1912 """
1913 return pn_data_get_float(self._data)
1914
1916 """
1917 If the current node is a double, returns its value, returns 0
1918 otherwise.
1919 """
1920 return pn_data_get_double(self._data)
1921
1922
1924 """
1925 If the current node is a decimal32, returns its value, returns 0
1926 otherwise.
1927 """
1928 return pn_data_get_decimal32(self._data)
1929
1930
1932 """
1933 If the current node is a decimal64, returns its value, returns 0
1934 otherwise.
1935 """
1936 return pn_data_get_decimal64(self._data)
1937
1938
1940 """
1941 If the current node is a decimal128, returns its value, returns 0
1942 otherwise.
1943 """
1944 return pn_data_get_decimal128(self._data)
1945
1947 """
1948 If the current node is a UUID, returns its value, returns None
1949 otherwise.
1950 """
1951 if pn_data_type(self._data) == Data.UUID:
1952 return uuid.UUID(bytes=pn_data_get_uuid(self._data))
1953 else:
1954 return None
1955
1957 """
1958 If the current node is binary, returns its value, returns ""
1959 otherwise.
1960 """
1961 return pn_data_get_binary(self._data)
1962
1964 """
1965 If the current node is a string, returns its value, returns ""
1966 otherwise.
1967 """
1968 return pn_data_get_string(self._data).decode("utf8")
1969
1971 """
1972 If the current node is a symbol, returns its value, returns ""
1973 otherwise.
1974 """
1975 return symbol(pn_data_get_symbol(self._data))
1976
1977 - def copy(self, src):
1978 self._check(pn_data_copy(self._data, src._data))
1979
1990
1992 pn_data_dump(self._data)
1993
2003
2005 if self.enter():
2006 try:
2007 result = {}
2008 while self.next():
2009 k = self.get_object()
2010 if self.next():
2011 v = self.get_object()
2012 else:
2013 v = None
2014 result[k] = v
2015 finally:
2016 self.exit()
2017 return result
2018
2027
2029 if self.enter():
2030 try:
2031 result = []
2032 while self.next():
2033 result.append(self.get_object())
2034 finally:
2035 self.exit()
2036 return result
2037
2048
2057
2059 """
2060 If the current node is an array, return an Array object
2061 representing the array and its contents. Otherwise return None.
2062 This is a convenience wrapper around get_array, enter, etc.
2063 """
2064
2065 count, described, type = self.get_array()
2066 if type is None: return None
2067 if self.enter():
2068 try:
2069 if described:
2070 self.next()
2071 descriptor = self.get_object()
2072 else:
2073 descriptor = UNDESCRIBED
2074 elements = []
2075 while self.next():
2076 elements.append(self.get_object())
2077 finally:
2078 self.exit()
2079 return Array(descriptor, type, *elements)
2080
2092
2093 put_mappings = {
2094 None.__class__: lambda s, _: s.put_null(),
2095 bool: put_bool,
2096 dict: put_dict,
2097 list: put_sequence,
2098 tuple: put_sequence,
2099 unicode: put_string,
2100 bytes: put_binary,
2101 symbol: put_symbol,
2102 int: put_long,
2103 char: put_char,
2104 long: put_long,
2105 ulong: put_ulong,
2106 timestamp: put_timestamp,
2107 float: put_double,
2108 uuid.UUID: put_uuid,
2109 Described: put_py_described,
2110 Array: put_py_array
2111 }
2112 get_mappings = {
2113 NULL: lambda s: None,
2114 BOOL: get_bool,
2115 BYTE: get_byte,
2116 UBYTE: get_ubyte,
2117 SHORT: get_short,
2118 USHORT: get_ushort,
2119 INT: get_int,
2120 UINT: get_uint,
2121 CHAR: get_char,
2122 LONG: get_long,
2123 ULONG: get_ulong,
2124 TIMESTAMP: get_timestamp,
2125 FLOAT: get_float,
2126 DOUBLE: get_double,
2127 DECIMAL32: get_decimal32,
2128 DECIMAL64: get_decimal64,
2129 DECIMAL128: get_decimal128,
2130 UUID: get_uuid,
2131 BINARY: get_binary,
2132 STRING: get_string,
2133 SYMBOL: get_symbol,
2134 DESCRIBED: get_py_described,
2135 ARRAY: get_py_array,
2136 LIST: get_sequence,
2137 MAP: get_dict
2138 }
2139
2140
2142 putter = self.put_mappings[obj.__class__]
2143 putter(self, obj)
2144
2146 type = self.type()
2147 if type is None: return None
2148 getter = self.get_mappings.get(type)
2149 if getter:
2150 return getter(self)
2151 else:
2152 return UnmappedType(str(type))
2153
2156
2158
2159 LOCAL_UNINIT = PN_LOCAL_UNINIT
2160 REMOTE_UNINIT = PN_REMOTE_UNINIT
2161 LOCAL_ACTIVE = PN_LOCAL_ACTIVE
2162 REMOTE_ACTIVE = PN_REMOTE_ACTIVE
2163 LOCAL_CLOSED = PN_LOCAL_CLOSED
2164 REMOTE_CLOSED = PN_REMOTE_CLOSED
2165
2168
2170 obj2cond(self.condition, self._get_cond_impl())
2171
2172 @property
2174 return cond2obj(self._get_remote_cond_impl())
2175
2176
2178 assert False, "Subclass must override this!"
2179
2181 assert False, "Subclass must override this!"
2182
2184
2185 - def __init__(self, name, description=None, info=None):
2186 self.name = name
2187 self.description = description
2188 self.info = info
2189
2191 return "Condition(%s)" % ", ".join([repr(x) for x in
2192 (self.name, self.description, self.info)
2193 if x])
2194
2196 if not isinstance(o, Condition): return False
2197 return self.name == o.name and \
2198 self.description == o.description and \
2199 self.info == o.info
2200
2202 pn_condition_clear(cond)
2203 if obj:
2204 pn_condition_set_name(cond, str(obj.name))
2205 pn_condition_set_description(cond, obj.description)
2206 info = Data(pn_condition_info(cond))
2207 if obj.info:
2208 info.put_object(obj.info)
2209
2211 if pn_condition_is_set(cond):
2212 return Condition(pn_condition_get_name(cond),
2213 pn_condition_get_description(cond),
2214 dat2obj(pn_condition_info(cond)))
2215 else:
2216 return None
2217
2225
2230
2232
2233 @staticmethod
2235 """Maintain only a single instance of this class for each Connection
2236 object that exists in the the C Engine. This is done by storing a (weak)
2237 reference to the python instance in the context field of the C object.
2238 """
2239 if not c_conn: return None
2240 py_conn = pn_connection_get_context(c_conn)
2241 if py_conn: return py_conn
2242 wrapper = Connection(_conn=c_conn)
2243 return wrapper
2244
2246 Endpoint.__init__(self)
2247 if _conn:
2248 self._conn = _conn
2249 else:
2250 self._conn = pn_connection()
2251 pn_connection_set_context(self._conn, self)
2252 self.offered_capabilities = None
2253 self.desired_capabilities = None
2254 self.properties = None
2255 self._sessions = set()
2256
2258 if hasattr(self, "_conn") and self._conn:
2259
2260
2261 if hasattr(self, "_sessions") and self._sessions:
2262 for s in self._sessions:
2263 s._release()
2264 pn_connection_set_context(self._conn, None)
2265 pn_connection_free(self._conn)
2266
2268 if err < 0:
2269 exc = EXCEPTIONS.get(err, ConnectionException)
2270 raise exc("[%s]: %s" % (err, pn_connection_error(self._conn)))
2271 else:
2272 return err
2273
2275 return pn_connection_condition(self._conn)
2276
2278 return pn_connection_remote_condition(self._conn)
2279
2281 if collector is None:
2282 pn_connection_collect(self._conn, None)
2283 else:
2284 pn_connection_collect(self._conn, collector._impl)
2285
2286
2287 self._collector = collector
2288
2290 return pn_connection_get_container(self._conn)
2292 return pn_connection_set_container(self._conn, name)
2293
2294 container = property(_get_container, _set_container)
2295
2297 return pn_connection_get_hostname(self._conn)
2299 return pn_connection_set_hostname(self._conn, name)
2300
2301 hostname = property(_get_hostname, _set_hostname)
2302
2303 @property
2305 return pn_connection_remote_container(self._conn)
2306
2307 @property
2309 return pn_connection_remote_hostname(self._conn)
2310
2311 @property
2313 return dat2obj(pn_connection_remote_offered_capabilities(self._conn))
2314
2315 @property
2317 return dat2obj(pn_connection_remote_desired_capabilities(self._conn))
2318
2319 @property
2321 return dat2obj(pn_connection_remote_properties(self._conn))
2322
2324 obj2dat(self.offered_capabilities,
2325 pn_connection_offered_capabilities(self._conn))
2326 obj2dat(self.desired_capabilities,
2327 pn_connection_desired_capabilities(self._conn))
2328 obj2dat(self.properties, pn_connection_properties(self._conn))
2329 pn_connection_open(self._conn)
2330
2332 self._update_cond()
2333 pn_connection_close(self._conn)
2334
2335 @property
2337 return pn_connection_state(self._conn)
2338
2340 return Session._wrap_session(pn_session(self._conn))
2341
2343 return Session._wrap_session(pn_session_head(self._conn, mask))
2344
2346 return Link._wrap_link(pn_link_head(self._conn, mask))
2347
2348 @property
2350 return Delivery._wrap_delivery(pn_work_head(self._conn))
2351
2352 @property
2354 return pn_error_code(pn_connection_error(self._conn))
2355
2358
2360
2361 @staticmethod
2363 """Maintain only a single instance of this class for each Session object that
2364 exists in the C Engine.
2365 """
2366 if c_ssn is None: return None
2367 py_ssn = pn_session_get_context(c_ssn)
2368 if py_ssn: return py_ssn
2369 wrapper = Session(c_ssn)
2370 return wrapper
2371
2373 Endpoint.__init__(self)
2374 self._ssn = ssn
2375 pn_session_set_context(self._ssn, self)
2376 self._links = set()
2377 self.connection._sessions.add(self)
2378
2380 """Release the underlying C Engine resource."""
2381 if self._ssn:
2382
2383
2384 for l in self._links:
2385 l._release()
2386 pn_session_set_context(self._ssn, None)
2387 pn_session_free(self._ssn)
2388 self._ssn = None
2389
2391 """Release the Session, freeing its resources.
2392
2393 Call this when you no longer need the session. This will allow the
2394 session's resources to be reclaimed. Once called, you should no longer
2395 reference the session.
2396
2397 """
2398 self.connection._sessions.remove(self)
2399 self._release()
2400
2402 return pn_session_condition(self._ssn)
2403
2405 return pn_session_remote_condition(self._ssn)
2406
2408 return pn_session_get_incoming_capacity(self._ssn)
2409
2411 pn_session_set_incoming_capacity(self._ssn, capacity)
2412
2413 incoming_capacity = property(_get_incoming_capacity, _set_incoming_capacity)
2414
2415 @property
2417 return pn_session_outgoing_bytes(self._ssn)
2418
2419 @property
2421 return pn_session_incoming_bytes(self._ssn)
2422
2424 pn_session_open(self._ssn)
2425
2427 self._update_cond()
2428 pn_session_close(self._ssn)
2429
2430 - def next(self, mask):
2431 return Session._wrap_session(pn_session_next(self._ssn, mask))
2432
2433 @property
2435 return pn_session_state(self._ssn)
2436
2437 @property
2439 return Connection._wrap_connection(pn_session_connection(self._ssn))
2440
2442 return Link._wrap_link(pn_sender(self._ssn, name))
2443
2445 return Link._wrap_link(pn_receiver(self._ssn, name))
2446
2449
2450 -class Link(Endpoint):
2451
2452 SND_UNSETTLED = PN_SND_UNSETTLED
2453 SND_SETTLED = PN_SND_SETTLED
2454 SND_MIXED = PN_SND_MIXED
2455
2456 RCV_FIRST = PN_RCV_FIRST
2457 RCV_SECOND = PN_RCV_SECOND
2458
2459 @staticmethod
2461 """Maintain only a single instance of this class for each Session object that
2462 exists in the C Engine.
2463 """
2464 if c_link is None: return None
2465 py_link = pn_link_get_context(c_link)
2466 if py_link: return py_link
2467 if pn_link_is_sender(c_link):
2468 wrapper = Sender(c_link)
2469 else:
2470 wrapper = Receiver(c_link)
2471 return wrapper
2472
2474 Endpoint.__init__(self)
2475 self._link = c_link
2476 pn_link_set_context(self._link, self)
2477 self._deliveries = set()
2478 self.session._links.add(self)
2479
2481 """Release the underlying C Engine resource."""
2482 if self._link:
2483
2484
2485 for d in self._deliveries:
2486 d._release()
2487 pn_link_set_context(self._link, None)
2488 pn_link_free(self._link)
2489 self._link = None
2490
2492 """Release the Link, freeing its resources"""
2493 self.session._links.remove(self)
2494 self._release()
2495
2497 if err < 0:
2498 exc = EXCEPTIONS.get(err, LinkException)
2499 raise exc("[%s]: %s" % (err, pn_link_error(self._link)))
2500 else:
2501 return err
2502
2504 return pn_link_condition(self._link)
2505
2507 return pn_link_remote_condition(self._link)
2508
2510 pn_link_open(self._link)
2511
2513 self._update_cond()
2514 pn_link_close(self._link)
2515
2516 @property
2518 return pn_link_state(self._link)
2519
2520 @property
2522 return Terminus(pn_link_source(self._link))
2523
2524 @property
2526 return Terminus(pn_link_target(self._link))
2527
2528 @property
2530 return Terminus(pn_link_remote_source(self._link))
2531 @property
2533 return Terminus(pn_link_remote_target(self._link))
2534
2535 @property
2537 return Session._wrap_session(pn_link_session(self._link))
2538
2540 return Delivery._wrap_delivery(pn_delivery(self._link, tag))
2541
2542 @property
2544 return Delivery._wrap_delivery(pn_link_current(self._link))
2545
2547 return pn_link_advance(self._link)
2548
2549 @property
2551 return pn_link_unsettled(self._link)
2552
2553 @property
2555 return pn_link_credit(self._link)
2556
2557 @property
2559 return pn_link_available(self._link)
2560
2561 @property
2563 return pn_link_queued(self._link)
2564
2565 - def next(self, mask):
2566 return Link._wrap_link(pn_link_next(self._link, mask))
2567
2568 @property
2570 return pn_link_name(self._link)
2571
2572 @property
2574 return pn_link_is_sender(self._link)
2575
2576 @property
2578 return pn_link_is_receiver(self._link)
2579
2580 @property
2582 return pn_link_remote_snd_settle_mode(self._link)
2583
2584 @property
2586 return pn_link_remote_rcv_settle_mode(self._link)
2587
2589 return pn_link_snd_settle_mode(self._link)
2591 pn_link_set_snd_settle_mode(self._link, mode)
2592 snd_settle_mode = property(_get_snd_settle_mode, _set_snd_settle_mode)
2593
2595 return pn_link_rcv_settle_mode(self._link)
2597 pn_link_set_rcv_settle_mode(self._link, mode)
2598 rcv_settle_mode = property(_get_rcv_settle_mode, _set_rcv_settle_mode)
2599
2601 return pn_link_drained(self._link)
2602
2604
2605 UNSPECIFIED = PN_UNSPECIFIED
2606 SOURCE = PN_SOURCE
2607 TARGET = PN_TARGET
2608 COORDINATOR = PN_COORDINATOR
2609
2610 NONDURABLE = PN_NONDURABLE
2611 CONFIGURATION = PN_CONFIGURATION
2612 DELIVERIES = PN_DELIVERIES
2613
2614 DIST_MODE_UNSPECIFIED = PN_DIST_MODE_UNSPECIFIED
2615 DIST_MODE_COPY = PN_DIST_MODE_COPY
2616 DIST_MODE_MOVE = PN_DIST_MODE_MOVE
2617
2620
2622 if err < 0:
2623 exc = EXCEPTIONS.get(err, LinkException)
2624 raise exc("[%s]" % err)
2625 else:
2626 return err
2627
2629 return pn_terminus_get_type(self._impl)
2631 self._check(pn_terminus_set_type(self._impl, type))
2632 type = property(_get_type, _set_type)
2633
2635 return pn_terminus_get_address(self._impl)
2637 self._check(pn_terminus_set_address(self._impl, address))
2638 address = property(_get_address, _set_address)
2639
2641 return pn_terminus_get_durability(self._impl)
2643 self._check(pn_terminus_set_durability(self._impl, seconds))
2644 durability = property(_get_durability, _set_durability)
2645
2647 return pn_terminus_get_expiry_policy(self._impl)
2649 self._check(pn_terminus_set_expiry_policy(self._impl, seconds))
2650 expiry_policy = property(_get_expiry_policy, _set_expiry_policy)
2651
2653 return pn_terminus_get_timeout(self._impl)
2655 self._check(pn_terminus_set_timeout(self._impl, seconds))
2656 timeout = property(_get_timeout, _set_timeout)
2657
2659 return pn_terminus_is_dynamic(self._impl)
2661 self._check(pn_terminus_set_dynamic(self._impl, dynamic))
2662 dynamic = property(_is_dynamic, _set_dynamic)
2663
2665 return pn_terminus_get_distribution_mode(self._impl)
2667 self._check(pn_terminus_set_distribution_mode(self._impl, mode))
2668 distribution_mode = property(_get_distribution_mode, _set_distribution_mode)
2669
2670 @property
2672 return Data(pn_terminus_properties(self._impl))
2673
2674 @property
2676 return Data(pn_terminus_capabilities(self._impl))
2677
2678 @property
2680 return Data(pn_terminus_outcomes(self._impl))
2681
2682 @property
2684 return Data(pn_terminus_filter(self._impl))
2685
2686 - def copy(self, src):
2687 self._check(pn_terminus_copy(self._impl, src._impl))
2688
2690
2693
2695 pn_link_offered(self._link, n)
2696
2697 - def send(self, bytes):
2698 return self._check(pn_link_send(self._link, bytes))
2699
2701
2704
2705 - def flow(self, n):
2706 pn_link_flow(self._link, n)
2707
2708 - def recv(self, limit):
2709 n, bytes = pn_link_recv(self._link, limit)
2710 if n == PN_EOS:
2711 return None
2712 else:
2713 self._check(n)
2714 return bytes
2715
2717 pn_link_drain(self._link, n)
2718
2720 return pn_link_draining(self._link)
2721
2723
2724 RECEIVED = PN_RECEIVED
2725 ACCEPTED = PN_ACCEPTED
2726 REJECTED = PN_REJECTED
2727 RELEASED = PN_RELEASED
2728 MODIFIED = PN_MODIFIED
2729
2731 self._impl = impl
2732 self.local = local
2733 self._data = None
2734 self._condition = None
2735 self._annotations = None
2736
2737 @property
2739 return pn_disposition_type(self._impl)
2740
2742 return pn_disposition_get_section_number(self._impl)
2744 pn_disposition_set_section_number(self._impl, n)
2745 section_number = property(_get_section_number, _set_section_number)
2746
2748 return pn_disposition_get_section_offset(self._impl)
2750 pn_disposition_set_section_offset(self._impl, n)
2751 section_offset = property(_get_section_offset, _set_section_offset)
2752
2754 return pn_disposition_is_failed(self._impl)
2756 pn_disposition_set_failed(self._impl, b)
2757 failed = property(_get_failed, _set_failed)
2758
2760 return pn_disposition_is_undeliverable(self._impl)
2762 pn_disposition_set_undeliverable(self._impl, b)
2763 undeliverable = property(_get_undeliverable, _set_undeliverable)
2764
2766 if self.local:
2767 return self._data
2768 else:
2769 return dat2obj(pn_disposition_data(self._impl))
2771 if self.local:
2772 self._data = obj
2773 else:
2774 raise AttributeError("data attribute is read-only")
2775 data = property(_get_data, _set_data)
2776
2778 if self.local:
2779 return self._annotations
2780 else:
2781 return dat2obj(pn_disposition_annotations(self._impl))
2783 if self.local:
2784 self._annotations = obj
2785 else:
2786 raise AttributeError("annotations attribute is read-only")
2787 annotations = property(_get_annotations, _set_annotations)
2788
2790 if self.local:
2791 return self._condition
2792 else:
2793 return cond2obj(pn_disposition_condition(self._impl))
2795 if self.local:
2796 self._condition = obj
2797 else:
2798 raise AttributeError("condition attribute is read-only")
2799 condition = property(_get_condition, _set_condition)
2800
2802
2803 RECEIVED = Disposition.RECEIVED
2804 ACCEPTED = Disposition.ACCEPTED
2805 REJECTED = Disposition.REJECTED
2806 RELEASED = Disposition.RELEASED
2807 MODIFIED = Disposition.MODIFIED
2808
2809 @staticmethod
2811 """Maintain only a single instance of this class for each Delivery object that
2812 exists in the C Engine.
2813 """
2814 if not c_dlv: return None
2815 py_dlv = pn_delivery_get_context(c_dlv)
2816 if py_dlv: return py_dlv
2817 wrapper = Delivery(c_dlv)
2818 return wrapper
2819
2821 self._dlv = dlv
2822 pn_delivery_set_context(self._dlv, self)
2823 self.local = Disposition(pn_delivery_local(self._dlv), True)
2824 self.remote = Disposition(pn_delivery_remote(self._dlv), False)
2825 self.link._deliveries.add(self)
2826
2828 """Release the underlying C Engine resource."""
2829 if self._dlv:
2830 pn_delivery_set_context(self._dlv, None)
2831 pn_delivery_settle(self._dlv)
2832 self._dlv = None
2833
2834 @property
2836 return pn_delivery_tag(self._dlv)
2837
2838 @property
2840 return pn_delivery_writable(self._dlv)
2841
2842 @property
2844 return pn_delivery_readable(self._dlv)
2845
2846 @property
2848 return pn_delivery_updated(self._dlv)
2849
2851 obj2dat(self.local._data, pn_disposition_data(self.local._impl))
2852 obj2dat(self.local._annotations, pn_disposition_annotations(self.local._impl))
2853 obj2cond(self.local._condition, pn_disposition_condition(self.local._impl))
2854 pn_delivery_update(self._dlv, state)
2855
2856 @property
2858 return pn_delivery_pending(self._dlv)
2859
2860 @property
2862 return pn_delivery_partial(self._dlv)
2863
2864 @property
2866 return pn_delivery_local_state(self._dlv)
2867
2868 @property
2870 return pn_delivery_remote_state(self._dlv)
2871
2872 @property
2874 return pn_delivery_settled(self._dlv)
2875
2877 """Release the delivery"""
2878 self.link._deliveries.remove(self)
2879 self._release()
2880
2881 @property
2883 return Delivery._wrap_delivery(pn_work_next(self._dlv))
2884
2885 @property
2887 return Link._wrap_link(pn_delivery_link(self._dlv))
2888
2891
2893
2894 TRACE_DRV = PN_TRACE_DRV
2895 TRACE_FRM = PN_TRACE_FRM
2896 TRACE_RAW = PN_TRACE_RAW
2897
2899 if not _trans:
2900 self._trans = pn_transport()
2901 else:
2902 self._shared_trans = True
2903 self._trans = _trans
2904 self._sasl = None
2905 self._ssl = None
2906
2908 if hasattr(self, "_trans"):
2909 if not hasattr(self, "_shared_trans"):
2910 pn_transport_free(self._trans)
2911 if hasattr(self, "_sasl") and self._sasl:
2912
2913
2914 self._sasl._sasl = None
2915 self._sasl = None
2916 if hasattr(self, "_ssl") and self._ssl:
2917
2918 self._ssl._ssl = None
2919 self._ssl = None
2920 del self._trans
2921
2923 if err < 0:
2924 exc = EXCEPTIONS.get(err, TransportException)
2925 raise exc("[%s]: %s" % (err, pn_error_text(pn_transport_error(self._trans))))
2926 else:
2927 return err
2928
2929 - def bind(self, connection):
2930 """Assign a connection to the transport"""
2931 self._check(pn_transport_bind(self._trans, connection._conn))
2932
2933 self._connection = connection
2934
2936 """Release the connection"""
2937 self._check(pn_transport_unbind(self._trans))
2938 self._connection = None
2939
2941 pn_transport_trace(self._trans, n)
2942
2943 - def tick(self, now):
2944 """Process any timed events (like heartbeat generation).
2945 now = seconds since epoch (float).
2946 """
2947 next = pn_transport_tick(self._trans, long(now * 1000))
2948 return float(next) / 1000.0
2949
2951 c = pn_transport_capacity(self._trans)
2952 if c >= PN_EOS:
2953 return c
2954 else:
2955 return self._check(c)
2956
2957 - def push(self, bytes):
2958 self._check(pn_transport_push(self._trans, bytes))
2959
2961 self._check(pn_transport_close_tail(self._trans))
2962
2964 p = pn_transport_pending(self._trans)
2965 if p >= PN_EOS:
2966 return p
2967 else:
2968 return self._check(p)
2969
2970 - def peek(self, size):
2971 cd, out = pn_transport_peek(self._trans, size)
2972 if cd == PN_EOS:
2973 return None
2974 else:
2975 self._check(cd)
2976 return out
2977
2978 - def pop(self, size):
2979 pn_transport_pop(self._trans, size)
2980
2982 self._check(pn_transport_close_head(self._trans))
2983
2985 p = self.pending()
2986 if p < 0:
2987 return None
2988 else:
2989 out = self.peek(min(size, p))
2990 self.pop(len(out))
2991 return out
2992
3004
3005
3007 return pn_transport_get_max_frame(self._trans)
3008
3010 pn_transport_set_max_frame(self._trans, value)
3011
3012 max_frame_size = property(_get_max_frame_size, _set_max_frame_size,
3013 doc="""
3014 Sets the maximum size for received frames (in bytes).
3015 """)
3016
3017 @property
3019 return pn_transport_get_remote_max_frame(self._trans)
3020
3022 return pn_transport_get_channel_max(self._trans)
3023
3025 pn_transport_set_channel_max(self._trans, value)
3026
3027 channel_max = property(_get_channel_max, _set_channel_max,
3028 doc="""
3029 Sets the maximum channel that may be used on the transport.
3030 """)
3031
3032 @property
3034 return pn_transport_remote_channel_max(self._trans)
3035
3036
3038 msec = pn_transport_get_idle_timeout(self._trans)
3039 return float(msec)/1000.0
3040
3042 pn_transport_set_idle_timeout(self._trans, long(sec * 1000))
3043
3044 idle_timeout = property(_get_idle_timeout, _set_idle_timeout,
3045 doc="""
3046 The idle timeout of the connection (float, in seconds).
3047 """)
3048
3049 @property
3051 msec = pn_transport_get_remote_idle_timeout(self._trans)
3052 return float(msec)/1000.0
3053
3054 @property
3056 return pn_transport_get_frames_output(self._trans)
3057
3058 @property
3061
3063
3064 if not self._sasl:
3065 self._sasl = SASL(self)
3066 return self._sasl
3067
3068 - def ssl(self, domain=None, session_details=None):
3069
3070 if not self._ssl:
3071 self._ssl = SSL(self, domain, session_details)
3072 return self._ssl
3073
3076
3077 -class SASL(object):
3078
3079 OK = PN_SASL_OK
3080 AUTH = PN_SASL_AUTH
3081
3089
3091 if err < 0:
3092 exc = EXCEPTIONS.get(err, SASLException)
3093 raise exc("[%s]" % (err))
3094 else:
3095 return err
3096
3098 pn_sasl_mechanisms(self._sasl, mechs)
3099
3101 pn_sasl_client(self._sasl)
3102
3104 pn_sasl_server(self._sasl)
3105
3106 - def plain(self, user, password):
3107 pn_sasl_plain(self._sasl, user, password)
3108
3109 - def send(self, data):
3110 self._check(pn_sasl_send(self._sasl, data, len(data)))
3111
3113 sz = 16
3114 while True:
3115 n, data = pn_sasl_recv(self._sasl, sz)
3116 if n == PN_OVERFLOW:
3117 sz *= 2
3118 continue
3119 elif n == PN_EOS:
3120 return None
3121 else:
3122 self._check(n)
3123 return data
3124
3125 @property
3127 outcome = pn_sasl_outcome(self._sasl)
3128 if outcome == PN_SASL_NONE:
3129 return None
3130 else:
3131 return outcome
3132
3133 - def done(self, outcome):
3134 pn_sasl_done(self._sasl, outcome)
3135
3136 STATE_CONF = PN_SASL_CONF
3137 STATE_IDLE = PN_SASL_IDLE
3138 STATE_STEP = PN_SASL_STEP
3139 STATE_PASS = PN_SASL_PASS
3140 STATE_FAIL = PN_SASL_FAIL
3141
3142 @property
3144 return pn_sasl_state(self._sasl)
3145
3149
3152
3153 -class SSLDomain(object):
3154
3155 MODE_CLIENT = PN_SSL_MODE_CLIENT
3156 MODE_SERVER = PN_SSL_MODE_SERVER
3157 VERIFY_PEER = PN_SSL_VERIFY_PEER
3158 VERIFY_PEER_NAME = PN_SSL_VERIFY_PEER_NAME
3159 ANONYMOUS_PEER = PN_SSL_ANONYMOUS_PEER
3160
3161 - def __init__(self, mode):
3162 self._domain = pn_ssl_domain(mode)
3163 if self._domain is None:
3164 raise SSLUnavailable()
3165
3166 - def _check(self, err):
3167 if err < 0:
3168 exc = EXCEPTIONS.get(err, SSLException)
3169 raise exc("SSL failure.")
3170 else:
3171 return err
3172
3173 - def set_credentials(self, cert_file, key_file, password):
3174 return self._check( pn_ssl_domain_set_credentials(self._domain,
3175 cert_file, key_file,
3176 password) )
3177 - def set_trusted_ca_db(self, certificate_db):
3178 return self._check( pn_ssl_domain_set_trusted_ca_db(self._domain,
3179 certificate_db) )
3180 - def set_peer_authentication(self, verify_mode, trusted_CAs=None):
3181 return self._check( pn_ssl_domain_set_peer_authentication(self._domain,
3182 verify_mode,
3183 trusted_CAs) )
3184
3186 return self._check( pn_ssl_domain_allow_unsecured_client(self._domain) )
3187
3189
3196
3197 - def __new__(cls, transport, domain, session_details=None):
3198 """Enforce a singleton SSL object per Transport"""
3199 if transport._ssl:
3200
3201
3202
3203 ssl = transport._ssl
3204 if (domain and (ssl._domain is not domain) or
3205 session_details and (ssl._session_details is not session_details)):
3206 raise SSLException("Cannot re-configure existing SSL object!")
3207 else:
3208 obj = super(SSL, cls).__new__(cls)
3209 obj._domain = domain
3210 obj._session_details = session_details
3211 session_id = None
3212 if session_details:
3213 session_id = session_details.get_session_id()
3214 obj._ssl = pn_ssl( transport._trans )
3215 if obj._ssl is None:
3216 raise SSLUnavailable()
3217 pn_ssl_init( obj._ssl, domain._domain, session_id )
3218 transport._ssl = obj
3219 return transport._ssl
3220
3222 rc, name = pn_ssl_get_cipher_name( self._ssl, 128 )
3223 if rc:
3224 return name
3225 return None
3226
3228 rc, name = pn_ssl_get_protocol_name( self._ssl, 128 )
3229 if rc:
3230 return name
3231 return None
3232
3233 RESUME_UNKNOWN = PN_SSL_RESUME_UNKNOWN
3234 RESUME_NEW = PN_SSL_RESUME_NEW
3235 RESUME_REUSED = PN_SSL_RESUME_REUSED
3236
3238 return pn_ssl_resume_status( self._ssl )
3239
3241 self._check(pn_ssl_set_peer_hostname( self._ssl, hostname ))
3243 err, name = pn_ssl_get_peer_hostname( self._ssl, 1024 )
3244 self._check(err)
3245 return name
3246 peer_hostname = property(_get_peer_hostname, _set_peer_hostname,
3247 doc="""
3248 Manage the expected name of the remote peer. Used to authenticate the remote.
3249 """)
3250
3253 """ Unique identifier for the SSL session. Used to resume previous session on a new
3254 SSL connection.
3255 """
3256
3258 self._session_id = session_id
3259
3261 return self._session_id
3262
3265
3267 self._impl = pn_collector()
3268
3270 event = pn_collector_peek(self._impl)
3271 if event is None:
3272 return None
3273
3274 tpi = pn_event_transport(event)
3275 if tpi:
3276 tp = Transport(tpi)
3277 else:
3278 tp = None
3279 return Event(type=pn_event_type(event),
3280 category=pn_event_category(event),
3281 connection=Connection._wrap_connection(pn_event_connection(event)),
3282 session=Session._wrap_session(pn_event_session(event)),
3283 link=Link._wrap_link(pn_event_link(event)),
3284 delivery=Delivery._wrap_delivery(pn_event_delivery(event)),
3285 transport=tp)
3286
3288 pn_collector_pop(self._impl)
3289
3291 pn_collector_free(self._impl)
3292
3322
3328 """
3329 The DriverException class is the root of the driver exception hierarchy.
3330 """
3331 pass
3332
3334
3335 @staticmethod
3337 """Maintain only a single instance of this class for each Connector object that
3338 exists in the C Driver.
3339 """
3340 if not c_cxtr: return None
3341 py_cxtr = pn_connector_context(c_cxtr)
3342 if py_cxtr: return py_cxtr
3343 wrapper = Connector(_cxtr=c_cxtr, _py_driver=py_driver)
3344 return wrapper
3345
3346 - def __init__(self, _cxtr, _py_driver):
3347 self._cxtr = _cxtr
3348 assert(_py_driver)
3349 self._driver = weakref.ref(_py_driver)
3350 pn_connector_set_context(self._cxtr, self)
3351 self._connection = None
3352 self._driver()._connectors.add(self)
3353
3355 """Release the underlying C Engine resource."""
3356 if self._cxtr:
3357 pn_connector_set_context(self._cxtr, None)
3358 pn_connector_free(self._cxtr)
3359 self._cxtr = None
3360
3362 """Release the Connector, freeing its resources.
3363
3364 Call this when you no longer need the Connector. This will allow the
3365 connector's resources to be reclaimed. Once called, you should no longer
3366 reference this connector.
3367
3368 """
3369 self.connection = None
3370 d = self._driver()
3371 if d: d._connectors.remove(self)
3372 self._release()
3373
3375 return Connector._wrap_connector(pn_connector_next(self._cxtr))
3376
3378 pn_connector_process(self._cxtr)
3379
3381 return Listener._wrap_listener(pn_connector_listener(self._cxtr))
3382
3389
3390 @property
3392 trans = pn_connector_transport(self._cxtr)
3393 if trans:
3394 return Transport(trans)
3395 return None
3396
3398 return pn_connector_close(self._cxtr)
3399
3400 @property
3402 return pn_connector_closed(self._cxtr)
3403
3405 return self._connection
3406
3408 if conn:
3409 pn_connector_set_connection(self._cxtr, conn._conn)
3410 else:
3411 pn_connector_set_connection(self._cxtr, None)
3412 self._connection = conn
3413
3414
3415 connection = property(_get_connection, _set_connection,
3416 doc="""
3417 Associate a Connection with this Connector.
3418 """)
3419
3421
3422 @staticmethod
3424 """Maintain only a single instance of this class for each Listener object that
3425 exists in the C Driver.
3426 """
3427 if not c_lsnr: return None
3428 py_lsnr = pn_listener_context(c_lsnr)
3429 if py_lsnr: return py_lsnr
3430 wrapper = Listener(_lsnr=c_lsnr, _py_driver=py_driver)
3431 return wrapper
3432
3433 - def __init__(self, _lsnr, _py_driver):
3434 self._lsnr = _lsnr
3435 assert(_py_driver)
3436 self._driver = weakref.ref(_py_driver)
3437 pn_listener_set_context(self._lsnr, self)
3438 self._driver()._listeners.add(self)
3439
3441 """Release the underlying C Engine resource."""
3442 if self._lsnr:
3443 pn_listener_set_context(self._lsnr, None);
3444 pn_listener_free(self._lsnr)
3445 self._lsnr = None
3446
3448 """Release the Listener, freeing its resources"""
3449 d = self._driver()
3450 if d: d._listeners.remove(self)
3451 self._release()
3452
3454 return Listener._wrap_listener(pn_listener_next(self._lsnr))
3455
3457 d = self._driver()
3458 if d:
3459 cxtr = pn_listener_accept(self._lsnr)
3460 c = Connector._wrap_connector(cxtr, d)
3461 return c
3462 return None
3463
3465 pn_listener_close(self._lsnr)
3466
3469 self._driver = pn_driver()
3470 self._listeners = set()
3471 self._connectors = set()
3472
3474
3475
3476 for c in self._connectors:
3477 c._release()
3478 for l in self._listeners:
3479 l._release()
3480 if hasattr(self, "_driver") and self._driver:
3481 pn_driver_free(self._driver)
3482 del self._driver
3483
3484 - def wait(self, timeout_sec):
3485 if timeout_sec is None or timeout_sec < 0.0:
3486 t = -1
3487 else:
3488 t = long(1000*timeout_sec)
3489 return pn_driver_wait(self._driver, t)
3490
3492 return pn_driver_wakeup(self._driver)
3493
3495 """Construct a listener"""
3496 return Listener._wrap_listener(pn_listener(self._driver, host, port, None),
3497 self)
3498
3500 return Listener._wrap_listener(pn_driver_listener(self._driver))
3501
3503 return Listener._wrap_listener(pn_listener_head(self._driver))
3504
3506 return Connector._wrap_connector(pn_connector(self._driver, host, port, None),
3507 self)
3508
3510 return Connector._wrap_connector(pn_connector_head(self._driver))
3511
3513 return Connector._wrap_connector(pn_driver_connector(self._driver))
3514
3515 __all__ = [
3516 "API_LANGUAGE",
3517 "IMPLEMENTATION_LANGUAGE",
3518 "ABORTED",
3519 "ACCEPTED",
3520 "AUTOMATIC",
3521 "PENDING",
3522 "MANUAL",
3523 "REJECTED",
3524 "RELEASED",
3525 "SETTLED",
3526 "UNDESCRIBED",
3527 "Array",
3528 "Collector",
3529 "Condition",
3530 "Connection",
3531 "Connector",
3532 "Data",
3533 "Delivery",
3534 "Disposition",
3535 "Described",
3536 "Driver",
3537 "DriverException",
3538 "Endpoint",
3539 "Event",
3540 "Link",
3541 "Listener",
3542 "Message",
3543 "MessageException",
3544 "Messenger",
3545 "MessengerException",
3546 "ProtonException",
3547 "Receiver",
3548 "SASL",
3549 "Sender",
3550 "Session",
3551 "SSL",
3552 "SSLDomain",
3553 "SSLSessionDetails",
3554 "SSLUnavailable",
3555 "SSLException",
3556 "Terminus",
3557 "Timeout",
3558 "Interrupt",
3559 "Transport",
3560 "TransportException",
3561 "char",
3562 "symbol",
3563 "timestamp",
3564 "ulong"
3565 ]
3566