# -*- coding: utf-8 -*-
# vim:fenc=utf-8
# This file is part of the X2Go Project - https://www.x2go.org
# Copyright (C) 2012-2020 by Mike Gabriel <mike.gabriel@das-netzwerkteam.de>
#
# X2Go Session Broker is free software; you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# X2Go Session Broker is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program; if not, write to the
# Free Software Foundation, Inc.,
# 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.
# modules
import tornado.web
from tornado.escape import native_str, parse_qs_bytes
# Python X2Go Broker modules
import x2gobroker.defaults
from x2gobroker.loggers import logger_broker, logger_error
class _RequestHandler(tornado.web.RequestHandler):
    """\
    Tornado request handler base class that writes request exceptions to the
    broker's error logger before Tornado processes them.

    """

    def _handle_request_exception(self, e):
        """\
        Log the exception and then hand it over to the parent class
        implementation.

        :param e: the exception that occurred while handling the request
        :type e: ``Exception``

        """
        error_text = 'HTTP request error: {error_msg}'.format(error_msg=e)
        logger_error.error(error_text)
        super()._handle_request_exception(e)
class X2GoBrokerWeb(_RequestHandler):
    """\
    HTTP request handler that provides the plain text web frontend of the
    X2Go Session Broker.

    Currently, X2Go Client uses this webfrontend / communication protocol format.

    :raises tornado.web.HTTPError: on authentication failure a 401 error is raised

    """
    # Headers that are set on every response produced by :meth:`post`.
    # NOTE(review): '+1h' is not an HTTP-date; confirm that clients cope with
    # this ``Expires`` value before relying on it for caching.
    http_header_items = {
        'Content-Type': 'text/plain; charset=utf-8',
        'Expires': '+1h',
    }

    def _gen_http_header(self):
        """\
        Set all headers listed in ``http_header_items`` on the response.

        """
        for header_name, header_value in self.http_header_items.items():
            self.set_header(header_name, header_value)

    @staticmethod
    def _backend_from_path(path):
        """\
        Derive the broker backend name from the request's URL path.

        An empty path selects ``X2GOBROKER_DEFAULT_BACKEND``, otherwise the
        first path component (trailing slashes stripped) is used.

        :param path: URL path
        :type path: ``str``

        :returns: name of the requested broker backend
        :rtype: ``str``

        """
        if not path:
            return x2gobroker.defaults.X2GOBROKER_DEFAULT_BACKEND
        return path.rstrip('/').split('/')[0]

    @staticmethod
    def _load_broker_backend(backend):
        """\
        Import ``x2gobroker.brokers.<backend>_broker`` and instantiate its
        ``X2GoBroker`` class.

        The backend name originates from the request URL, so it is checked to
        form a plain Python identifier before it is used in an import. The
        module is loaded via :func:`importlib.import_module`; the name is
        never evaluated as source code.

        :param backend: name of the requested broker backend
        :type backend: ``str``

        :returns: a fresh broker backend instance

        :raises tornado.web.HTTPError: 404 if the backend name is malformed or
            the backend module cannot be imported

        """
        import importlib

        module_basename = '{backend}_broker'.format(backend=backend)
        # rejects dots, whitespace, newlines and any other non-identifier input
        if module_basename.isidentifier():
            try:
                module = importlib.import_module('x2gobroker.brokers.' + module_basename)
                return module.X2GoBroker()
            except ImportError:
                # handled below, same as a malformed backend name
                pass

        # throw a 404 if the backend does not exist
        logger_error.error('No such broker backend \'{backend}\''.format(backend=backend))
        raise tornado.web.HTTPError(404)

    @staticmethod
    def _format_profiles(profiles):
        """\
        Render session profiles as the ``START_USER_SESSIONS`` ...
        ``END_USER_SESSIONS`` text block, one ``[profile_id]`` section per
        profile, sorted by profile ID.

        String values are written as they are, lists and tuples are joined
        with commas, everything else is converted with ``int()``.

        :param profiles: session profiles, keyed by profile ID
        :type profiles: ``dict``

        :returns: the rendered text block
        :rtype: ``str``

        """
        parts = ["START_USER_SESSIONS\n\n"]
        for profile_id in sorted(profiles):
            parts.append("[{profile_id}]\n".format(profile_id=profile_id))
            for key, value in profiles[profile_id].items():
                if isinstance(value, str):
                    rendered = value
                elif isinstance(value, (list, tuple)):
                    rendered = ",".join(value)
                else:
                    rendered = int(value)
                parts.append("{key}={value}\n".format(key=key, value=rendered))
            parts.append("\n")
        parts.append("END_USER_SESSIONS\n")
        return "".join(parts)

    def get(self, path):
        """\
        Handle GET requests.

        GET requests are only served if ``X2GOBROKER_DEBUG`` is set, so that
        the broker's functionality can be tested with a normal web browser.
        In that case the request is processed by :meth:`post`. Otherwise the
        request is refused.

        :param path: URL path
        :type path: ``str``

        :raises tornado.web.HTTPError: 405 if debug mode is not enabled

        """
        if x2gobroker.defaults.X2GOBROKER_DEBUG:
            logger_broker.warning('GET http request detected, if unwanted: disable X2GOBROKER_DEBUG')
            return self.post(path)

        raise tornado.web.HTTPError(405)

    def post(self, path):
        """\
        Implementation of the plain text broker communication protocol as
        used by X2Go Client (via POST requests).

        The broker backend is selected by the URL path. After the optional
        pre-auth script, the access check and the optional post-auth script
        have run, the requested task (``testcon``, ``listsessions`` or
        ``selectsession``) is processed and its plain text result is written
        to the response.

        :param path: URL path
        :type path: ``str``

        :raises tornado.web.HTTPError: 404 if the backend does not exist, the
            plain output is not enabled, the backend is disabled or (outside
            of debug mode) no client IP is available; 401 if access is denied

        """
        self._gen_http_header()

        # dynamically detect broker backend from given URL
        broker_backend = self._load_broker_backend(self._backend_from_path(path))

        global_config = broker_backend.get_global_config()

        # throw a 404 if the WebUI is not enabled
        if not global_config['enable-plain-output']:
            logger_error.error('The WebUI \'plain\' is not enabled in the global broker configuration')
            raise tornado.web.HTTPError(404)

        # if the broker backend is disabled in the configuration, pretend to have nothing on offer
        if not broker_backend.is_enabled():
            logger_error.error('The broker backend \'{backend}\' is not enabled'.format(backend=broker_backend.get_name()))
            raise tornado.web.HTTPError(404)

        # FIXME: this is to work around a bug in X2Go Client (https://bugs.x2go.org/138)
        content_type = self.request.headers.get("Content-Type", "")
        if not content_type.startswith("application/x-www-form-urlencoded"):
            for name, values in parse_qs_bytes(native_str(self.request.body)).items():
                self.request.arguments.setdefault(name, []).extend(values)

        # set the client address for the broker backend
        ip = self.request.remote_ip
        if ip:
            logger_broker.info('client address is {address}'.format(address=ip))
            broker_backend.set_client_address(ip)
        elif not x2gobroker.defaults.X2GOBROKER_DEBUG:
            # if the client IP is not set, we pretend to have nothing on offer
            logger_error.error('client could not provide an IP address, pretending: 404 Not Found')
            raise tornado.web.HTTPError(404)

        broker_username = self.get_argument('user', default='')
        server_username = self.get_argument('login', default='')
        if not server_username:
            server_username = broker_username
        # the password is taken verbatim, surrounding whitespace is kept
        password = self.get_argument('password', default='', strip=False)
        cookie = self.get_argument('authid', default='')
        pubkey = self.get_argument('pubkey', default='')
        task = self.get_argument('task', default='')
        profile_id = self.get_argument('sid', default='')

        output = ''

        broker_username, password, task, profile_id, ip, cookie, authed, server = broker_backend.run_optional_script(script_type='pre_auth_scripts', username=broker_username, password=password, task=task, profile_id=profile_id, ip=ip, cookie=cookie)

        # the password is masked, it must never show up in the logs
        logger_broker.debug('broker_username: {broker_username}, server_username: {server_username}, password: {password}, task: {task}, profile_id: {profile_id}, cookie: {cookie}'.format(broker_username=broker_username, server_username=server_username, password='XXXXX', task=task, profile_id=profile_id, cookie=cookie))

        access, next_cookie = broker_backend.check_access(username=broker_username, password=password, ip=ip, cookie=cookie)

        broker_username, password, task, profile_id, ip, cookie, authed, server = broker_backend.run_optional_script(script_type='post_auth_scripts', username=broker_username, password=password, task=task, profile_id=profile_id, ip=ip, cookie=cookie, authed=access)

        if not access:
            raise tornado.web.HTTPError(401)

        ###
        ### CONFIRM SUCCESSFUL AUTHENTICATION FIRST
        ###

        if next_cookie is not None:
            output += "AUTHID:{authid}\n".format(authid=next_cookie)

        output += "Access granted\n"

        ###
        ### X2GO BROKER TASKS
        ###

        # FIXME: the ,,testcon'' task can be object to DoS attacks...
        if task == 'testcon':

            ###
            ### TEST THE CONNECTION
            ###

            # only the test data is sent, the output collected so far is dropped
            self.write(broker_backend.test_connection())
            return

        if task == 'listsessions':

            profiles = broker_backend.list_profiles(broker_username)
            if profiles:
                output += self._format_profiles(profiles)

        elif task == 'selectsession':

            if profile_id:

                profile_info = broker_backend.select_session(profile_id=profile_id, username=server_username, pubkey=pubkey)

                if 'server' in profile_info:
                    # the select-session script may replace the server that is announced to the client
                    server_username, password, task, profile_id, ip, cookie, authed, server = broker_backend.run_optional_script(script_type='select_session_scripts', username=server_username, password=password, task=task, profile_id=profile_id, ip=ip, cookie=cookie, authed=access, server=profile_info['server'])
                    output += "SERVER:"
                    output += server
                    if 'port' in profile_info:
                        output += ":{port}".format(port=profile_info['port'])
                    output += "\n"
                    if 'authentication_privkey' in profile_info:
                        output += profile_info['authentication_privkey']
                    if 'session_info' in profile_info:
                        output += "SESSION_INFO:"
                        output += profile_info['session_info'] + "\n"

        self.write(output)
        return