1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 """\
19 X2Go reverse SSH/Paramiko tunneling provides X2Go sound, X2Go printing and
20 X2Go sshfs for folder sharing and mounting remote devices in X2Go terminal
21 server sessions.
22
23 """
24 __NAME__ = 'x2gorevtunnel-pylib'
25
26
27 import copy
28 import threading
29 import gevent
30 import paramiko
31
32
33 from gevent import select, socket, Timeout
34
35
36 import log
37
38
40 """\
41 An X2Go customized TCP handler for the Paramiko/SSH C{Transport()} class.
42
43 Incoming channels will be put into Paramiko's default accept queue. This corresponds to
44 the default behaviour of Paramiko's C{Transport} class.
45
46 However, additionally this handler function checks the server port of the incoming channel
47 and detects if there are Paramiko/SSH reverse forwarding tunnels waiting for the incoming
48 channels. The Paramiko/SSH reverse forwarding tunnels are initiated by an L{X2GoSession} instance
49 (currently supported: reverse tunneling auf audio data, reverse tunneling of SSH requests).
50
51 If the server port of an incoming Paramiko/SSH channel matches the configured port of an L{X2GoRevFwTunnel}
52 instance, this instance gets notified of the incoming channel and a new L{X2GoRevFwChannelThread} is
53 started. This L{X2GoRevFwChannelThread} then takes care of the new channel's incoming data stream.
54
55 """
56 transport = chan.get_transport()
57 transport._queue_incoming_channel(chan)
58 rev_tuns = transport.reverse_tunnels
59
60 for session_name in rev_tuns.keys():
61
62 if int(server_port) in [ int(tunnel[0]) for tunnel in rev_tuns[session_name].values() ]:
63
64 if rev_tuns[session_name]['snd'] is not None and int(server_port) == int(rev_tuns[session_name]['snd'][0]):
65 rev_tuns[session_name]['snd'][1].notify()
66
67 elif rev_tuns[session_name]['sshfs'] is not None and int(server_port) == int(rev_tuns[session_name]['sshfs'][0]):
68 rev_tuns[session_name]['sshfs'][1].notify()
69
70
72 """\
73 L{X2GoRevFwTunnel} class objects are used to reversely tunnel
74 X2Go audio, X2Go printing and X2Go folder sharing / device mounting
75 through Paramiko/SSH.
76
77 """
78 - def __init__(self, server_port, remote_host, remote_port, ssh_transport, session_instance=None, logger=None, loglevel=log.loglevel_DEFAULT):
79 """\
80 Setup a reverse tunnel through Paramiko/SSH.
81
82 After the reverse tunnel has been setup up with L{X2GoRevFwTunnel.start()} it waits
83 for notification from L{X2GoRevFwTunnel.notify()} to accept incoming channels. This
84 notification (L{X2GoRevFwTunnel.notify()} gets called from within the transport's
85 TCP handler function L{x2go_transport_tcp_handler} of the L{X2GoSession} instance.
86
87 @param server_port: the TCP/IP port on the X2Go server (starting point of the tunnel),
88 normally some number above 30000
89 @type server_port: int
90 @param remote_host: the target address for reversely tunneled traffic. With X2Go this should
91 always be set to the localhost (IPv4) address.
92 @type remote_host: str
93 @param remote_port: the TCP/IP port on the X2Go client (end point of the tunnel),
94 normally an application's standard port (22 for SSH, 4713 for pulse audio, etc.)
95 @type remote_port: int
96 @param ssh_transport: the L{X2GoSession}'s Paramiko/SSH transport instance
97 @type ssh_transport: C{paramiko.Transport} instance
98 @param logger: you can pass an L{X2GoLogger} object to the
99 L{X2GoRevFwTunnel} constructor
100 @type logger: L{X2GoLogger} instance
101 @param loglevel: if no L{X2GoLogger} object has been supplied a new one will be
102 constructed with the given loglevel
103 @type loglevel: int
104
105 """
106 if logger is None:
107 self.logger = log.X2GoLogger(loglevel=loglevel)
108 else:
109 self.logger = copy.deepcopy(logger)
110 self.logger.tag = __NAME__
111
112 self.server_port = server_port
113 self.remote_host = remote_host
114 self.remote_port = remote_port
115 self.ssh_transport = ssh_transport
116 self.session_instance = session_instance
117
118 self.open_channels = {}
119 self.incoming_channel = threading.Condition()
120
121 threading.Thread.__init__(self)
122 self.daemon = True
123 self._accept_channels = True
124
132
134 """\
135 Cancel a port forwarding request. This cancellation request is sent to the server and
136 on the server the port forwarding should be unregistered.
137
138 @param address: remote server address
139 @type address: C{str}
140 @param port: remote port
141 @type port: C{int}
142
143 """
144 timeout = Timeout(10)
145 timeout.start()
146 try:
147 self.ssh_transport.global_request('cancel-tcpip-forward', (address, port), wait=True)
148 except:
149 pass
150 finally:
151 timeout.cancel()
152
154 """\
155 Prevent acceptance of new incoming connections through the Paramiko/SSH
156 reverse forwarding tunnel. Also, any active connection on this L{X2GoRevFwTunnel}
157 instance will be closed immediately, if this method is called.
158
159 """
160 if self._accept_channels == True:
161 self.cancel_port_forward('', self.server_port)
162 self._accept_channels = False
163 self.logger('paused thread: %s' % repr(self), loglevel=log.loglevel_DEBUG)
164
166 """\
167 Resume operation of the Paramiko/SSH reverse forwarding tunnel
168 and continue accepting new incoming connections.
169
170 """
171 if self._accept_channels == False:
172 self._accept_channels = True
173 self._requested_port = self.ssh_transport.request_port_forward('', self.server_port, handler=x2go_transport_tcp_handler)
174 self.logger('resumed thread: %s' % repr(self), loglevel=log.loglevel_DEBUG)
175
177 """\
178 Notify an L{X2GoRevFwTunnel} instance of an incoming Paramiko/SSH channel.
179
180 If an incoming reverse tunnel channel appropriate for this instance has
181 been detected, this method gets called from the L{X2GoSession}'s transport
182 TCP handler.
183
184 The sent notification will trigger a C{thread.Condition()} waiting for notification
185 in L{X2GoRevFwTunnel.run()}.
186
187 """
188 self.incoming_channel.acquire()
189 self.logger('notifying thread of incoming channel: %s' % repr(self), loglevel=log.loglevel_DEBUG)
190 self.incoming_channel.notify()
191 self.incoming_channel.release()
192
194 """\
195 Stops this L{X2GoRevFwTunnel} thread completely.
196
197 """
198 self.pause()
199 self._keepalive = False
200 self.logger('stopping thread: %s' % repr(self), loglevel=log.loglevel_DEBUG)
201 self.notify()
202
204 try:
205 self._requested_port = self.ssh_transport.request_port_forward('', self.server_port, handler=x2go_transport_tcp_handler)
206 except paramiko.SSHException:
207
208
209 self.cancel_port_forward('', self.server_port)
210 gevent.sleep(1)
211 try:
212 self._requested_port = self.ssh_transport.request_port_forward('', self.server_port, handler=x2go_transport_tcp_handler)
213 except paramiko.SSHException, e:
214 if self.session_instance:
215 self.session_instance.HOOK_rforward_request_denied(server_port=self.server_port)
216 else:
217 self.logger('Encountered SSHException: %s (for reverse TCP port forward with local destination port %s' % (str(e), self.server_port), loglevel=log.loglevel_WARN)
218
220 """\
221 This method gets run once an L{X2GoRevFwTunnel} has been started with its
222 L{start()} method. Use L{X2GoRevFwTunnel}.stop_thread() to stop the
223 reverse forwarding tunnel again. You can also temporarily lock the tunnel
224 down with L{X2GoRevFwTunnel.pause()} and L{X2GoRevFwTunnel.resume()}).
225
226 L{X2GoRevFwTunnel.run()} waits for notifications of an appropriate incoming
227 Paramiko/SSH channel (issued by L{X2GoRevFwTunnel.notify()}). Appropriate in
228 this context means, that its start point on the X2Go server matches the class's
229 property C{server_port}.
230
231 Once a new incoming channel gets announced by the L{notify()} method, a new
232 L{X2GoRevFwChannelThread} instance will be initialized. As a data stream handler,
233 the function L{x2go_rev_forward_channel_handler()} will be used.
234
235 The channel will last till the connection gets dropped on the X2Go server side or
236 until the tunnel gets paused by an L{X2GoRevFwTunnel.pause()} call or stopped via the
237 L{X2GoRevFwTunnel.stop_thread()} method.
238
239 """
240 self._request_port_forwarding()
241 self._keepalive = True
242 while self._keepalive:
243
244 self.incoming_channel.acquire()
245
246 self.logger('waiting for incoming data channel on X2Go server port: [127.0.0.1]:%s' % self.server_port, loglevel=log.loglevel_DEBUG)
247 self.incoming_channel.wait()
248
249 if self._keepalive:
250 self.logger('detected incoming data channel on X2Go server port: [127.0.0.1]:%s' % self.server_port, loglevel=log.loglevel_DEBUG)
251 _chan = self.ssh_transport.accept()
252 self.logger('data channel %s for server port [127.0.0.1]:%s is up' % (_chan, self.server_port), loglevel=log.loglevel_DEBUG)
253 else:
254 self.logger('closing down rev forwarding tunnel on remote end [127.0.0.1]:%s' % self.server_port, loglevel=log.loglevel_DEBUG)
255
256 self.incoming_channel.release()
257 if self._accept_channels and self._keepalive:
258 _new_chan_thread = X2GoRevFwChannelThread(_chan, (self.remote_host, self.remote_port),
259 target=x2go_rev_forward_channel_handler,
260 kwargs={
261 'chan': _chan,
262 'addr': self.remote_host,
263 'port': self.remote_port,
264 'parent_thread': self,
265 'logger': self.logger,
266 }
267 )
268 _new_chan_thread.start()
269 self.open_channels['[%s]:%s' % _chan.origin_addr] = _new_chan_thread
270
271
273 """\
274 Handle the data stream of a requested channel that got set up by a L{X2GoRevFwTunnel} (Paramiko/SSH
275 reverse forwarding tunnel).
276
277 The channel (and the corresponding connections) close either ...
278
279 - ... if the connecting application closes the connection and thus, drops
280 the channel, or
281 - ... if the L{X2GoRevFwTunnel} parent thread gets paused. The call
282 of L{X2GoRevFwTunnel.pause()} on the instance can be used to shut down all incoming
283 tunneled SSH connections associated to this L{X2GoRevFwTunnel} instance
284 from within a Python X2Go application.
285
286 @param chan: channel
287 @type chan: C{class}
288 @param addr: bind address
289 @type addr: C{str}
290 @param port: bind port
291 @type port: C{int}
292 @param parent_thread: the calling L{X2GoRevFwTunnel} instance
293 @type parent_thread: L{X2GoRevFwTunnel} instance
294 @param logger: you can pass an L{X2GoLogger} object to the
295 L{X2GoRevFwTunnel} constructor
296 @type logger: L{X2GoLogger} instance
297
298 """
299 fw_socket = socket.socket()
300 fw_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
301 if logger is None:
302 def _dummy_logger(msg, l):
303 pass
304 logger = _dummy_logger
305
306 try:
307 fw_socket.connect((addr, port))
308 except Exception, e:
309 logger('Reverse forwarding request to %s:%d failed: %r' % (addr, port, e), loglevel=log.loglevel_INFO)
310 return
311
312 logger('Connected! Reverse tunnel open %r -> %r -> %r' % (chan.origin_addr,
313 chan.getpeername(), (addr, port)),
314 loglevel=log.loglevel_INFO)
315 while parent_thread._accept_channels:
316 r, w, x = select.select([fw_socket, chan], [], [])
317 if fw_socket in r:
318 data = fw_socket.recv(1024)
319 if len(data) == 0:
320 break
321 chan.send(data)
322 if chan in r:
323 data = chan.recv(1024)
324 if len(data) == 0:
325 break
326 fw_socket.send(data)
327
328 chan.close()
329 fw_socket.close()
330 logger('Reverse tunnel %s closed from %r' % (chan, chan.origin_addr,), loglevel=log.loglevel_INFO)
331
332
334 """\
335 Starts a thread for each incoming Paramiko/SSH data channel trough the reverse
336 forwarding tunnel.
337
338 """
339 - def __init__(self, channel, remote=None, **kwargs):
340 """\
341 Initializes a reverse forwarding channel thread.
342
343 @param channel: incoming Paramiko/SSH channel from the L{X2GoSession}'s transport
344 accept queue
345 @type channel: class
346 @param remote: tuple (addr, port) that specifies the data endpoint of the channel
347 @type remote: C{tuple(str, int)}
348
349 """
350 self.channel = channel
351 if remote is not None:
352 self.remote_host = remote[0]
353 self.remote_port = remote[1]
354 threading.Thread.__init__(self, **kwargs)
355 self.daemon = True
356