# -*- Mode: Python; coding: utf-8; indent-tabs-mode: nil; tab-width: 4 -*-
#
# Autopilot Functional Test Tool
# Copyright (C) 2012-2013 Canonical
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
"""Package for introspection support.
This package contains the internal implementation of the autopilot
introspection mechanism, and probably isn't useful to most test authors.
"""
from __future__ import absolute_import
from dbus import DBusException, Interface
import logging
import subprocess
from time import sleep
from functools import partial
import os
import psutil
from autopilot.introspection.backends import DBusAddress
from autopilot.introspection.constants import (
AUTOPILOT_PATH,
QT_AUTOPILOT_IFACE,
AP_INTROSPECTION_IFACE,
)
from autopilot.introspection.dbus import (
CustomEmulatorBase,
DBusIntrospectionObject,
get_classname_from_path,
)
from autopilot.introspection.utilities import (
_get_bus_connections_pid,
_pid_is_running,
)
from autopilot.dbus_handler import (
get_session_bus,
get_system_bus,
get_custom_bus,
)
logger = logging.getLogger(__name__)
# Keep track of known connections during search
connection_list = []
class ProcessSearchError(RuntimeError):
pass
def get_application_launcher(app_path):
"""Return an instance of :class:`ApplicationLauncher` that knows how to
launch the application at 'app_path'.
"""
# TODO: this is a teeny bit hacky - we call ldd to check whether this
# application links to certain library. We're assuming that linking to
# libQt* or libGtk* means the application is introspectable. This excludes
# any non-dynamically linked executables, which we may need to fix further
# down the line.
try:
ldd_output = subprocess.check_output(["ldd", app_path]).strip().lower()
except subprocess.CalledProcessError as e:
raise RuntimeError(e)
if 'libqtcore' in ldd_output or 'libqt5core' in ldd_output:
from autopilot.introspection.qt import QtApplicationLauncher
return QtApplicationLauncher()
elif 'libgtk' in ldd_output:
from autopilot.introspection.gtk import GtkApplicationLauncher
return GtkApplicationLauncher()
return None
def get_application_launcher_from_string_hint(hint):
"""Return in instance of :class:`ApplicationLauncher` given a string
hint."""
from autopilot.introspection.qt import QtApplicationLauncher
from autopilot.introspection.gtk import GtkApplicationLauncher
hint = hint.lower()
if hint == 'qt':
return QtApplicationLauncher()
elif hint == 'gtk':
return GtkApplicationLauncher()
return None
def launch_application(launcher, application, *arguments, **kwargs):
"""Launch an application, and return a process object.
:param launcher: An instance of the :class:`ApplicationLauncher` class to
prepare the environment before launching the application itself.
"""
if not isinstance(application, basestring):
raise TypeError("'application' parameter must be a string.")
cwd = kwargs.pop('launch_dir', None)
capture_output = kwargs.pop('capture_output', True)
if kwargs:
raise ValueError(
"Unknown keyword arguments: %s." %
(', '.join(repr(k) for k in kwargs.keys())))
path, args = launcher.prepare_environment(application, list(arguments))
process = launch_process(
path,
args,
capture_output,
cwd=cwd
)
return process
class ApplicationLauncher(object):
"""A class that knows how to launch an application with a certain type of
introspection enabled.
"""
def prepare_environment(self, app_path, arguments):
"""Prepare the application, or environment to launch with
autopilot-support.
This method does nothing - it exists so child classes can override it.
The method *must* return a tuple of (*app_path*, *arguments*). Either
of these can be altered by this method.
"""
raise NotImplementedError("Sub-classes must implement this method.")
def launch_process(application, args, capture_output, **kwargs):
"""Launch an autopilot-enabled process and return the process object."""
commandline = [application]
commandline.extend(args)
logger.info("Launching process: %r", commandline)
cap_mode = None
if capture_output:
cap_mode = subprocess.PIPE
process = subprocess.Popen(
commandline,
stdin=subprocess.PIPE,
stdout=cap_mode,
stderr=cap_mode,
close_fds=True,
preexec_fn=os.setsid,
**kwargs
)
return process
def get_autopilot_proxy_object_for_process(
process,
emulator_base,
dbus_bus='session'
):
"""Return the autopilot proxy object for the given *process*.
:raises: **RuntimeError** if no autopilot interface was found.
"""
pid = process.pid
proxy_obj = get_proxy_object_for_existing_process(
pid,
process=process,
emulator_base=emulator_base,
dbus_bus=dbus_bus,
)
proxy_obj.set_process(process)
return proxy_obj
[docs]def get_proxy_object_for_existing_process(
pid=None, dbus_bus='session', connection_name=None, process=None,
object_path=AUTOPILOT_PATH, application_name=None, emulator_base=None):
"""Return a single proxy object for an application that is already running
(i.e. launched outside of Autopilot).
Searches on the given bus (supplied by **dbus_bus**) for an application
matching the search criteria, creating the proxy object using the supplied
custom emulator **emulator_base** (defaults to None).
For example for an application on the system bus where the applications
PID is known::
app_proxy = get_proxy_object_for_existing_process(pid=app_pid)
Multiple criteria are allowed, for instance you could search on **pid**
and **connection_name**::
app_proxy = get_proxy_object_for_existing_process(
pid=app_pid, connection_name='org.gnome.gedit')
If the application from the previous example was on the system bus::
app_proxy = get_proxy_object_for_existing_process(
dbus_bus='system', pid=app_pid, connection_name='org.gnome.gedit')
It is possible to search for the application given just the applications
name.
An example for an application running on a custom bus searching using the
applications name::
app_proxy = get_proxy_object_for_existing_process(
application_name='qmlscene',
dbus_bus='unix:abstract=/tmp/dbus-IgothuMHNk')
:param pid: The PID of the application to search for.
:param dbus_bus: A string containing either 'session', 'system' or the
custom buses name (i.e. 'unix:abstract=/tmp/dbus-IgothuMHNk').
:param connection_name: A string containing the DBus connection name to
use with the search criteria.
:param object_path: A string containing the object path to use as the
search criteria. Defaults to
:py:data:`autopilot.introspection.constants.AUTOPILOT_PATH`.
:param application_name: A string containing the applications name to
search for.
:param emulator_base: The custom emulator to create the resulting proxy
object with.
:raises ProcessSearchError: if no search criteria match.
:raises RuntimeError: if the search criteria results in many matches.
:raises RuntimeError: if both ``process`` and ``pid`` are supplied, but
``process.pid != pid``.
"""
if process is not None:
if pid is None:
pid = process.pid
elif pid != process.pid:
raise RuntimeError("Supplied PID and process.pid do not match.")
if pid is not None and not _pid_is_running(pid):
raise ProcessSearchError("PID %d could not be found" % pid)
dbus_addresses = _get_dbus_addresses_from_search_parameters(
pid,
dbus_bus,
connection_name,
object_path,
process
)
if application_name:
app_name_check_fn = lambda i: get_classname_from_path(
i.introspection_iface.GetState('/')[0][0]) == application_name
dbus_addresses = filter(app_name_check_fn, dbus_addresses)
if dbus_addresses is None or len(dbus_addresses) == 0:
raise ProcessSearchError("Search criteria returned no results")
if len(dbus_addresses) > 1:
raise RuntimeError("Search criteria returned multiple results")
return _make_proxy_object(dbus_addresses[0], emulator_base)
def _get_dbus_addresses_from_search_parameters(
pid, dbus_bus, connection_name, object_path, process):
"""Returns a list of :py:class: `DBusAddress` for all successfully matched
criteria.
"""
_reset_known_connection_list()
for i in range(10):
_get_child_pids.reset_cache()
if process is not None and not _process_is_running(process):
return_code = process.poll()
raise ProcessSearchError(
"Process exited with exit code: %d"
% return_code
)
bus = _get_dbus_bus_from_string(dbus_bus)
valid_connections = _search_for_valid_connections(
pid,
bus,
connection_name,
object_path
)
if len(valid_connections) >= 1:
return [_get_dbus_address_object(name, object_path, bus) for name
in valid_connections]
sleep(1)
return []
def _reset_known_connection_list():
global connection_list
del connection_list[:]
def _search_for_valid_connections(pid, bus, connection_name, object_path):
global connection_list
def _get_unchecked_connections(all_connections):
return list(set(all_connections).difference(set(connection_list)))
possible_connections = _get_possible_connections(bus, connection_name)
connection_list = _get_unchecked_connections(possible_connections)
valid_connections = _get_valid_connections(
connection_list,
bus,
pid,
object_path
)
return valid_connections
def _process_is_running(process):
return process.poll() is None
def _get_valid_connections(connections, bus, pid, object_path):
filter_fn = partial(_match_connection, bus, pid, object_path)
valid_connections = filter(filter_fn, connections)
unique_connections = _dedupe_connections_on_pid(valid_connections, bus)
return unique_connections
def _dedupe_connections_on_pid(valid_connections, bus):
seen_pids = []
deduped_connections = []
for connection in valid_connections:
pid = _get_bus_connections_pid(bus, connection)
if pid not in seen_pids:
seen_pids.append(pid)
deduped_connections.append(connection)
return deduped_connections
def _get_dbus_address_object(connection_name, object_path, bus):
return DBusAddress(bus, connection_name, object_path)
def _get_dbus_bus_from_string(dbus_string):
if dbus_string == 'session':
return get_session_bus()
elif dbus_string == 'system':
return get_system_bus()
else:
return get_custom_bus(dbus_string)
def _get_possible_connections(bus, connection_name):
all_connection_names = bus.list_names()
if connection_name is None:
return all_connection_names
else:
matching_connections = [
c for c in all_connection_names if c == connection_name]
return matching_connections
def _match_connection(bus, pid, path, connection_name):
"""Does the connection match our search criteria?"""
success = True
if pid is not None:
success = _connection_matches_pid(bus, connection_name, pid)
if success:
success = _connection_has_path(bus, connection_name, path)
return success
def _connection_matches_pid(bus, connection_name, pid):
"""Given a PID checks wherever it or its children are connected on this
bus.
"""
if connection_name == 'org.freedesktop.DBus':
return False
try:
if _bus_pid_is_our_pid(bus, connection_name, pid):
return False
bus_pid = _get_bus_connections_pid(bus, connection_name)
except DBusException as e:
logger.info(
"dbus.DBusException while attempting to get PID for %s: %r" %
(connection_name, e))
return False
eligible_pids = [pid] + _get_child_pids(pid)
return bus_pid in eligible_pids
def _bus_pid_is_our_pid(bus, connection_name, pid):
"""Returns True if this scripts pid is the bus connections pid supplied."""
bus_pid = _get_bus_connections_pid(bus, connection_name)
return bus_pid == os.getpid()
def _connection_has_path(bus, connection_name, path):
"""Ensure the connection has the path that we expect to be there."""
try:
_check_connection_has_ap_interface(bus, connection_name, path)
return True
except DBusException:
return False
def _check_connection_has_ap_interface(bus, connection_name, path):
"""Simple check if a bus with connection + path provide the Autopilot
Introspection Interface.
:raises: **DBusException** if it does not.
"""
obj = bus.get_object(connection_name, path)
obj_iface = Interface(obj, 'com.canonical.Autopilot.Introspection')
obj_iface.GetVersion()
class _cached_get_child_pids(object):
"""Get a list of all child process Ids, for the given parent.
Since we call this often, and it's a very expensive call, we optimise this
such that the return value will be cached for each scan through the dbus
bus.
Calling reset_cache() at the end of each dbus scan will ensure that you get
fresh values on the next call.
"""
def __init__(self):
self._cached_result = None
def __call__(self, pid):
if self._cached_result is None:
self._cached_result = [
p.pid for p in psutil.Process(pid).get_children(recursive=True)
]
return self._cached_result
def reset_cache(self):
self._cached_result = None
_get_child_pids = _cached_get_child_pids()
def _make_proxy_object(data_source, emulator_base):
"""Returns a root proxy object given a DBus service name."""
proxy_bases = _get_proxy_object_base_classes(data_source)
if emulator_base is None:
emulator_base = type('DefaultEmulatorBase', (CustomEmulatorBase,), {})
proxy_bases = proxy_bases + (emulator_base, )
cls_name, cls_state = _get_proxy_object_class_name_and_state(data_source)
# Merge the object hierarchy.
clsobj = type(str("%sBase" % cls_name), proxy_bases, {})
proxy_class = type(str(cls_name), (clsobj,), {})
try:
dbus_tuple = data_source.introspection_iface.GetState("/")[0]
path, state = dbus_tuple
return proxy_class(state, path, data_source)
except IndexError:
raise RuntimeError("Unable to find root object of %r" % proxy_class)
def _get_proxy_object_base_classes(backend):
"""Return tuple of the base classes to use when creating a proxy object
for the given service name & path.
:raises: **RuntimeError** if the autopilot interface cannot be found.
"""
bases = [ApplicationProxyObject]
intro_xml = backend.dbus_introspection_iface.Introspect()
if AP_INTROSPECTION_IFACE not in intro_xml:
raise RuntimeError(
"Could not find Autopilot interface on DBus backend '%s'" %
backend)
if QT_AUTOPILOT_IFACE in intro_xml:
from autopilot.introspection.qt import QtObjectProxyMixin
bases.append(QtObjectProxyMixin)
return tuple(bases)
def _get_proxy_object_class_name_and_state(backend):
"""Return the class name and root state dictionary."""
object_path, object_state = backend.introspection_iface.GetState("/")[0]
return get_classname_from_path(object_path), object_state
class ApplicationProxyObject(DBusIntrospectionObject):
"""A class that better supports query data from an application."""
def __init__(self, state, path, backend):
super(ApplicationProxyObject, self).__init__(state, path, backend)
self._process = None
def set_process(self, process):
"""Set the subprocess.Popen object of the process that this is a proxy
for.
You should never normally need to call this method.
"""
self._process = process
@property
def pid(self):
return self._process.pid
@property
def process(self):
return self._process
def kill_application(self):
"""Kill the running process that this is a proxy for using
'kill `pid`'."""
subprocess.call(["kill", "%d" % self._process.pid])