1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 """\
21 X2Go reverse SSH/Paramiko tunneling provides X2Go sound, X2Go printing and
22 X2Go sshfs for folder sharing and mounting remote devices in X2Go terminal
23 server sessions.
24
25 """
26 __NAME__ = 'x2gorevtunnel-pylib'
27
28
29 import copy
30 import threading
31 import gevent
32 import paramiko
33
34
35 from gevent import select, socket, Timeout
36
37
38 import log
39
40
def x2go_transport_tcp_handler(chan, origin, server):
    """\
    An X2Go customized TCP handler for the Paramiko/SSH C{Transport()} class.

    Incoming channels will be put into Paramiko's default accept queue. This corresponds to
    the default behaviour of Paramiko's C{Transport} class.

    However, additionally this handler function checks the server port of the incoming channel
    and detects if there are Paramiko/SSH reverse forwarding tunnels waiting for the incoming
    channels. The Paramiko/SSH reverse forwarding tunnels are initiated by an L{X2GoSession} instance
    (currently supported: reverse tunneling of audio data, reverse tunneling of SSH requests).

    If the server port of an incoming Paramiko/SSH channel matches the configured port of an L{X2GoRevFwTunnel}
    instance, this instance gets notified of the incoming channel and a new L{X2GoRevFwChannelThread} is
    started. This L{X2GoRevFwChannelThread} then takes care of the new channel's incoming data stream.

    The handler is meant to be called positionally (as registered via
    C{request_port_forward(..., handler=x2go_transport_tcp_handler)}).

    @param chan: the incoming Paramiko/SSH channel
    @type chan: C{paramiko.Channel} instance
    @param origin: C{(origin_addr, origin_port)} of the incoming connection (not evaluated here)
    @type origin: C{tuple}
    @param server: C{(server_addr, server_port)} the connection arrived on; only the port is evaluated
    @type server: C{tuple}

    """
    transport = chan.get_transport()
    transport._queue_incoming_channel(chan)

    server_port = int(server[1])

    # reverse_tunnels maps session name -> {tunnel type: (port, tunnel) or None}
    for session_tunnels in transport.reverse_tunnels.values():

        # 'snd' takes precedence over 'sshfs'; at most one tunnel per session gets notified
        for tunnel_type in ('snd', 'sshfs'):
            tunnel = session_tunnels.get(tunnel_type)
            # a tunnel type that is not set up is stored as None and must not be indexed
            if tunnel is not None and int(tunnel[0]) == server_port:
                tunnel[1].notify()
                break
71
72
74 """\
75 L{X2GoRevFwTunnel} class objects are used to reversely tunnel
76 X2Go audio, X2Go printing and X2Go folder sharing / device mounting
77 through Paramiko/SSH.
78
79 """
def __init__(self, server_port, remote_host, remote_port, ssh_transport, session_instance=None, logger=None, loglevel=log.loglevel_DEFAULT):
    """\
    Set up a reverse tunnel through Paramiko/SSH.

    After the reverse tunnel has been set up with L{X2GoRevFwTunnel.start()} it waits
    for notification from L{X2GoRevFwTunnel.notify()} to accept incoming channels. This
    notification (L{X2GoRevFwTunnel.notify()}) gets called from within the transport's
    TCP handler function L{x2go_transport_tcp_handler} of the L{X2GoSession} instance.

    @param server_port: the TCP/IP port on the X2Go server (starting point of the tunnel),
        normally some number above 30000
    @type server_port: int
    @param remote_host: the target address for reversely tunneled traffic. With X2Go this should
        always be set to the localhost (IPv4) address.
    @type remote_host: str
    @param remote_port: the TCP/IP port on the X2Go client (end point of the tunnel),
        normally an application's standard port (22 for SSH, 4713 for pulse audio, etc.)
    @type remote_port: int
    @param ssh_transport: the L{X2GoSession}'s Paramiko/SSH transport instance
    @type ssh_transport: C{paramiko.Transport} instance
    @param session_instance: if given, its C{HOOK_rforward_request_denied()} method gets
        called when the server refuses the reverse port forwarding request
    @type session_instance: L{X2GoSession} instance
    @param logger: you can pass an L{X2GoLogger} object to the
        L{X2GoRevFwTunnel} constructor
    @type logger: L{X2GoLogger} instance
    @param loglevel: if no L{X2GoLogger} object has been supplied a new one will be
        constructed with the given loglevel
    @type loglevel: int

    """
    # tunnel end points and the transport / session the tunnel belongs to
    self.ssh_transport = ssh_transport
    self.session_instance = session_instance
    self.server_port, self.remote_host, self.remote_port = server_port, remote_host, remote_port

    # work on a private copy of a supplied logger, so that re-tagging it does
    # not affect the caller's logger object
    if logger is not None:
        self.logger = copy.deepcopy(logger)
    else:
        self.logger = log.X2GoLogger(loglevel=loglevel)
    self.logger.tag = __NAME__

    # run() sleeps on this condition until notify() announces a channel
    self.incoming_channel = threading.Condition()
    # channel handler threads, keyed by the '[addr]:port' origin of their channel
    self.open_channels = {}

    threading.Thread.__init__(self)
    self.daemon = True
    self._accept_channels = True
126
134
def cancel_port_forward(self, address, port):
    """\
    Cancel a port forwarding request. This cancellation request is sent to the server and
    on the server the port forwarding should be unregistered.

    The cancellation is a best-effort operation: it is given up after ten seconds and
    failures are only logged (at debug level), never raised to the caller.

    @param address: remote server address
    @type address: C{str}
    @param port: remote port
    @type port: C{int}

    """
    timeout = Timeout(10)
    timeout.start()
    try:
        self.ssh_transport.global_request('cancel-tcpip-forward', (address, port), wait=True)
    except (Timeout, Exception) as e:
        # gevent's Timeout does not derive from Exception, hence it is listed explicitly;
        # KeyboardInterrupt / SystemExit are deliberately no longer swallowed here
        self.logger('cancelling port forwarding of server port %s failed: %r' % (port, e), loglevel=log.loglevel_DEBUG)
    finally:
        timeout.cancel()
154
def pause(self):
    """\
    Prevent acceptance of new incoming connections through the Paramiko/SSH
    reverse forwarding tunnel.

    The port forwarding gets cancelled on the server and the C{_accept_channels}
    flag is cleared. Channel handlers started for this L{X2GoRevFwTunnel} instance
    watch the same flag and close their connections once they see it cleared.

    Calling this method on an already paused tunnel is a no-op.

    """
    if self._accept_channels:
        self.cancel_port_forward('', self.server_port)
        self._accept_channels = False
        self.logger('paused thread: %s' % repr(self), loglevel=log.loglevel_DEBUG)
166
def resume(self):
    """\
    Resume operation of the Paramiko/SSH reverse forwarding tunnel
    and continue accepting new incoming connections.

    The port forwarding is re-requested via L{_request_port_forwarding()}, so a
    refused request is handled the same way as on thread start-up (one retry,
    then hook call or warning) instead of raising C{paramiko.SSHException}
    into the caller.

    Calling this method on a tunnel that is not paused is a no-op.

    """
    if not self._accept_channels:
        self._accept_channels = True
        self._request_port_forwarding()
        self.logger('resumed thread: %s' % repr(self), loglevel=log.loglevel_DEBUG)
177
def notify(self):
    """\
    Notify an L{X2GoRevFwTunnel} instance of an incoming Paramiko/SSH channel.

    If an incoming reverse tunnel channel appropriate for this instance has
    been detected, this method gets called from the L{X2GoSession}'s transport
    TCP handler.

    The sent notification will wake up the C{threading.Condition()} that
    L{X2GoRevFwTunnel.run()} is waiting on.

    """
    # the with-statement releases the condition's lock even if logging fails
    with self.incoming_channel:
        self.logger('notifying thread of incoming channel: %s' % repr(self), loglevel=log.loglevel_DEBUG)
        self.incoming_channel.notify()
194
def stop_thread(self):
    """\
    Shut this L{X2GoRevFwTunnel} thread down for good.

    The tunnel gets paused first, then the keep-alive flag is cleared and the
    thread is woken up so that the main loop in C{run()} can terminate.

    """
    self.pause()
    self._keepalive = False

    _message = 'stopping thread: %s' % repr(self)
    self.logger(_message, loglevel=log.loglevel_DEBUG)

    # run() sleeps on the incoming_channel condition; wake it up so it
    # re-evaluates the keep-alive flag
    self.notify()
204
def _request_port_forwarding(self):
    """\
    Ask the X2Go server to forward C{server_port} back through the SSH transport.

    If the first request is refused, any earlier forwarding of the same port is
    cancelled on the server and the request is repeated once after a pause of one
    second. If the second request is refused as well, the session instance's
    C{HOOK_rforward_request_denied()} method gets called (if a session instance has
    been passed to the constructor), otherwise a warning is logged.

    On success the port returned by the transport is stored in C{_requested_port}.

    """
    try:
        self._requested_port = self.ssh_transport.request_port_forward('', self.server_port, handler=x2go_transport_tcp_handler)
    except paramiko.SSHException:

        # presumably an earlier forwarding of this port is still registered on the
        # server: unregister it, give the server a moment and try once more
        self.cancel_port_forward('', self.server_port)
        gevent.sleep(1)
        try:
            self._requested_port = self.ssh_transport.request_port_forward('', self.server_port, handler=x2go_transport_tcp_handler)
        except paramiko.SSHException as e:
            if self.session_instance:
                self.session_instance.HOOK_rforward_request_denied(server_port=self.server_port)
            else:
                self.logger('Encountered SSHException: %s (for reverse TCP port forward with local destination port %s)' % (str(e), self.server_port), loglevel=log.loglevel_WARN)
220
def run(self):
    """\
    This method gets run once an L{X2GoRevFwTunnel} has been started with its
    L{start()} method. Use L{X2GoRevFwTunnel}.stop_thread() to stop the
    reverse forwarding tunnel again. You can also temporarily lock the tunnel
    down with L{X2GoRevFwTunnel.pause()} and L{X2GoRevFwTunnel.resume()}.

    L{X2GoRevFwTunnel.run()} waits for notifications of an appropriate incoming
    Paramiko/SSH channel (issued by L{X2GoRevFwTunnel.notify()}). Appropriate in
    this context means, that its start point on the X2Go server matches the class's
    property C{server_port}.

    Once a new incoming channel gets announced by the L{notify()} method, a new
    L{X2GoRevFwChannelThread} instance will be initialized. As a data stream handler,
    the function L{x2go_rev_forward_channel_handler()} will be used.

    The channel will last till the connection gets dropped on the X2Go server side or
    until the tunnel gets paused by an L{X2GoRevFwTunnel.pause()} call or stopped via the
    L{X2GoRevFwTunnel.stop_thread()} method.

    """
    self._request_port_forwarding()
    self._keepalive = True
    while self._keepalive:

        _chan = None

        # the with-statement guarantees that the condition's lock is released again,
        # even if accept() or the logger raises (otherwise notify() would block forever)
        with self.incoming_channel:

            self.logger('waiting for incoming data channel on X2Go server port: [127.0.0.1]:%s' % self.server_port, loglevel=log.loglevel_DEBUG)
            self.incoming_channel.wait()

            if self._keepalive:
                self.logger('detected incoming data channel on X2Go server port: [127.0.0.1]:%s' % self.server_port, loglevel=log.loglevel_DEBUG)
                _chan = self.ssh_transport.accept()
                self.logger('data channel %s for server port [127.0.0.1]:%s is up' % (_chan, self.server_port), loglevel=log.loglevel_DEBUG)
            else:
                self.logger('closing down rev forwarding tunnel on remote end [127.0.0.1]:%s' % self.server_port, loglevel=log.loglevel_DEBUG)

        if _chan is None:
            # woken up for shutdown, or accept() had no channel to hand out
            continue

        if not (self._accept_channels and self._keepalive):
            # tunnel got paused / stopped in the meantime: nobody will serve
            # this channel, so do not leave it dangling
            _chan.close()
            continue

        _new_chan_thread = X2GoRevFwChannelThread(_chan, (self.remote_host, self.remote_port),
                                                  target=x2go_rev_forward_channel_handler,
                                                  kwargs={
                                                      'chan': _chan,
                                                      'addr': self.remote_host,
                                                      'port': self.remote_port,
                                                      'parent_thread': self,
                                                      'logger': self.logger,
                                                  }
                                                 )
        _new_chan_thread.start()
        self.open_channels['[%s]:%s' % _chan.origin_addr] = _new_chan_thread
272
273
def x2go_rev_forward_channel_handler(chan=None, addr='', port=0, parent_thread=None, logger=None):
    """\
    Handle the data stream of a requested channel that got set up by a L{X2GoRevFwTunnel} (Paramiko/SSH
    reverse forwarding tunnel).

    The channel (and the corresponding connections) close either ...

        - ... if the connecting application closes the connection and thus, drops
        the channel, or
        - ... if the L{X2GoRevFwTunnel} parent thread gets paused. The call
        of L{X2GoRevFwTunnel.pause()} on the instance can be used to shut down all incoming
        tunneled SSH connections associated to this L{X2GoRevFwTunnel} instance
        from within a Python X2Go application.

    Both the channel and the local socket are closed in every case, i.e. also if the
    local connection cannot be established or if forwarding fails with an exception.

    @param chan: channel
    @type chan: C{class}
    @param addr: bind address
    @type addr: C{str}
    @param port: bind port
    @type port: C{int}
    @param parent_thread: the calling L{X2GoRevFwTunnel} instance
    @type parent_thread: L{X2GoRevFwTunnel} instance
    @param logger: you can pass an L{X2GoLogger} object to the
        L{X2GoRevFwTunnel} constructor
    @type logger: L{X2GoLogger} instance

    """
    if logger is None:
        # must accept the same call signature as a real logger: logger(msg, loglevel=...)
        def _dummy_logger(msg, loglevel=None):
            pass
        logger = _dummy_logger

    fw_socket = socket.socket()
    try:
        fw_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        fw_socket.connect((addr, port))
    except Exception as e:
        logger('Reverse forwarding request to %s:%s failed: %r' % (addr, port, e), loglevel=log.loglevel_INFO)
        # neither end is of any use without the local connection
        fw_socket.close()
        chan.close()
        return

    logger('Connected! Reverse tunnel open %r -> %r -> %r' % (chan.origin_addr,
                                                              chan.getpeername(), (addr, port)),
                                                              loglevel=log.loglevel_INFO)
    try:
        while parent_thread._accept_channels:
            # wake up once per second, so that a paused parent tunnel is
            # noticed on idle connections, too
            readable, _writable, _errored = select.select([fw_socket, chan], [], [], 1.0)
            if fw_socket in readable:
                data = fw_socket.recv(1024)
                if not data:
                    break
                # send() may transmit only a part of the data, sendall() does not
                chan.sendall(data)
            if chan in readable:
                data = chan.recv(1024)
                if not data:
                    break
                fw_socket.sendall(data)
    finally:
        chan.close()
        fw_socket.close()
        logger('Reverse tunnel %s closed from %r' % (chan, chan.origin_addr,), loglevel=log.loglevel_INFO)
333
334
class X2GoRevFwChannelThread(threading.Thread):
    """\
    Starts a thread for each incoming Paramiko/SSH data channel through the reverse
    forwarding tunnel.

    """
    def __init__(self, channel, remote=None, **kwargs):
        """\
        Initializes a reverse forwarding channel thread.

        The thread is always created as a daemon thread. All further keyword
        arguments are handed over to C{threading.Thread.__init__()} unchanged
        (e.g. C{target}, C{kwargs}, C{name}).

        @param channel: incoming Paramiko/SSH channel from the L{X2GoSession}'s transport
            accept queue
        @type channel: class
        @param remote: tuple (addr, port) that specifies the data endpoint of the channel;
            if omitted, C{remote_host} and C{remote_port} are set to C{None}
        @type remote: C{tuple(str, int)}

        """
        threading.Thread.__init__(self, **kwargs)
        self.daemon = True

        self.channel = channel

        # always define both attributes, so that reading them on a thread that was
        # created without a remote end point does not raise an AttributeError
        self.remote_host = None
        self.remote_port = None
        if remote is not None:
            self.remote_host = remote[0]
            self.remote_port = remote[1]
358