Package nbxmpp :: Module client_nb
[hide private]
[frames | no frames]

Source Code for Module nbxmpp.client_nb

  1  ##   client_nb.py 
  2  ##         based on client.py, changes backported up to revision 1.60 
  3  ## 
  4  ##   Copyright (C) 2003-2005 Alexey "Snake" Nezhdanov 
  5  ##         modified by Dimitur Kirov <dkirov@gmail.com> 
  6  ## 
  7  ##   This program is free software; you can redistribute it and/or modify 
  8  ##   it under the terms of the GNU General Public License as published by 
  9  ##   the Free Software Foundation; either version 2, or (at your option) 
 10  ##   any later version. 
 11  ## 
 12  ##   This program is distributed in the hope that it will be useful, 
 13  ##   but WITHOUT ANY WARRANTY; without even the implied warranty of 
 14  ##   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 15  ##   GNU General Public License for more details. 
 16   
 17  # $Id: client.py,v 1.52 2006/01/02 19:40:55 normanr Exp $ 
 18   
 19  """ 
 20  Client class establishes a connection to an XMPP server and handles authentication 
 21  """ 
 22   
 23  import socket 
 24  import transports_nb, dispatcher_nb, auth_nb, roster_nb, protocol, bosh 
 25  from protocol import NS_TLS 
 26   
 27  import logging 
 28  log = logging.getLogger('nbxmpp.client_nb') 
 29   
 30   
class NonBlockingClient:
    """
    Client class is the XMPP connection mountpoint. Objects for
    authentication, network communication, roster, XML parsing ... are plugged
    into the client object. The client implements the abstract behaviour -
    mostly negotiation and callback handling - whereas the plugged modules take
    care of the feature-specific logic.
    """

    def __init__(self, domain, idlequeue, caller=None):
        """
        Cache connection data

        :param domain: domain - for the to: attribute (from account info)
        :param idlequeue: processing idlequeue
        :param caller: calling object - it has to implement the method
            _event_dispatcher which is called from the dispatcher instance
        """
        self.Namespace = protocol.NS_CLIENT
        self.defaultNamespace = self.Namespace

        self.idlequeue = idlequeue
        self.disconnect_handlers = []

        self.Server = domain
        self.xmpp_hostname = None  # FQDN hostname to connect to

        # caller is who initiated this client, it is needed to register
        # the EventDispatcher
        self._caller = caller
        self._owner = self
        self._registered_name = None  # our full jid, set after successful auth
        self.connected = ''
        self.ip_addresses = []
        self.socket = None
        self.on_connect = None
        self.on_proxy_failure = None
        self.on_connect_failure = None
        self.proxy = None
        self.got_features = False
        self.got_see_other_host = None
        self.stream_started = False
        self.disconnecting = False
        self.protocol_type = 'XMPP'

    def disconnect(self, message=''):
        """
        Called on disconnection - the disconnect callback is picked based on
        the state of the client.

        While resolved IP addresses are still pending, the next address is
        tried instead of disconnecting. Otherwise all plugged-in helpers are
        unplugged and exactly one kind of notification is issued:
        on_proxy_failure / on_connect_failure when the XMPP stream was never
        opened, or the registered disconnect handlers (most recently registered
        first) when it was. Nothing is called after a SASL failure because the
        SASL handler has already reported it.

        :param message: human readable reason; passed to on_proxy_failure as
            ``reason`` and stored in ``caller.streamError`` when the stream
            failed to open
        """
        if self.ip_addresses:
            # there are still addresses left for this host, try them first
            self._try_next_ip()
            return
        # to avoid recursive calls
        if self.disconnecting:
            return

        log.info('Disconnecting NBClient: %s', message)

        sasl_failed = False
        for name in ('NonBlockingRoster', 'NonBlockingBind',
        'NonBlockingNonSASL'):
            if name in self.__dict__:
                getattr(self, name).PlugOut()
        if 'SASL' in self.__dict__:
            if 'startsasl' in self.SASL.__dict__ and \
            self.SASL.startsasl == 'failure-in-process':
                sasl_failed = True
                self.SASL.startsasl = 'failure'
                self._on_start_sasl()
            else:
                self.SASL.PlugOut()
        for name in ('NonBlockingTCP', 'NonBlockingHTTP', 'NonBlockingBOSH'):
            if name in self.__dict__:
                getattr(self, name).PlugOut()
        # FIXME: we never unplug dispatcher, only on next connect
        # See _xmpp_connect_machine and SASLHandler

        connected = self.connected
        stream_started = self.stream_started

        self.connected = ''
        self.stream_started = False

        self.disconnecting = True

        log.debug('Client disconnected..')
        try:
            # Don't call any callback when it's a SASL failure.
            # SASL handler is already called
            if connected == '' and not sasl_failed:
                # if we're disconnecting before connection to XMPP server is
                # opened, we don't call disconnect handlers but the
                # on_connect_failure callback
                if self.proxy and self.on_proxy_failure is not None:
                    # with proxy, we have a different failure callback
                    log.debug('calling on_proxy_failure cb')
                    self.on_proxy_failure(reason=message)
                else:
                    # also reached when a proxy is used but connect() got no
                    # on_proxy_failure callback
                    self._notify_connect_failure()
            elif not sasl_failed:
                # we are connected to XMPP server
                if not stream_started:
                    # if an error occurs before the XML stream was opened,
                    # e.g. no response on init request, we call the
                    # on_connect_failure callback because a proper connection
                    # is not established yet and it's not a proxy issue
                    if self._caller is not None:
                        self._caller.streamError = message
                    self._notify_connect_failure()
                else:
                    # with open connection, we are calling the disconnect
                    # handlers
                    for handler in reversed(self.disconnect_handlers):
                        log.debug('Calling disconnect handler %s', handler)
                        handler()
        finally:
            # always re-arm, otherwise one raising callback would turn every
            # later disconnect() into a silent no-op
            self.disconnecting = False

    def _notify_connect_failure(self):
        """
        Call the on_connect_failure callback if one has been set
        """
        log.debug('calling on_connect_failure cb')
        if self.on_connect_failure is not None:
            self.on_connect_failure()

    def connect(self, on_connect, on_connect_failure, hostname=None, port=5222,
    on_proxy_failure=None, on_stream_error_cb=None, proxy=None,
    secure_tuple=('plain', None, None)):
        """
        Open XMPP connection (open XML streams in both directions)

        :param on_connect: called after the stream is successfully opened, as
            on_connect(client, connection_type)
        :param on_connect_failure: called when an error occurs during
            connection
        :param hostname: hostname of XMPP server from SRV request; the domain
            given to the constructor is used when omitted
        :param port: port number of XMPP server
        :param on_proxy_failure: called if an error occurs during TCP
            connection to the proxy server or during the proxy connecting
            process
        :param on_stream_error_cb: called as
            on_stream_error_cb(client, see_other_host) when see-other-host is
            received while waiting for stream features; it is called
            unconditionally in that case, so it has to be callable
        :param proxy: dictionary with proxy data. It has to contain the key
            'type' and the connection details understood by
            transports_nb.get_proxy_data_from_dict ('host' and 'port', and
            optionally 'user' and 'pass' as proxy credentials). A proxy of type
            'bosh' additionally needs 'bosh_wait_for_restart_response'
        :param secure_tuple: tuple of (desired connection type, cacerts,
            mycerts). Connection type can be 'ssl' - TLS established after TCP
            connection, 'tls' - TLS established after negotiation with
            starttls, or 'plain'. cacerts, mycerts - see
            tls_nb.NonBlockingTLS constructor for more details
        """
        self.on_connect = on_connect
        self.on_connect_failure = on_connect_failure
        self.on_proxy_failure = on_proxy_failure
        self.on_stream_error_cb = on_stream_error_cb
        self.desired_security, self.cacerts, self.mycerts = secure_tuple
        self.Connection = None
        self.Port = port
        self.proxy = proxy

        if hostname:
            self.xmpp_hostname = hostname
        else:
            self.xmpp_hostname = self.Server

        # We only check for SSL here as for TLS we will first have to start a
        # PLAIN connection and negotiate TLS afterwards.
        # establish_tls will instruct transport to start secure connection
        # directly
        establish_tls = self.desired_security == 'ssl'
        certs = (self.cacerts, self.mycerts)

        proxy_dict = {}
        tcp_host = self.xmpp_hostname
        tcp_port = self.Port

        if proxy:
            # with proxies, client connects to proxy instead of directly to
            # XMPP server ((hostname, port))
            # tcp_host is hostname of machine used for socket connection
            # (DNS request will be done for proxy or BOSH CM hostname)
            tcp_host, tcp_port, proxy_user, proxy_pass = \
                transports_nb.get_proxy_data_from_dict(proxy)

            if proxy['type'] == 'bosh':
                # Setup BOSH transport
                self.socket = bosh.NonBlockingBOSH.get_instance(
                    on_disconnect=self.disconnect,
                    raise_event=self.raise_event,
                    idlequeue=self.idlequeue,
                    estabilish_tls=establish_tls,
                    certs=certs,
                    proxy_creds=(proxy_user, proxy_pass),
                    xmpp_server=(self.xmpp_hostname, self.Port),
                    domain=self.Server,
                    bosh_dict=proxy)
                self.protocol_type = 'BOSH'
                self.wait_for_restart_response = \
                    proxy['bosh_wait_for_restart_response']
            else:
                # http proxy
                proxy_dict['type'] = proxy['type']
                proxy_dict['xmpp_server'] = (self.xmpp_hostname, self.Port)
                proxy_dict['credentials'] = (proxy_user, proxy_pass)

        if not proxy or proxy['type'] != 'bosh':
            # Setup ordinary TCP transport
            self.socket = transports_nb.NonBlockingTCP.get_instance(
                on_disconnect=self.disconnect,
                raise_event=self.raise_event,
                idlequeue=self.idlequeue,
                estabilish_tls=establish_tls,
                certs=certs,
                proxy_dict=proxy_dict)

        # plug transport into client as self.Connection
        self.socket.PlugIn(self)

        self._resolve_hostname(
            hostname=tcp_host,
            port=tcp_port,
            on_success=self._try_next_ip)

    def _socket_error_text(self, err):
        """
        Return the textual part of a socket exception

        Socket errors usually carry (errno, message) but some carry a single
        argument only, so the message is taken from the second argument when
        there is one and from str(err) otherwise.
        """
        args = getattr(err, 'args', ())
        if len(args) >= 2:
            return args[1]
        return str(err)

    def _resolve_hostname(self, hostname, port, on_success):
        """
        Wrapper for getaddrinfo call

        Stores the resolved addresses in self.ip_addresses and calls
        on_success() without arguments. On a lookup or socket error
        disconnect() is called with a descriptive message instead.

        FIXME: getaddrinfo blocks
        """
        try:
            self.ip_addresses = socket.getaddrinfo(hostname, port,
                socket.AF_UNSPEC, socket.SOCK_STREAM)
        except socket.gaierror as err:
            # has to stay before socket.error, gaierror is a subclass of it
            self.disconnect(
                message='Lookup failure for %s:%s, hostname: %s - %s' %
                (self.Server, self.Port, hostname,
                self._socket_error_text(err)))
        except socket.error as err:
            # Catches an unexpected error with the socket
            self.disconnect(
                message='General socket error for %s:%s, hostname: %s - %s' %
                (self.Server, self.Port, hostname,
                self._socket_error_text(err)))
        else:
            on_success()

    def _try_next_ip(self, err_message=None):
        """
        Take the next resolved IP address and try to connect to it

        Disconnects when no address is left. Used as success callback of
        _resolve_hostname and as connect-failure callback of the transport.

        :param err_message: error reported for the previously tried address
        """
        if err_message:
            log.debug('While looping over DNS A records: %s', err_message)
        if not self.ip_addresses:
            msg = 'Run out of hosts for name %s:%s.' % (self.Server, self.Port)
            msg = msg + ' Error for last IP: %s' % err_message
            self.disconnect(msg)
        else:
            self.current_ip = self.ip_addresses.pop(0)
            self.socket.connect(
                conn_5tuple=self.current_ip,
                on_connect=lambda: self._xmpp_connect(),
                on_connect_failure=self._try_next_ip)

    def incoming_stream_version(self):
        """
        Get the version attribute of the incoming XML stream, None if the
        stream header did not carry one
        """
        if 'version' in self.Dispatcher.Stream._document_attrs:
            return self.Dispatcher.Stream._document_attrs['version']
        else:
            return None

    def _xmpp_connect(self, socket_type=None):
        """
        Start XMPP connecting process - open the XML stream. Is called after
        TCP connection is established or after switch to TLS when successfully
        negotiated with <starttls>.

        :param socket_type: 'plain', 'ssl' or 'tls'; detected from the
            transport when omitted
        """
        # socket_type contains info which transport connection was established
        if not socket_type:
            if self.Connection.ssl_lib:
                # When ssl_lib is set we connected via SSL
                socket_type = 'ssl'
            else:
                # PLAIN is default
                socket_type = 'plain'
        self.connected = socket_type
        self._xmpp_connect_machine()

    def _xmpp_connect_machine(self, mode=None, data=None):
        """
        Finite automaton taking care of stream opening and features tag
        handling. Calls _on_stream_start when stream is started, and
        disconnect() on failure.

        :param mode: state to process: None (start), 'FAILURE',
            'RECEIVE_DOCUMENT_ATTRIBUTES', 'RECEIVE_STREAM_FEATURES' or
            'STREAM_STARTED'
        :param data: received data for the RECEIVE_* states, error text for
            'FAILURE'
        """
        log.info('-------------xmpp_connect_machine() >> mode: %s, data: %s...',
            mode, str(data)[:20])

        def on_next_receive(next_mode):
            """
            Set desired on_receive callback on transport based on the state of
            connect_machine.
            """
            log.info('setting %s on next receive', next_mode)
            if next_mode is None:
                self.onreceive(None)  # switch to Dispatcher.ProcessNonBlocking
            else:
                self.onreceive(
                    lambda _data: self._xmpp_connect_machine(next_mode, _data))

        if not mode:
            # starting state
            if 'Dispatcher' in self.__dict__:
                self.Dispatcher.PlugOut()
            # a fresh stream has not delivered its <features> yet
            self.got_features = False
            dispatcher_nb.Dispatcher.get_instance().PlugIn(self)
            on_next_receive('RECEIVE_DOCUMENT_ATTRIBUTES')

        elif mode == 'FAILURE':
            self.disconnect('During XMPP connect: %s' % data)

        elif mode == 'RECEIVE_DOCUMENT_ATTRIBUTES':
            if data:
                self.Dispatcher.ProcessNonBlocking(data)
                # the server answered on this address, drop the untried ones
                # so that a later disconnect() does not move on to them
                self.ip_addresses = []
            if not hasattr(self, 'Dispatcher') or \
            self.Dispatcher.Stream._document_attrs is None:
                self._xmpp_connect_machine(
                    mode='FAILURE',
                    data='Error on stream open')
                return

            # if terminating stanza was received after init request then client
            # gets disconnected from bosh transport plugin and we have to end
            # the stream negotiating process straight away.
            # fixes #4657
            if not self.connected:
                return

            if self.incoming_stream_version() == '1.0':
                if not self.got_features:
                    on_next_receive('RECEIVE_STREAM_FEATURES')
                else:
                    log.info('got STREAM FEATURES in first recv')
                    self._xmpp_connect_machine(mode='STREAM_STARTED')
            else:
                log.info('incoming stream version less than 1.0')
                self._xmpp_connect_machine(mode='STREAM_STARTED')

        elif mode == 'RECEIVE_STREAM_FEATURES':
            if data:
                # sometimes <features> are received together with document
                # attributes and sometimes on next receive...
                self.Dispatcher.ProcessNonBlocking(data)
            if self.got_see_other_host:
                log.info('got see-other-host')
                self.onreceive(None)
                self.on_stream_error_cb(self, self.got_see_other_host)
            elif not self.got_features:
                self._xmpp_connect_machine(mode='FAILURE',
                    data='Missing <features> in 1.0 stream')
            else:
                log.info('got STREAM FEATURES in second recv')
                self._xmpp_connect_machine(mode='STREAM_STARTED')

        elif mode == 'STREAM_STARTED':
            self._on_stream_start()

    def _on_stream_start(self):
        """
        Called after XMPP stream is opened. TLS negotiation may follow if
        supported and desired.

        When TLS is desired but the stream version is below 1.0 or the server
        does not advertise <starttls>, on_connect is still called; the
        connection type passed to it stays 'plain'.
        """
        self.stream_started = True
        if not hasattr(self, 'onreceive'):
            # we may already have been disconnected
            return
        self.onreceive(None)

        if self.connected == 'plain':
            if self.desired_security == 'plain':
                # if we want and have plain connection, we're done now
                self._on_connect()
            else:
                # try to negotiate TLS
                if self.incoming_stream_version() != '1.0':
                    # if stream version is less than 1.0, we can't do more
                    log.info('While connecting with type = "tls": stream '
                        'version is less than 1.0')
                    self._on_connect()
                    return
                if self.Dispatcher.Stream.features.getTag('starttls'):
                    # Server advertises TLS support, start negotiation
                    self.stream_started = False
                    log.info('TLS supported by remote server. Requesting TLS '
                        'start.')
                    self._tls_negotiation_handler()
                else:
                    log.info('While connecting with type = "tls": TLS '
                        'unsupported by remote server')
                    self._on_connect()

        elif self.connected in ['ssl', 'tls']:
            self._on_connect()
        else:
            assert False, 'Stream opened for unsupported connection'

    def _tls_negotiation_handler(self, con=None, tag=None):
        """
        Take care of TLS negotiation with <starttls>

        Called without arguments to send <starttls>, and by the dispatcher
        with the received <proceed> or <failure> tag afterwards.
        """
        log.info('-------------tls_negotiaton_handler() >> tag: %s', tag)
        if not con and not tag:
            # starting state when we send the <starttls>
            self.RegisterHandlerOnce('proceed', self._tls_negotiation_handler,
                xmlns=NS_TLS)
            self.RegisterHandlerOnce('failure', self._tls_negotiation_handler,
                xmlns=NS_TLS)
            self.send('<starttls xmlns="%s"/>' % NS_TLS)
        else:
            # we got <proceed> or <failure>
            if tag.getNamespace() != NS_TLS:
                self.disconnect('Unknown namespace: %s' % tag.getNamespace())
                return
            tagname = tag.getName()
            if tagname == 'failure':
                self.disconnect('TLS <failure> received: %s' % tag)
                return
            log.info('Got starttls proceed response. Switching to TLS/SSL...')
            # following call wouldn't work for BOSH transport but it doesn't
            # matter because <starttls> negotiation with BOSH is forbidden
            self.Connection.tls_init(
                on_succ=lambda: self._xmpp_connect(socket_type='tls'),
                on_fail=lambda: self.disconnect('error while etabilishing TLS'))

    def _on_connect(self):
        """
        Precedes the call of the on_connect callback
        """
        self.onreceive(None)
        self.on_connect(self, self.connected)

    def raise_event(self, event_type, data):
        """
        Raise event to connection instance. DATA_SENT and DATA_RECIVED events
        are used in XML console to show XMPP traffic
        """
        log.info('raising event from transport: :::::%s::::\n_____________\n%s\n_____________\n',
            event_type, data)
        if hasattr(self, 'Dispatcher'):
            self.Dispatcher.Event('', event_type, data)

    ###########################################################################
    ### follows code for authentication, resource bind, session and roster
    ### download
    ###########################################################################

    def auth(self, user, password, resource='', sasl=True, on_auth=None):
        """
        Authenticate connection and bind resource. For non-SASL
        authentication the resource falls back to 'xmpppy' when none is given

        :param user: XMPP username
        :param password: XMPP password
        :param resource: resource that shall be used for auth/connecting
        :param sasl: Boolean indicating if SASL shall be used. (default: True)
        :param on_auth: Callback, called after auth as
            on_auth(client, auth_type). On auth failure, auth_type is None.
        """
        self._User, self._Password = user, password
        self._Resource, self._sasl = resource, sasl
        self.on_auth = on_auth
        self._on_doc_attrs()
        return

    def _on_old_auth(self, res):
        """
        Callback used by NON-SASL auth. On auth failure, res is None
        """
        if res:
            self.connected += '+old_auth'
            self.on_auth(self, 'old_auth')
        else:
            self.on_auth(self, None)

    def _on_sasl_auth(self, res):
        """
        Used internally. On auth failure, res is None
        """
        self.onreceive(None)
        if res:
            self.connected += '+sasl'
            self.on_auth(self, 'sasl')
        else:
            self.on_auth(self, None)

    def _on_doc_attrs(self):
        """
        Plug authentication objects and start auth

        Returns True when SASL authentication was started, None otherwise.
        """
        if self._sasl:
            auth_nb.SASL.get_instance(self._User, self._Password,
                self._on_start_sasl).PlugIn(self)
            if not hasattr(self, 'SASL'):
                # SASL got plugged out again, possible disconnect
                return
        # the SASL attribute is only looked at when SASL was requested, so
        # that auth(sasl=False) really reaches the non-SASL branch
        if not self._sasl or self.SASL.startsasl == 'not-supported':
            if not self._Resource:
                self._Resource = 'xmpppy'
            auth_nb.NonBlockingNonSASL.get_instance(self._User, self._Password,
                self._Resource, self._on_old_auth).PlugIn(self)
            return
        self.SASL.auth()
        return True

    def _on_start_sasl(self, data=None):
        """
        Callback used by SASL, called on each auth step

        :param data: received data that is fed to the dispatcher first
        """
        if data:
            self.Dispatcher.ProcessNonBlocking(data)
        if 'SASL' not in self.__dict__:
            # SASL is plugged out, possible disconnect
            return
        if self.SASL.startsasl == 'in-process':
            return
        self.onreceive(None)
        state = self.SASL.startsasl
        if state == 'failure':
            # wrong user/pass, stop auth
            if 'SASL' in self.__dict__:
                self.SASL.PlugOut()
            self.connected = None  # FIXME: is this intended? We use '' elsewhere
            self._on_sasl_auth(None)
        elif state == 'success':
            nb_bind = auth_nb.NonBlockingBind.get_instance()
            # caller is optional and may not provide stream management
            sm = getattr(self._caller, 'sm', None)
            if sm is not None and sm._owner and sm.resumption:
                nb_bind.resuming = True
                sm.set_owner(self)
                self.Dispatcher.sm = sm
                nb_bind.PlugIn(self)
                self.on_auth(self, 'sasl')
                return

            nb_bind.PlugIn(self)
            self.onreceive(self._on_auth_bind)
        return True

    def _on_auth_bind(self, data):
        """
        Receive callback used after successful SASL auth: feeds data to the
        dispatcher and starts resource binding as soon as the bind plugin's
        ``bound`` attribute is no longer None
        """
        # FIXME: Why use this callback and not bind directly?
        if data:
            self.Dispatcher.ProcessNonBlocking(data)
        if self.NonBlockingBind.bound is None:
            return
        self.NonBlockingBind.NonBlockingBind(self._Resource, self._on_sasl_auth)
        return True

    def initRoster(self, version=''):
        """
        Plug in the roster unless it is plugged in already
        """
        if 'NonBlockingRoster' not in self.__dict__:
            return roster_nb.NonBlockingRoster.get_instance(
                version=version).PlugIn(self)

    def getRoster(self, on_ready=None, force=False):
        """
        Return the result of the plugged-in roster's getRoster(on_ready,
        force), or None when no roster is plugged in (see initRoster)
        """
        if 'NonBlockingRoster' in self.__dict__:
            return self.NonBlockingRoster.getRoster(on_ready, force)
        return None

    def sendPresence(self, jid=None, typ=None, requestRoster=0):
        """
        Send some specific presence state. Can also plug in the roster if the
        according argument is set
        """
        if requestRoster:
            # FIXME: used somewhere?
            roster_nb.NonBlockingRoster.get_instance().PlugIn(self)
        self.send(dispatcher_nb.Presence(to=jid, typ=typ))

    ###########################################################################
    ### following methods are moved from blocking client class of xmpppy
    ###########################################################################

    def RegisterDisconnectHandler(self, handler):
        """
        Register handler that will be called on disconnect
        """
        self.disconnect_handlers.append(handler)

    def UnregisterDisconnectHandler(self, handler):
        """
        Unregister handler that is called on disconnect. Raises ValueError if
        the handler is not registered
        """
        self.disconnect_handlers.remove(handler)

    def DisconnectHandler(self):
        """
        Default disconnect handler. Just raises an IOError. If you choose to
        use this class in your production client, override this method or at
        least unregister it.
        """
        raise IOError('Disconnected from server.')

    def get_connect_type(self):
        """
        Return connection state: '' when not connected, otherwise 'plain',
        'ssl' or 'tls', followed by '+sasl' or '+old_auth' once authenticated
        (e.g. 'tls+sasl'). It is None after a failed SASL authentication
        """
        return self.connected

    def get_peerhost(self):
        """
        Return the ``peerhost`` attribute of the transport; per the original
        documentation this is the IP and port of the socket from which the
        connection to the server is made

        We will create listening socket on the same ip
        """
        # FIXME: tuple (ip, port) is expected (and checked for) but port num is
        # useless
        return self.socket.peerhost