From b630643c1dfc293faacac1491326f11ada9fd1e1 Mon Sep 17 00:00:00 2001 From: Thierry Reding Date: Mon, 18 May 2015 13:33:24 +0200 Subject: [PATCH 1/5] WIP --- .gitignore | 1 + boards/__init__.py | 63 ++++++++++++++++ boards/nvidia/__init__.py | 3 + boards/nvidia/jetson-tk1.py | 14 ++++ boards/nvidia/p2371.py | 5 ++ tegra.py => tegra-tests.py | 145 ++++++++++++------------------------ tegra/__init__.py | 51 +++++++++++++ tegra/tegra114.py | 8 ++ tegra/tegra124.py | 8 ++ tegra/tegra132.py | 8 ++ tegra/tegra20.py | 8 ++ tegra/tegra210.py | 8 ++ tegra/tegra30.py | 8 ++ tests/__init__.py | 0 tests/cpufreq.py | 49 ++++++++++++ tests/emc.py | 66 ++++++++++++++++ tests/system.py | 18 +++++ 17 files changed, 367 insertions(+), 96 deletions(-) create mode 100644 .gitignore create mode 100644 boards/__init__.py create mode 100644 boards/nvidia/__init__.py create mode 100644 boards/nvidia/jetson-tk1.py create mode 100644 boards/nvidia/p2371.py rename tegra.py => tegra-tests.py (52%) create mode 100644 tegra/__init__.py create mode 100644 tegra/tegra114.py create mode 100644 tegra/tegra124.py create mode 100644 tegra/tegra132.py create mode 100644 tegra/tegra20.py create mode 100644 tegra/tegra210.py create mode 100644 tegra/tegra30.py create mode 100644 tests/__init__.py create mode 100644 tests/cpufreq.py create mode 100644 tests/emc.py create mode 100644 tests/system.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..bee8a64 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +__pycache__ diff --git a/boards/__init__.py b/boards/__init__.py new file mode 100644 index 0000000..e393f49 --- /dev/null +++ b/boards/__init__.py @@ -0,0 +1,63 @@ +import fnmatch +import importlib +import os.path + +class Vendor: + def __init__(self, name): + self.name = name + self.boards = [] + + def load(self, path): + self.boards[:] + + vendor = os.path.basename(path) + + for path in os.listdir(path): + if fnmatch.fnmatch(path, '*.py'): + + if path == '__init__.py': + continue + + board = os.path.splitext(path)[0] + + name = 'boards.%s.%s' % (vendor, board) + module = importlib.import_module(name) + self.boards.append(module.Board) + + return self.boards + +class UnsupportedBoardException(Exception): + pass + +class Board: + pass + +boards = [] + +''' +Detect the type of board by looking at the compatible string of the device +tree's root node. +''' +def detect(): + with open('/sys/firmware/devicetree/base/compatible', 'r') as file: + line = file.read() + if line: + compatible = line.split('\0')[0] + + for board in boards: + if compatible == board.__compatible__: + return board() + + raise UnsupportedBoardException('Board: %s' % compatible) + + raise IOError + +path = os.path.dirname(__file__) + +for directory in os.listdir(path): + path = os.path.join(path, directory) + + if os.path.exists(os.path.join(path, '__init__.py')): + name = 'boards.%s' % directory + module = importlib.import_module(name) + boards.extend(module.vendor.load(path)) diff --git a/boards/nvidia/__init__.py b/boards/nvidia/__init__.py new file mode 100644 index 0000000..ce299f9 --- /dev/null +++ b/boards/nvidia/__init__.py @@ -0,0 +1,3 @@ +from boards import Vendor + +vendor = Vendor('NVIDIA') diff --git a/boards/nvidia/jetson-tk1.py b/boards/nvidia/jetson-tk1.py new file mode 100644 index 0000000..efd86f1 --- /dev/null +++ b/boards/nvidia/jetson-tk1.py @@ -0,0 +1,14 @@ +import boards +import tegra + +class Board(boards.Board): + __compatible__ = 'nvidia,jetson-tk1' + name = 'NVIDIA Jetson TK1' + + def check_devices_mmc(self): + check_device('/dev/mmcblk0') # eMMC + check_device('/dev/mmcblk1') # MMC/SD + + def check_devices(self): + super().check_devices() + self.check_devices_mmc() diff --git a/boards/nvidia/p2371.py b/boards/nvidia/p2371.py new file mode 100644 index 0000000..f3b1b20 --- /dev/null +++ b/boards/nvidia/p2371.py @@ -0,0 +1,5 @@ +import boards + +class Board(boards.Board): + __compatible__ = 'nvidia,p2371-e01' + name = 'NVIDIA P2371 reference board (E.1)' diff --git a/tegra.py b/tegra-tests.py similarity index 52% rename from tegra.py rename to tegra-tests.py index ec27e2a..a403107 100644 --- a/tegra.py +++ b/tegra-tests.py @@ -1,82 +1,17 @@ #!/usr/bin/python3 -import io +import boards import os -import pyudev -import random import sys -import time - -from linux import drm -from linux import kmsg -from linux import system -from linux import watchdog - -def check_device(path, indent = 2): - print('%s- %s: ' % (' ' * indent, path), end = '') - - if os.access(path, os.F_OK): - print('OK') - else: - print('failed') - -class Tegra(): - def check_devices(self): - print('- checking for devices:') - - def check(self): - print('%s detected' % self.__description__) - print('running tests:') - self.check_devices() - - def print(self, file = sys.stdout): - print('Board:', self.__description__, file = file) - -class Tegra20(Tegra): - __compatible__ = 'nvidia,tegra20' - -class Tegra30(Tegra): - __compatible__ = 'nvidia,tegra30' - -class Tegra114(Tegra): - __compatible__ = 'nvidia,tegra114' - -class Tegra124(Tegra): - __compatible__ = 'nvidia,tegra124' - -class JetsonTK1(Tegra124): - __compatible__ = 'nvidia,jetson-tk1' - __description__ = 'NVIDIA Jetson TK1' - - def check_devices_mmc(self): - check_device('/dev/mmcblk0') # eMMC - check_device('/dev/mmcblk1') # MMC/SD - - def check_devices(self): - super().check_devices() - self.check_devices_mmc() - -class Tegra132(Tegra): - __compatible__ = 'nvidia,tegra132' - -socs = [ - Tegra20, - Tegra30, - Tegra114, - Tegra124, - Tegra132, - ] - -boards = [ - JetsonTK1, - ] +import tegra +import unittest def test_show_info(): print('System information:') print('-------------------') - (system, hostname, release, version, machine) = os.uname() - print('OS:', system, release, version) + (opsys, hostname, release, version, machine) = os.uname() + print('OS:', opsys, release, version) print('Hostname:', hostname) print('Machine:', machine) @@ -153,7 +88,7 @@ class UnsupportedBoard(Exception): tree's root node. ''' def detect(): - with io.open('/sys/firmware/devicetree/base/compatible', 'r') as file: + with open('/sys/firmware/devicetree/base/compatible', 'r') as file: line = file.read() if line: # remove the last, empty element @@ -174,33 +109,44 @@ def test_board(): board = detect() board.check() +#def test_drm(): +# context = pyudev.Context() +# devices = context.list_devices(subsystem = 'drm') +# +# print('DRM devices:') +# +# for device in devices: +# if not device.device_node: +# continue +# +# if 'seat' not in device.tags: +# continue +# +# devno = device.device_number +# +# print(' %s (%u, %u)' % (device.device_node, os.major(devno), +# os.minor(devno))) +# +# dev = drm.open(device.device_node) +# version = dev.GetVersion() +# print(' %s (%u.%u.%u, %s, %s)' % (version.name, version.major, +# version.minor, version.patch, +# version.date, +# version.description)) + +class TegraTestLoader(unittest.TestLoader): + pass -def test_drm(): - context = pyudev.Context() - devices = context.list_devices(subsystem = 'drm') - - print('DRM devices:') - - for device in devices: - if not device.device_node: - continue - - if 'seat' not in device.tags: - continue - - devno = device.device_number +class TegraTestRunner(unittest.TextTestRunner): + pass - print(' %s (%u, %u)' % (device.device_node, os.major(devno), - os.minor(devno))) +if __name__ == '__main__': + board = boards.detect() + soc = tegra.detect() - dev = drm.open(device.device_node) - version = dev.GetVersion() - print(' %s (%u.%u.%u, %s, %s)' % (version.name, version.major, - version.minor, version.patch, - version.date, - version.description)) + print('Detected:', board.name, '(%s)' % soc.name) + print() -if __name__ == '__main__': if len(sys.argv) > 1: if sys.argv[1] == 'info': test_show_info() @@ -220,8 +166,15 @@ def test_drm(): if sys.argv[1] == 'board': test_board() - if sys.argv[1] == 'drm': - test_drm() +# if sys.argv[1] == 'drm': +# test_drm() + + if sys.argv[1] == 'all': + loader = TegraTestLoader() + runner = TegraTestRunner() + + suite = loader.discover('tests', '*.py', '.') + runner.run(suite) else: test_show_info() diff --git a/tegra/__init__.py b/tegra/__init__.py new file mode 100644 index 0000000..ab4dbb6 --- /dev/null +++ b/tegra/__init__.py @@ -0,0 +1,51 @@ +import importlib +import os.path +import sys + +class UnsupportedSoCException(Exception): + pass + +class SoC: + compatible = 'nvidia,tegra' + name = 'NVIDIA Tegra' + +''' +Detect the type of SoC by looking at the compatible string of the device +tree's root node. +''' +def detect(): + with open('/sys/firmware/devicetree/base/compatible', 'r') as file: + line = file.read() + if line: + # remove the last, empty element + values = line.split('\0')[:-1] + compatible = values[-1] + + for soc in socs: + if compatible == soc.compatible: + return soc() + + raise UnsupportedSoCException('SoC: %s' % (compatible)) + + raise IOError + +# +# initialization +# +socs = [] + +path = os.path.dirname(__file__) +sys.path.append(path) + +paths = sorted(os.listdir(path)) +for path in paths: + if not path.endswith('.py'): + continue + + if path == '__init__.py': + continue + + name = os.path.splitext(path)[0] + module = importlib.import_module(name) + + socs.append(module.SoC) diff --git a/tegra/tegra114.py b/tegra/tegra114.py new file mode 100644 index 0000000..93c454a --- /dev/null +++ b/tegra/tegra114.py @@ -0,0 +1,8 @@ +import tegra + +class SoC(tegra.SoC): + compatible = 'nvidia,tegra114' + name = 'NVIDIA Tegra114' + + def __init__(self): + self.num_cpus = 4 diff --git a/tegra/tegra124.py b/tegra/tegra124.py new file mode 100644 index 0000000..2bfe3b7 --- /dev/null +++ b/tegra/tegra124.py @@ -0,0 +1,8 @@ +import tegra + +class SoC(tegra.SoC): + compatible = 'nvidia,tegra124' + name = 'NVIDIA Tegra124' + + def __init__(self): + self.num_cpus = 4 diff --git a/tegra/tegra132.py b/tegra/tegra132.py new file mode 100644 index 0000000..a273b8d --- /dev/null +++ b/tegra/tegra132.py @@ -0,0 +1,8 @@ +import tegra + +class SoC(tegra.SoC): + compatible = 'nvidia,tegra132' + name = 'NVIDIA Tegra132' + + def __init__(self): + self.num_cpus = 2 diff --git a/tegra/tegra20.py b/tegra/tegra20.py new file mode 100644 index 0000000..120a00e --- /dev/null +++ b/tegra/tegra20.py @@ -0,0 +1,8 @@ +import tegra + +class SoC(tegra.SoC): + compatible = 'nvidia,tegra20' + name = 'NVIDIA Tegra20' + + def __init__(self): + self.num_cpus = 2 diff --git a/tegra/tegra210.py b/tegra/tegra210.py new file mode 100644 index 0000000..c5fe191 --- /dev/null +++ b/tegra/tegra210.py @@ -0,0 +1,8 @@ +import tegra + +class SoC(tegra.SoC): + compatible = 'nvidia,tegra210' + name = 'NVIDIA Tegra210' + + def __init__(self): + self.num_cpus = 4 diff --git a/tegra/tegra30.py b/tegra/tegra30.py new file mode 100644 index 0000000..43e79d0 --- /dev/null +++ b/tegra/tegra30.py @@ -0,0 +1,8 @@ +import tegra + +class SoC(tegra.SoC): + compatible = 'nvidia,tegra30' + name = 'NVIDIA Tegra30' + + def __init__(self): + self.num_cpus = 4 diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/cpufreq.py b/tests/cpufreq.py new file mode 100644 index 0000000..d8c42a0 --- /dev/null +++ b/tests/cpufreq.py @@ -0,0 +1,49 @@ +#!/usr/bin/python3 + +import os.path +import unittest + +def cpufreq_supported(): + return os.path.exists('/sys/devices/system/cpu/cpufreq') + +@unittest.skipUnless(cpufreq_supported(), 'CPUfreq not supported') +class CPUfreq(unittest.TestCase): + def test_list_governors(self): + sysfs_cpufreq = '/sys/devices/system/cpu/cpu%u/cpufreq' % 0 + supported_governors = [] + + with open('%s/scaling_available_governors' % sysfs_cpufreq, 'r') as f: + for line in f: + supported_governors.extend(line.split()) + + with open('%s/scaling_governor' % sysfs_cpufreq) as f: + current_governor = f.read().strip() + + print('- supported governors:') + + for governor in supported_governors: + if governor == current_governor: + print(' - %s *' % governor) + else: + print(' - %s' % governor) + + def test_list_rates(self): + sysfs_cpufreq = '/sys/devices/system/cpu/cpu%u/cpufreq' % 0 + supported_rates = [] + + with open('%s/scaling_available_frequencies' % sysfs_cpufreq, 'r') as f: + for line in f: + supported_rates.extend(line.split()) + + width = max([len(x) for x in supported_rates]) + + with open('%s/scaling_cur_freq' % sysfs_cpufreq, 'r') as f: + current = f.read().strip() + + print('- supported rates:') + + for rate in supported_rates: + if rate == current: + print(' - %*s' % (width, rate), '*') + else: + print(' - %*s' % (width, rate)) diff --git a/tests/emc.py b/tests/emc.py new file mode 100644 index 0000000..535e455 --- /dev/null +++ b/tests/emc.py @@ -0,0 +1,66 @@ +#!/usr/bin/python3 + +import os.path +import sys +import unittest + +def emc_supported(): + return os.path.exists('/sys/kernel/debug/emc') + +def emc_set_rate(rate): + with open('/sys/kernel/debug/emc/rate', 'w') as f: + f.write(rate) + + with open('/sys/kernel/debug/emc/rate', 'r') as f: + rate = f.read().strip(); + + return rate + +@unittest.skipUnless(emc_supported(), 'EMC frequency scaling not supported') +class EMC(unittest.TestCase): + def test_set_rates(self): + supported_rates = [] + + print('EMC frequency scaling:') + print('======================') + + with open('/sys/kernel/debug/emc/supported_rates') as f: + for line in f: + supported_rates.extend(line.split()) + + width = max([len(x) for x in supported_rates]) + + with open('/sys/kernel/debug/emc/rate') as f: + current = f.read().strip() + + print('- supported rates: (* = current)'); + + for rate in supported_rates: + if rate == current: + print(' - %*s' % (width, rate), '*') + else: + print(' - %*s' % (width, rate)) + + print('- testing:') + + for rate in supported_rates: + print(' - %*s...' % (width, rate), end = '') + sys.stdout.flush() + + actual = emc_set_rate(rate) + if actual != rate: + print('failed (reported %s)' % actual) + else: + print('done') + + print('- resetting to old rate: %s...' % current, end = '') + sys.stdout.flush() + + actual = emc_set_rate(current) + if actual != current: + print('failed (reported %s)' % actual) + else: + print('done') + +if __name__ == '__main__': + unittest.main() diff --git a/tests/system.py b/tests/system.py new file mode 100644 index 0000000..50feb2e --- /dev/null +++ b/tests/system.py @@ -0,0 +1,18 @@ +import os +import unittest + +class SystemInfo(unittest.TestCase): + def test_show(self): + print('System information:') + print('-------------------') + + (opsys, hostname, release, version, machine) = os.uname() + print('OS:', opsys, release, version) + print('Hostname:', hostname) + print('Machine:', machine) + + board = detect() + board.print() + + cpus = system.CPUSet() + cpus.print() From eecb3ad79397fbea43c011d3b68791a765c6aaff Mon Sep 17 00:00:00 2001 From: Thierry Reding Date: Wed, 27 May 2015 14:55:55 +0200 Subject: [PATCH 2/5] WIP --- linux/debugfs.py | 9 ++ linux/drm.py | 2 +- linux/log.py | 19 ++++ linux/sysfs.py | 16 ++++ linux/system.py | 72 ++++++++++++-- tegra-tests.py | 244 +++++++++++++++++++++-------------------------- tests/cpufreq.py | 135 ++++++++++++++++++++------ tests/drm.py | 36 +++++++ tests/emc.py | 36 +++---- tests/system.py | 56 ++++++++--- 10 files changed, 420 insertions(+), 205 deletions(-) create mode 100644 linux/debugfs.py create mode 100644 linux/log.py create mode 100644 linux/sysfs.py create mode 100644 tests/drm.py diff --git a/linux/debugfs.py b/linux/debugfs.py new file mode 100644 index 0000000..c748a11 --- /dev/null +++ b/linux/debugfs.py @@ -0,0 +1,9 @@ +import io, os.path + +mountpoint = '/sys/kernel/debug' + +def exists(path): + return os.path.exists('%s/%s' % (mountpoint, path)) + +def open(path, *args, **kwargs): + return io.open('%s/%s' % (mountpoint, path), *args, **kwargs) diff --git a/linux/drm.py b/linux/drm.py index 9b4b8de..d0ea0e3 100644 --- a/linux/drm.py +++ b/linux/drm.py @@ -38,7 +38,7 @@ def __str__(self): class libdrm(ctypes.CDLL): def __init__(self): - ctypes.CDLL.__init__(self, 'libdrm.so') + ctypes.CDLL.__init__(self, 'libdrm.so.2') self.drmGetVersion.argstype = [ ctypes.c_int ] self.drmGetVersion.restype = drmVersionPtr diff --git a/linux/log.py b/linux/log.py new file mode 100644 index 0000000..d6284e0 --- /dev/null +++ b/linux/log.py @@ -0,0 +1,19 @@ +import sys + +color = True + +def begin(message, end = ''): + print('%s...' % message, end = end) + sys.stdout.flush() + +def end(error = None): + if error: + if color: + print('\033[31mfailed\033[0m:', error) + else: + print('failed:', error) + else: + if color: + print('\033[32mdone\033[0m') + else: + print('done') diff --git a/linux/sysfs.py b/linux/sysfs.py new file mode 100644 index 0000000..311cdce --- /dev/null +++ b/linux/sysfs.py @@ -0,0 +1,16 @@ +import io, os.path + +mountpoint = '/sys' + +def exists(path): + return os.path.exists('%s/%s' % (mountpoint, path)) + +def open(path, *args, **kwargs): + return io.open('%s/%s' % (mountpoint, path), *args, **kwargs) + +class Object: + def __init__(self, path): + self.path = path + + def open(self, path, *args, **kwargs): + return open('%s/%s' % (self.path, path), *args, **kwargs) diff --git a/linux/system.py b/linux/system.py index 3ecc510..edc6f60 100644 --- a/linux/system.py +++ b/linux/system.py @@ -1,20 +1,23 @@ #!/usr/bin/python -import io +import ctypes import os import sys +import time + +from . import ioctl, libc, sysfs ''' Represents one CPU present in the system. ''' class CPU(): def __init__(self, num): - self.path = '/sys/devices/system/cpu/cpu%u' % num + self.sysfs = sysfs.Object('devices/system/cpu/cpu%u' % num) self.hotpluggable = True self.num = num - with io.open(os.path.join(self.path, 'online'), 'r') as cpu: - online = cpu.readline().strip() + with self.sysfs.open('online', 'r') as file: + online = file.readline().strip() if online == '0': self.online = False else: @@ -24,13 +27,13 @@ def __init__(self, num): Bring the CPU online or take it offline. ''' def set_online(self, online): - with io.open(os.path.join(self.path, 'online'), 'w') as cpu: + with self.sysfs.open('online', 'w') as file: if online: - cpu.write('1') + file.write('1') else: - cpu.write('0') + file.write('0') - cpu.online = online + self.online = online ''' Return the CPU mask. @@ -52,7 +55,7 @@ def __str__(self): ''' class CPUSet(): def __init__(self): - with io.open('/sys/devices/system/cpu/present', 'r') as file: + with sysfs.open('devices/system/cpu/present', 'r') as file: present = file.readline().rstrip() self.start, self.end = map(int, present.split('-')) @@ -144,6 +147,22 @@ def apply_mask(self, mask): def __iter__(self): return iter(self.cpus) +''' +Provides access to a realtime clock device in the system. +''' +class RTC: + def __init__(self, name = 'rtc0'): + self.sysfs = sysfs.Object('class/rtc/%s' % name) + + ''' + Set the RTC to raise an alarm a given number of seconds from now. + ''' + def set_alarm_relative(self, alarm): + alarm = int(time.time()) + alarm + + with self.sysfs.open('wakealarm', 'w') as file: + file.write('%u' % alarm) + ''' Provides access to the system and system wide controls. ''' @@ -152,5 +171,38 @@ def __init__(self): pass def suspend(self): - with io.open('/sys/power/state', 'w') as file: + with sysfs.open('power/state', 'w') as file: file.write('mem') + +''' +Provides access to a watchdog device in the system. +''' +class Watchdog(): + WDIOC_SETOPTIONS = ioctl.IOR(ord('W'), 4, 4) + WDIOC_SETTIMEOUT = ioctl.IOWR(ord('W'), 6, 4) + + WDIOS_DISABLECARD = 0x0001 + WDIOS_ENABLECARD = 0x0002 + + def __init__(self, path): + self.fd = os.open(path, os.O_RDWR) + + def disable(self): + options = ctypes.pointer(ctypes.c_uint(Watchdog.WDIOS_DISABLECARD)) + + libc.ioctl(self.fd, Watchdog.WDIOC_SETOPTIONS, options) + + def enable(self): + options = ctypes.pointer(ctypes.c_uint(Watchdog.WDIOS_ENABLECARD)) + + libc.ioctl(self.fd, Watchdog.WDIOC_SETOPTIONS, options) + + def set_timeout(self, timeout): + timeout = ctypes.pointer(ctypes.c_uint(timeout)) + + libc.ioctl(self.fd, Watchdog.WDIOC_SETTIMEOUT, timeout) + + def __del__(self): + options = ctypes.pointer(ctypes.c_uint(Watchdog.WDIOS_DISABLECARD)) + libc.ioctl(self.fd, Watchdog.WDIOC_SETOPTIONS, options) + os.close(self.fd) diff --git a/tegra-tests.py b/tegra-tests.py index a403107..e7d3d8c 100644 --- a/tegra-tests.py +++ b/tegra-tests.py @@ -1,180 +1,154 @@ #!/usr/bin/python3 +import argparse import boards +import io import os import sys import tegra import unittest -def test_show_info(): - print('System information:') - print('-------------------') +from linux import kmsg, log, system - (opsys, hostname, release, version, machine) = os.uname() - print('OS:', opsys, release, version) - print('Hostname:', hostname) - print('Machine:', machine) +class TegraTestLoader(unittest.TestLoader): + pass - board = detect() - board.print() +#class TegraTestRunner(unittest.TextTestRunner): +# pass - cpus = system.CPUSet() - cpus.print() +class WritelnDecorator(object): + def __init__(self, stream): + self.stream = stream -def test_kmsg(): - with kmsg.open('/dev/kmsg') as log: - for entry in log: - print(entry) + def __getattr__(self, attr): + if attr in ('stream', '__getstate__'): + raise AttributeError(attr) -''' -Provides access to a realtime clock device in the system. -''' -class RTC: - def __init__(self, name = 'rtc0'): - self.alarm = '/sys/class/rtc/%s/wakealarm' % name + return getattr(self.stream, attr) - ''' - Set the RTC to raise an alarm a given number of seconds from now. - ''' - def set_alarm_relative(self, alarm): - alarm = int(time.time()) + alarm + def writeln(self, arg = None): + if arg: + self.write(arg) - with io.open(self.alarm, 'w') as rtc: - rtc.write('%u' % alarm) + self.write('\n') -def test_suspend(rtc = 'rtc0'): - print('Testing suspend/resume') +def strclass(cls): + return '%s.%s' % (cls.__module__, cls.__name__) - rtc = RTC(rtc) - rtc.set_alarm_relative(10) +class RedirectOutput(io.IOBase): + def __init__(self, stream = sys.stdout): + self.stream = stream + self.newline = True - sys = system.System() - sys.suspend() + def write(self, b): + if self.newline: + self.stream.write('| ') -def test_cpu_hotplug(): - print('Testing CPU hotplugging') + self.stream.write(b) - cpus = system.CPUSet() + self.newline = b.endswith('\n') - for cpu in cpus: - print('CPU#%u: mask:' % cpu.num, 1 << cpu.num) +class TegraTestResult(unittest.TestResult): + def __init__(self, stream = None, descriptions = True, verbosity = 1): + super().__init__(stream, descriptions, verbosity) + self.stream = WritelnDecorator(stream) + self.verbosity = verbosity - masks = cpus.generate_masks() + def startTest(self, test): + self.stdout = sys.stdout + self.stderr = sys.stderr - # go through all combinations once - for mask in masks: - cpus.apply_mask(mask) + if self.verbosity > 0: + log.begin('running: %s (%s)' % (strclass(test.__class__), + test._testMethodName), + end = '\n') + sys.stdout = RedirectOutput(sys.stdout) + sys.stderr = RedirectOutput(sys.stderr) + else: + log.begin('running: %s (%s)' % (strclass(test.__class__), + test._testMethodName)) + sys.stdout = None + sys.stderr = None - # select random combinations - for i in range(0, 100): - mask = random.choice(masks) - cpus.apply_mask(mask) + super().startTest(test) - # bring all CPUs online - cpus.online() + def stopTest(self, test): + super().stopTest(test) + sys.stderr = self.stderr + sys.stdout = self.stdout + log.end() -def test_watchdog(): - print('Testing watchdog') +class TegraTestRunner(object): + def __init__(self, stream = None, descriptions = True, verbosity = 1): + if not stream: + stream = sys.stderr - wdt = watchdog.Watchdog('/dev/watchdog') - wdt.set_timeout(30) - wdt.enable() + self.stream = stream + self.descriptions = descriptions + self.verbosity = verbosity -class UnsupportedBoard(Exception): - pass + def run(self, test): + result = TegraTestResult(self.stream, self.descriptions, self.verbosity) + test(result) + return result -''' -Detect the type of board by looking at the compatible string of the device -tree's root node. -''' -def detect(): - with open('/sys/firmware/devicetree/base/compatible', 'r') as file: - line = file.read() - if line: - # remove the last, empty element - values = line.split('\0')[:-1] - - name = values[0] - soc = values[-1] - - for board in boards: - if name == board.__compatible__: - return board() - - raise UnsupportedBoardException('SoC: %s, Board: %s' % (soc, name)) - - raise IOError - -def test_board(): - board = detect() - board.check() - -#def test_drm(): -# context = pyudev.Context() -# devices = context.list_devices(subsystem = 'drm') -# -# print('DRM devices:') -# -# for device in devices: -# if not device.device_node: -# continue -# -# if 'seat' not in device.tags: -# continue -# -# devno = device.device_number -# -# print(' %s (%u, %u)' % (device.device_node, os.major(devno), -# os.minor(devno))) -# -# dev = drm.open(device.device_node) -# version = dev.GetVersion() -# print(' %s (%u.%u.%u, %s, %s)' % (version.name, version.major, -# version.minor, version.patch, -# version.date, -# version.description)) - -class TegraTestLoader(unittest.TestLoader): - pass +def show_system_info(): + print('System information:') + print('-------------------') -class TegraTestRunner(unittest.TextTestRunner): - pass + (opsys, hostname, release, version, machine) = os.uname() + print('OS:', opsys, release, version) + print('Hostname:', hostname) + print('Machine:', machine) -if __name__ == '__main__': board = boards.detect() + print('Board:', board.name) + soc = tegra.detect() + print('SoC:', soc.name) - print('Detected:', board.name, '(%s)' % soc.name) - print() + cpus = system.CPUSet() + cpus.print() - if len(sys.argv) > 1: - if sys.argv[1] == 'info': - test_show_info() + print() - if sys.argv[1] == 'kmsg': - test_kmsg() +def show_kmsg(): + with kmsg.open('/dev/kmsg') as log: + for entry in log: + print(entry) - if sys.argv[1] == 'suspend': - test_suspend() +if __name__ == '__main__': + parser = argparse.ArgumentParser('') + parser.add_argument('--batch', '-b', action = 'store_const', const = True) + parser.add_argument('--verbose', '-v', default = 1, action = 'count') + parser.add_argument('--quiet', '-q', action = 'store_const', const = True) + parser.add_argument('tests', type = str, nargs = '*') + args = parser.parse_args(sys.argv[1:]) - if sys.argv[1] == 'cpu-hotplug': - test_cpu_hotplug() + if args.quiet: + args.verbose -= 1 - if sys.argv[1] == 'watchdog': - test_watchdog() + if args.verbose > 0: + show_system_info() + #show_kmsg() - if sys.argv[1] == 'board': - test_board() + tests = [] -# if sys.argv[1] == 'drm': -# test_drm() + for module in args.tests: + tests.append('tests.%s' % module) - if sys.argv[1] == 'all': - loader = TegraTestLoader() - runner = TegraTestRunner() + loader = TegraTestLoader() - suite = loader.discover('tests', '*.py', '.') - runner.run(suite) + if args.batch: + runner = unittest.TextTestRunner(verbosity = args.verbose) + log.color = False + else: + runner = TegraTestRunner(verbosity = args.verbose) + log.color = False + if not tests: + suite = loader.discover('tests', '*.py', '.') else: - test_show_info() + suite = loader.loadTestsFromNames(tests) + + runner.run(suite) diff --git a/tests/cpufreq.py b/tests/cpufreq.py index d8c42a0..6bd1bcf 100644 --- a/tests/cpufreq.py +++ b/tests/cpufreq.py @@ -1,49 +1,126 @@ #!/usr/bin/python3 -import os.path import unittest +from linux import log, sysfs, system + def cpufreq_supported(): - return os.path.exists('/sys/devices/system/cpu/cpufreq') + return sysfs.exists('devices/system/cpu/cpu0/cpufreq') + +class CPU: + def __init__(self, num): + self.sysfs = sysfs.Object('devices/system/cpu/cpu%u/cpufreq' % num) + self.supported_governors = [] + self.supported_rates = [] + + with self.sysfs.open('scaling_available_governors', 'r') as f: + for line in f: + self.supported_governors.extend(line.split()) + + with self.sysfs.open('scaling_available_frequencies', 'r') as f: + for line in f: + self.supported_rates.extend(line.split()) + + def __getattr__(self, name): + if name == 'governor': + with self.sysfs.open('scaling_governor') as file: + return file.read().strip() + + if name == 'rate': + with self.sysfs.open('scaling_cur_freq') as file: + return file.read().strip() + + return super.__getattr__(self, name) + + def __setattr__(self, name, value): + if name == 'governor': + with self.sysfs.open('scaling_governor', 'w') as file: + file.write(value) + + return + + if name == 'rate': + with self.sysfs.open('scaling_setspeed', 'w') as file: + file.write(value) + + with self.sysfs.open('scaling_cur_freq', 'r') as file: + rate = file.read().strip() + + if rate != value: + raise Exception + + return + + return super.__setattr__(self, name, value) @unittest.skipUnless(cpufreq_supported(), 'CPUfreq not supported') -class CPUfreq(unittest.TestCase): +class cpufreq(unittest.TestCase): def test_list_governors(self): - sysfs_cpufreq = '/sys/devices/system/cpu/cpu%u/cpufreq' % 0 - supported_governors = [] + cpuset = system.CPUSet() - with open('%s/scaling_available_governors' % sysfs_cpufreq, 'r') as f: - for line in f: - supported_governors.extend(line.split()) + for cpu in cpuset: + print('- CPU#%u:' % cpu.num) - with open('%s/scaling_governor' % sysfs_cpufreq) as f: - current_governor = f.read().strip() + cpu = CPU(cpu.num) - print('- supported governors:') + print(' - supported governors:') - for governor in supported_governors: - if governor == current_governor: - print(' - %s *' % governor) - else: - print(' - %s' % governor) + for governor in cpu.supported_governors: + if governor == cpu.governor: + print(' - %s *' % governor) + else: + print(' - %s' % governor) def test_list_rates(self): - sysfs_cpufreq = '/sys/devices/system/cpu/cpu%u/cpufreq' % 0 - supported_rates = [] + cpuset = system.CPUSet() - with open('%s/scaling_available_frequencies' % sysfs_cpufreq, 'r') as f: - for line in f: - supported_rates.extend(line.split()) + for cpu in cpuset: + print('- CPU#%u:' % cpu.num) + cpu = CPU(cpu.num) + + width = max([len(x) for x in cpu.supported_rates]) + + print(' - supported rates:') + + for rate in cpu.supported_rates: + if rate == cpu.rate: + print(' - %*s' % (width, rate), '*') + else: + print(' - %*s' % (width, rate)) + + def test_set_rates(self): + cpuset = system.CPUSet() + + for cpu in cpuset: + print('- CPU#%u' % cpu.num) + cpu = CPU(cpu.num) + + log.begin(' - switching to userspace governor') + governor = cpu.governor + + try: + cpu.governor = 'userspace' + except Exception as e: + log.end(e) + continue + else: + log.end() - width = max([len(x) for x in supported_rates]) + for rate in cpu.supported_rates: + log.begin(' - setting rate %s' % rate) - with open('%s/scaling_cur_freq' % sysfs_cpufreq, 'r') as f: - current = f.read().strip() + try: + cpu.rate = rate + except Exception as e: + log.end(e) + else: + log.end() - print('- supported rates:') + log.begin(' - restoring %s governor' % governor) - for rate in supported_rates: - if rate == current: - print(' - %*s' % (width, rate), '*') + try: + cpu.governor = governor + except Exception as e: + log.end(e) else: - print(' - %*s' % (width, rate)) + log.end() diff --git a/tests/drm.py b/tests/drm.py new file mode 100644 index 0000000..8d891c7 --- /dev/null +++ b/tests/drm.py @@ -0,0 +1,36 @@ +#!/usr/bin/python3 + +import os +import pyudev +import sys +import unittest + +import linux.drm + +class drm(unittest.TestCase): + def test_list_devices(self): + context = pyudev.Context() + devices = context.list_devices(subsystem = 'drm') + + print('DRM devices:') + + for device in devices: + if not device.device_node: + continue + + if 'seat' not in device.tags: + continue + + devno = device.device_number + + print(' %s (%u, %u)' % (device.device_node, os.major(devno), + os.minor(devno))) + + dev = linux.drm.open(device.device_node) + version = dev.GetVersion() + print(' Driver:', version.name) + print(' Description:', version.description) + print(' Version: %u.%u.%u (%s)' % (version.major, + version.minor, + version.patch, + version.date)) diff --git a/tests/emc.py b/tests/emc.py index 535e455..8a85b2b 100644 --- a/tests/emc.py +++ b/tests/emc.py @@ -1,36 +1,37 @@ #!/usr/bin/python3 -import os.path -import sys import unittest +from linux import debugfs, log + def emc_supported(): - return os.path.exists('/sys/kernel/debug/emc') + return debugfs.exists('emc') def emc_set_rate(rate): - with open('/sys/kernel/debug/emc/rate', 'w') as f: + with debugfs.open('emc/rate', 'w') as f: f.write(rate) - with open('/sys/kernel/debug/emc/rate', 'r') as f: + with debugfs.open('emc/rate', 'r') as f: rate = f.read().strip(); return rate @unittest.skipUnless(emc_supported(), 'EMC frequency scaling not supported') -class EMC(unittest.TestCase): +class emc(unittest.TestCase): def test_set_rates(self): supported_rates = [] + errors = 0 print('EMC frequency scaling:') print('======================') - with open('/sys/kernel/debug/emc/supported_rates') as f: + with debugfs.open('emc/supported_rates') as f: for line in f: supported_rates.extend(line.split()) width = max([len(x) for x in supported_rates]) - with open('/sys/kernel/debug/emc/rate') as f: + with debugfs.open('emc/rate') as f: current = f.read().strip() print('- supported rates: (* = current)'); @@ -44,23 +45,22 @@ def test_set_rates(self): print('- testing:') for rate in supported_rates: - print(' - %*s...' % (width, rate), end = '') - sys.stdout.flush() + log.begin(' - %*s...' % (width, rate)) actual = emc_set_rate(rate) if actual != rate: - print('failed (reported %s)' % actual) + log.end('reported %s' % actual) + errors += 1 else: - print('done') + log.end() - print('- resetting to old rate: %s...' % current, end = '') - sys.stdout.flush() + log.begin('- resetting to old rate: %s...' % current) actual = emc_set_rate(current) if actual != current: - print('failed (reported %s)' % actual) + log.end('reported %s' % actual) else: - print('done') + log.end() -if __name__ == '__main__': - unittest.main() + self.longMessage = False + self.assertEqual(errors, 0, 'not all rate changes succeeded') diff --git a/tests/system.py b/tests/system.py index 50feb2e..d34909c 100644 --- a/tests/system.py +++ b/tests/system.py @@ -1,18 +1,50 @@ -import os +import os.path +import random import unittest -class SystemInfo(unittest.TestCase): - def test_show(self): - print('System information:') - print('-------------------') +from linux import system, watchdog - (opsys, hostname, release, version, machine) = os.uname() - print('OS:', opsys, release, version) - print('Hostname:', hostname) - print('Machine:', machine) +class suspend(unittest.TestCase): + def test(self, rtc = 'rtc0'): + print('Testing suspend/resume') - board = detect() - board.print() + rtc = system.RTC(rtc) + rtc.set_alarm_relative(2) + + sys = system.System() + sys.suspend() + +class cpuhotplug(unittest.TestCase): + def test(self): + print('Testing CPU hotplugging') cpus = system.CPUSet() - cpus.print() + + for cpu in cpus: + print('CPU#%u: mask:' % cpu.num, 1 << cpu.num) + + masks = cpus.generate_masks() + + # go through all combinations once + for mask in masks: + cpus.apply_mask(mask) + + # select random combinations + for i in range(0, 100): + mask = random.choice(masks) + cpus.apply_mask(mask) + + # bring all CPUs online + cpus.online() + +def watchdog_supported(): + return os.path.exists('/dev/watchdog') + +@unittest.skipUnless(watchdog_supported(), 'Watchdog not supported') +class watchdog(unittest.TestCase): + def test(self): + print('Testing watchdog') + + wdt = system.Watchdog('/dev/watchdog') + wdt.set_timeout(30) + wdt.enable() From e285ee87da9ed1a03cb6043bbdfa102d724b0db9 Mon Sep 17 00:00:00 2001 From: Mikko Perttunen Date: Fri, 10 Jul 2015 13:55:00 +0300 Subject: [PATCH 3/5] Add submitvic test to DRM tests This test tests the VIC and Host1x submissions. --- tests/drm.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/tests/drm.py b/tests/drm.py index 8d891c7..e3cdfd8 100644 --- a/tests/drm.py +++ b/tests/drm.py @@ -4,6 +4,7 @@ import pyudev import sys import unittest +import subprocess import linux.drm @@ -34,3 +35,13 @@ def test_list_devices(self): version.minor, version.patch, version.date)) + +submit_vic_path = '/root/drm/tests/tegra/submit_vic' +def submit_vic_exists(): + return os.path.exists(submit_vic_path) + +@unittest.skipUnless(submit_vic_exists(), 'submit_vic DRM test not present') +class submitvic(unittest.TestCase): + def test(self): + subprocess.check_call(submit_vic_path, stdout = subprocess.DEVNULL, + stderr = subprocess.DEVNULL) From 2b906419a67e9d7ec4f52677b887f1f5620aee08 Mon Sep 17 00:00:00 2001 From: Mikko Perttunen Date: Fri, 10 Jul 2015 13:56:25 +0300 Subject: [PATCH 4/5] Use __enter__ and __exit__ instead of __init__ and __del__ Execution of __del__ is not guaranteed due to the garbage collector. --- linux/drm.py | 29 +++++++++++++---------------- linux/system.py | 7 +++++-- linux/watchdog.py | 4 ++-- tests/drm.py | 16 ++++++++-------- tests/system.py | 6 +++--- 5 files changed, 31 insertions(+), 31 deletions(-) diff --git a/linux/drm.py b/linux/drm.py index d0ea0e3..c9ce8e5 100644 --- a/linux/drm.py +++ b/linux/drm.py @@ -45,14 +45,19 @@ def __init__(self): class DRM: def __init__(self, device): + self.device = device + + def __enter__(self): self.libdrm = libdrm() - if type(device) == str: - self.fd = os.open(device, os.O_RDWR) - elif type(device) == int: - self.fd = device + if type(self.device) == str: + self.fd = os.open(self.device, os.O_RDWR) + elif type(self.device) == int: + self.fd = self.device + + return self - def __del__(self): + def __exit__(self, type, value, traceback): os.close(self.fd) def GetVersion(self): @@ -61,15 +66,7 @@ def GetVersion(self): self.libdrm.drmFreeVersion(version) return result -def open(device): - if type(device) == str: - fd = os.open(device, os.O_RDWR) - elif type(device) == int: - fd = device - - return DRM(fd) - if __name__ == '__main__': - drm = open(sys.argv[1]) - version = drm.GetVersion() - print(version) + with DRM(sys.argv[1]) as drm: + version = drm.GetVersion() + print(version) diff --git a/linux/system.py b/linux/system.py index edc6f60..642ba18 100644 --- a/linux/system.py +++ b/linux/system.py @@ -185,7 +185,10 @@ class Watchdog(): WDIOS_ENABLECARD = 0x0002 def __init__(self, path): - self.fd = os.open(path, os.O_RDWR) + self.path = path + + def __enter__(self): + self.fd = os.open(self.path, os.O_RDWR) def disable(self): options = ctypes.pointer(ctypes.c_uint(Watchdog.WDIOS_DISABLECARD)) @@ -202,7 +205,7 @@ def set_timeout(self, timeout): libc.ioctl(self.fd, Watchdog.WDIOC_SETTIMEOUT, timeout) - def __del__(self): + def __exit__(self, type, value, traceback): options = ctypes.pointer(ctypes.c_uint(Watchdog.WDIOS_DISABLECARD)) libc.ioctl(self.fd, Watchdog.WDIOC_SETOPTIONS, options) os.close(self.fd) diff --git a/linux/watchdog.py b/linux/watchdog.py index 9d12cbe..381e2bb 100644 --- a/linux/watchdog.py +++ b/linux/watchdog.py @@ -11,7 +11,7 @@ class Watchdog(): WDIOS_DISABLECARD = 0x0001 WDIOS_ENABLECARD = 0x0002 - def __init__(self, path): + def __enter__(self, path): self.fd = os.open(path, os.O_RDWR) def disable(self): @@ -29,7 +29,7 @@ def set_timeout(self, timeout): libc.ioctl(self.fd, Watchdog.WDIOC_SETTIMEOUT, timeout) - def __del__(self): + def __exit__(self): options = ctypes.pointer(ctypes.c_uint(Watchdog.WDIOS_DISABLECARD)) libc.ioctl(self.fd, Watchdog.WDIOC_SETOPTIONS, options) os.close(self.fd) diff --git a/tests/drm.py b/tests/drm.py index e3cdfd8..3dcd26b 100644 --- a/tests/drm.py +++ b/tests/drm.py @@ -27,14 +27,14 @@ def test_list_devices(self): print(' %s (%u, %u)' % (device.device_node, os.major(devno), os.minor(devno))) - dev = linux.drm.open(device.device_node) - version = dev.GetVersion() - print(' Driver:', version.name) - print(' Description:', version.description) - print(' Version: %u.%u.%u (%s)' % (version.major, - version.minor, - version.patch, - version.date)) + with linux.drm.DRM(device.device_node) as dev: + version = dev.GetVersion() + print(' Driver:', version.name) + print(' Description:', version.description) + print(' Version: %u.%u.%u (%s)' % (version.major, + version.minor, + version.patch, + version.date)) submit_vic_path = '/root/drm/tests/tegra/submit_vic' def submit_vic_exists(): diff --git a/tests/system.py b/tests/system.py index d34909c..94672e1 100644 --- a/tests/system.py +++ b/tests/system.py @@ -45,6 +45,6 @@ class watchdog(unittest.TestCase): def test(self): print('Testing watchdog') - wdt = system.Watchdog('/dev/watchdog') - wdt.set_timeout(30) - wdt.enable() + with system.Watchdog('/dev/watchdog') as wdt: + wdt.set_timeout(30) + wdt.enable() From d85c885bb2e82455d54f208f5bf3bc81e9c1236d Mon Sep 17 00:00:00 2001 From: Mikko Perttunen Date: Fri, 10 Jul 2015 13:57:25 +0300 Subject: [PATCH 5/5] Remove unused watchdog.py Remove the unused linux.watchdog module; it is unused and a duplicate already exists in linux.system. --- linux/watchdog.py | 35 ----------------------------------- tests/system.py | 2 +- 2 files changed, 1 insertion(+), 36 deletions(-) delete mode 100644 linux/watchdog.py diff --git a/linux/watchdog.py b/linux/watchdog.py deleted file mode 100644 index 381e2bb..0000000 --- a/linux/watchdog.py +++ /dev/null @@ -1,35 +0,0 @@ -import ctypes -import os - -from . import ioctl -from . import libc - -class Watchdog(): - WDIOC_SETOPTIONS = ioctl.IOR(ord('W'), 4, 4) - WDIOC_SETTIMEOUT = ioctl.IOWR(ord('W'), 6, 4) - - WDIOS_DISABLECARD = 0x0001 - WDIOS_ENABLECARD = 0x0002 - - def __enter__(self, path): - self.fd = os.open(path, os.O_RDWR) - - def disable(self): - options = ctypes.pointer(ctypes.c_uint(Watchdog.WDIOS_DISABLECARD)) - - libc.ioctl(self.fd, Watchdog.WDIOC_SETOPTIONS, options) - - def enable(self): - options = ctypes.pointer(ctypes.c_uint(Watchdog.WDIOS_ENABLECARD)) - - libc.ioctl(self.fd, Watchdog.WDIOC_SETOPTIONS, options) - - def set_timeout(self, timeout): - timeout = ctypes.pointer(ctypes.c_uint(timeout)) - - libc.ioctl(self.fd, Watchdog.WDIOC_SETTIMEOUT, timeout) - - def __exit__(self): - options = ctypes.pointer(ctypes.c_uint(Watchdog.WDIOS_DISABLECARD)) - libc.ioctl(self.fd, Watchdog.WDIOC_SETOPTIONS, options) - os.close(self.fd) diff --git a/tests/system.py b/tests/system.py index 94672e1..815e20e 100644 --- a/tests/system.py +++ b/tests/system.py @@ -2,7 +2,7 @@ import random import unittest -from linux import system, watchdog +from linux import system class suspend(unittest.TestCase): def test(self, rtc = 'rtc0'):