diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..bee8a64 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +__pycache__ diff --git a/boards/__init__.py b/boards/__init__.py new file mode 100644 index 0000000..e393f49 --- /dev/null +++ b/boards/__init__.py @@ -0,0 +1,63 @@ +import fnmatch +import importlib +import os.path + +class Vendor: + def __init__(self, name): + self.name = name + self.boards = [] + + def load(self, path): + self.boards[:] + + vendor = os.path.basename(path) + + for path in os.listdir(path): + if fnmatch.fnmatch(path, '*.py'): + + if path == '__init__.py': + continue + + board = os.path.splitext(path)[0] + + name = 'boards.%s.%s' % (vendor, board) + module = importlib.import_module(name) + self.boards.append(module.Board) + + return self.boards + +class UnsupportedBoardException(Exception): + pass + +class Board: + pass + +boards = [] + +''' +Detect the type of board by looking at the compatible string of the device +tree's root node. +''' +def detect(): + with open('/sys/firmware/devicetree/base/compatible', 'r') as file: + line = file.read() + if line: + compatible = line.split('\0')[0] + + for board in boards: + if compatible == board.__compatible__: + return board() + + raise UnsupportedBoardException('Board: %s' % compatible) + + raise IOError + +path = os.path.dirname(__file__) + +for directory in os.listdir(path): + path = os.path.join(path, directory) + + if os.path.exists(os.path.join(path, '__init__.py')): + name = 'boards.%s' % directory + module = importlib.import_module(name) + boards.extend(module.vendor.load(path)) diff --git a/boards/nvidia/__init__.py b/boards/nvidia/__init__.py new file mode 100644 index 0000000..ce299f9 --- /dev/null +++ b/boards/nvidia/__init__.py @@ -0,0 +1,3 @@ +from boards import Vendor + +vendor = Vendor('NVIDIA') diff --git a/boards/nvidia/jetson-tk1.py b/boards/nvidia/jetson-tk1.py new file mode 100644 index 0000000..efd86f1 --- /dev/null +++ b/boards/nvidia/jetson-tk1.py @@ -0,0 +1,14 @@ +import boards +import tegra + +class Board(boards.Board): + __compatible__ = 'nvidia,jetson-tk1' + name = 'NVIDIA Jetson TK1' + + def check_devices_mmc(self): + check_device('/dev/mmcblk0') # eMMC + check_device('/dev/mmcblk1') # MMC/SD + + def check_devices(self): + super().check_devices() + self.check_devices_mmc() diff --git a/boards/nvidia/p2371.py b/boards/nvidia/p2371.py new file mode 100644 index 0000000..f3b1b20 --- /dev/null +++ b/boards/nvidia/p2371.py @@ -0,0 +1,5 @@ +import boards + +class Board(boards.Board): + __compatible__ = 'nvidia,p2371-e01' + name = 'NVIDIA P2371 reference board (E.1)' diff --git a/linux/debugfs.py b/linux/debugfs.py new file mode 100644 index 0000000..c748a11 --- /dev/null +++ b/linux/debugfs.py @@ -0,0 +1,9 @@ +import io, os.path + +mountpoint = '/sys/kernel/debug' + +def exists(path): + return os.path.exists('%s/%s' % (mountpoint, path)) + +def open(path, *args, **kwargs): + return io.open('%s/%s' % (mountpoint, path), *args, **kwargs) diff --git a/linux/drm.py b/linux/drm.py index 9b4b8de..c9ce8e5 100644 --- a/linux/drm.py +++ b/linux/drm.py @@ -38,21 +38,26 @@ def __str__(self): class libdrm(ctypes.CDLL): def __init__(self): - ctypes.CDLL.__init__(self, 'libdrm.so') + ctypes.CDLL.__init__(self, 'libdrm.so.2') self.drmGetVersion.argstype = [ ctypes.c_int ] self.drmGetVersion.restype = drmVersionPtr class DRM: def __init__(self, device): + self.device = device + + def __enter__(self): self.libdrm = libdrm() - if type(device) == str: - self.fd = os.open(device, os.O_RDWR) - elif type(device) == int: - self.fd = device + if type(self.device) == str: + self.fd = os.open(self.device, os.O_RDWR) + elif type(self.device) == int: + self.fd = self.device + + return self - def __del__(self): + def __exit__(self, type, value, traceback): os.close(self.fd) def GetVersion(self): @@ -61,15 +66,7 @@ def GetVersion(self): self.libdrm.drmFreeVersion(version) return result -def open(device): - if type(device) == str: - fd = os.open(device, os.O_RDWR) - elif type(device) == int: - fd = device - - return DRM(fd) - if __name__ == '__main__': - drm = open(sys.argv[1]) - version = drm.GetVersion() - print(version) + with DRM(sys.argv[1]) as drm: + version = drm.GetVersion() + print(version) diff --git a/linux/log.py b/linux/log.py new file mode 100644 index 0000000..d6284e0 --- /dev/null +++ b/linux/log.py @@ -0,0 +1,19 @@ +import sys + +color = True + +def begin(message, end = ''): + print('%s...' % message, end = end) + sys.stdout.flush() + +def end(error = None): + if error: + if color: + print('\033[31mfailed\033[0m:', error) + else: + print('failed:', error) + else: + if color: + print('\033[32mdone\033[0m') + else: + print('done') diff --git a/linux/sysfs.py b/linux/sysfs.py new file mode 100644 index 0000000..311cdce --- /dev/null +++ b/linux/sysfs.py @@ -0,0 +1,16 @@ +import io, os.path + +mountpoint = '/sys' + +def exists(path): + return os.path.exists('%s/%s' % (mountpoint, path)) + +def open(path, *args, **kwargs): + return io.open('%s/%s' % (mountpoint, path), *args, **kwargs) + +class Object: + def __init__(self, path): + self.path = path + + def open(self, path, *args, **kwargs): + return open('%s/%s' % (self.path, path), *args, **kwargs) diff --git a/linux/system.py b/linux/system.py index 3ecc510..642ba18 100644 --- a/linux/system.py +++ b/linux/system.py @@ -1,20 +1,23 @@ #!/usr/bin/python -import io +import ctypes import os import sys +import time + +from . import ioctl, libc, sysfs ''' Represents one CPU present in the system. ''' class CPU(): def __init__(self, num): - self.path = '/sys/devices/system/cpu/cpu%u' % num + self.sysfs = sysfs.Object('devices/system/cpu/cpu%u' % num) self.hotpluggable = True self.num = num - with io.open(os.path.join(self.path, 'online'), 'r') as cpu: - online = cpu.readline().strip() + with self.sysfs.open('online', 'r') as file: + online = file.readline().strip() if online == '0': self.online = False else: @@ -24,13 +27,13 @@ def __init__(self, num): Bring the CPU online or take it offline. ''' def set_online(self, online): - with io.open(os.path.join(self.path, 'online'), 'w') as cpu: + with self.sysfs.open('online', 'w') as file: if online: - cpu.write('1') + file.write('1') else: - cpu.write('0') + file.write('0') - cpu.online = online + self.online = online ''' Return the CPU mask. @@ -52,7 +55,7 @@ def __str__(self): ''' class CPUSet(): def __init__(self): - with io.open('/sys/devices/system/cpu/present', 'r') as file: + with sysfs.open('devices/system/cpu/present', 'r') as file: present = file.readline().rstrip() self.start, self.end = map(int, present.split('-')) @@ -144,6 +147,22 @@ def apply_mask(self, mask): def __iter__(self): return iter(self.cpus) +''' +Provides access to a realtime clock device in the system. +''' +class RTC: + def __init__(self, name = 'rtc0'): + self.sysfs = sysfs.Object('class/rtc/%s' % name) + + ''' + Set the RTC to raise an alarm a given number of seconds from now. + ''' + def set_alarm_relative(self, alarm): + alarm = int(time.time()) + alarm + + with self.sysfs.open('wakealarm', 'w') as file: + file.write('%u' % alarm) + ''' Provides access to the system and system wide controls. ''' @@ -152,5 +171,41 @@ def __init__(self): pass def suspend(self): - with io.open('/sys/power/state', 'w') as file: + with sysfs.open('power/state', 'w') as file: file.write('mem') + +''' +Provides access to a watchdog device in the system. +''' +class Watchdog(): + WDIOC_SETOPTIONS = ioctl.IOR(ord('W'), 4, 4) + WDIOC_SETTIMEOUT = ioctl.IOWR(ord('W'), 6, 4) + + WDIOS_DISABLECARD = 0x0001 + WDIOS_ENABLECARD = 0x0002 + + def __init__(self, path): + self.path = path + + def __enter__(self): + self.fd = os.open(self.path, os.O_RDWR) + + def disable(self): + options = ctypes.pointer(ctypes.c_uint(Watchdog.WDIOS_DISABLECARD)) + + libc.ioctl(self.fd, Watchdog.WDIOC_SETOPTIONS, options) + + def enable(self): + options = ctypes.pointer(ctypes.c_uint(Watchdog.WDIOS_ENABLECARD)) + + libc.ioctl(self.fd, Watchdog.WDIOC_SETOPTIONS, options) + + def set_timeout(self, timeout): + timeout = ctypes.pointer(ctypes.c_uint(timeout)) + + libc.ioctl(self.fd, Watchdog.WDIOC_SETTIMEOUT, timeout) + + def __exit__(self, type, value, traceback): + options = ctypes.pointer(ctypes.c_uint(Watchdog.WDIOS_DISABLECARD)) + libc.ioctl(self.fd, Watchdog.WDIOC_SETOPTIONS, options) + os.close(self.fd) diff --git a/linux/watchdog.py b/linux/watchdog.py deleted file mode 100644 index 9d12cbe..0000000 --- a/linux/watchdog.py +++ /dev/null @@ -1,35 +0,0 @@ -import ctypes -import os - -from . import ioctl -from . import libc - -class Watchdog(): - WDIOC_SETOPTIONS = ioctl.IOR(ord('W'), 4, 4) - WDIOC_SETTIMEOUT = ioctl.IOWR(ord('W'), 6, 4) - - WDIOS_DISABLECARD = 0x0001 - WDIOS_ENABLECARD = 0x0002 - - def __init__(self, path): - self.fd = os.open(path, os.O_RDWR) - - def disable(self): - options = ctypes.pointer(ctypes.c_uint(Watchdog.WDIOS_DISABLECARD)) - - libc.ioctl(self.fd, Watchdog.WDIOC_SETOPTIONS, options) - - def enable(self): - options = ctypes.pointer(ctypes.c_uint(Watchdog.WDIOS_ENABLECARD)) - - libc.ioctl(self.fd, Watchdog.WDIOC_SETOPTIONS, options) - - def set_timeout(self, timeout): - timeout = ctypes.pointer(ctypes.c_uint(timeout)) - - libc.ioctl(self.fd, Watchdog.WDIOC_SETTIMEOUT, timeout) - - def __del__(self): - options = ctypes.pointer(ctypes.c_uint(Watchdog.WDIOS_DISABLECARD)) - libc.ioctl(self.fd, Watchdog.WDIOC_SETOPTIONS, options) - os.close(self.fd) diff --git a/tegra-tests.py b/tegra-tests.py new file mode 100644 index 0000000..e7d3d8c --- /dev/null +++ b/tegra-tests.py @@ -0,0 +1,154 @@ +#!/usr/bin/python3 + +import argparse +import boards +import io +import os +import sys +import tegra +import unittest + +from linux import kmsg, log, system + +class TegraTestLoader(unittest.TestLoader): + pass + +#class TegraTestRunner(unittest.TextTestRunner): +# pass + +class WritelnDecorator(object): + def __init__(self, stream): + self.stream = stream + + def __getattr__(self, attr): + if attr in ('stream', '__getstate__'): + raise AttributeError(attr) + + return getattr(self.stream, attr) + + def writeln(self, arg = None): + if arg: + self.write(arg) + + self.write('\n') + +def strclass(cls): + return '%s.%s' % (cls.__module__, cls.__name__) + +class RedirectOutput(io.IOBase): + def __init__(self, stream = sys.stdout): + self.stream = stream + self.newline = True + + def write(self, b): + if self.newline: + self.stream.write('| ') + + self.stream.write(b) + + self.newline = b.endswith('\n') + +class TegraTestResult(unittest.TestResult): + def __init__(self, stream = None, descriptions = True, verbosity = 1): + super().__init__(stream, descriptions, verbosity) + self.stream = WritelnDecorator(stream) + self.verbosity = verbosity + + def startTest(self, test): + self.stdout = sys.stdout + self.stderr = sys.stderr + + if self.verbosity > 0: + log.begin('running: %s (%s)' % (strclass(test.__class__), + test._testMethodName), + end = '\n') + sys.stdout = RedirectOutput(sys.stdout) + sys.stderr = RedirectOutput(sys.stderr) + else: + log.begin('running: %s (%s)' % (strclass(test.__class__), + test._testMethodName)) + sys.stdout = None + sys.stderr = None + + super().startTest(test) + + def stopTest(self, test): + super().stopTest(test) + sys.stderr = self.stderr + sys.stdout = self.stdout + log.end() + +class TegraTestRunner(object): + def __init__(self, stream = None, descriptions = True, verbosity = 1): + if not stream: + stream = sys.stderr + + self.stream = stream + self.descriptions = descriptions + self.verbosity = verbosity + + def run(self, test): + result = TegraTestResult(self.stream, self.descriptions, self.verbosity) + test(result) + return result + +def show_system_info(): + print('System information:') + print('-------------------') + + (opsys, hostname, release, version, machine) = os.uname() + print('OS:', opsys, release, version) + print('Hostname:', hostname) + print('Machine:', machine) + + board = boards.detect() + print('Board:', board.name) + + soc = tegra.detect() + print('SoC:', soc.name) + + cpus = system.CPUSet() + cpus.print() + + print() + +def show_kmsg(): + with kmsg.open('/dev/kmsg') as log: + for entry in log: + print(entry) + +if __name__ == '__main__': + parser = argparse.ArgumentParser('') + parser.add_argument('--batch', '-b', action = 'store_const', const = True) + parser.add_argument('--verbose', '-v', default = 1, action = 'count') + parser.add_argument('--quiet', '-q', action = 'store_const', const = True) + parser.add_argument('tests', type = str, nargs = '*') + args = parser.parse_args(sys.argv[1:]) + + if args.quiet: + args.verbose -= 1 + + if args.verbose > 0: + show_system_info() + #show_kmsg() + + tests = [] + + for module in args.tests: + tests.append('tests.%s' % module) + + loader = TegraTestLoader() + + if args.batch: + runner = unittest.TextTestRunner(verbosity = args.verbose) + log.color = False + else: + runner = TegraTestRunner(verbosity = args.verbose) + log.color = False + + if not tests: + suite = loader.discover('tests', '*.py', '.') + else: + suite = loader.loadTestsFromNames(tests) + + runner.run(suite) diff --git a/tegra.py b/tegra.py deleted file mode 100644 index ec27e2a..0000000 --- a/tegra.py +++ /dev/null @@ -1,227 +0,0 @@ -#!/usr/bin/python3 - -import io -import os -import pyudev -import random -import sys -import time - -from linux import drm -from linux import kmsg -from linux import system -from linux import watchdog - -def check_device(path, indent = 2): - print('%s- %s: ' % (' ' * indent, path), end = '') - - if os.access(path, os.F_OK): - print('OK') - else: - print('failed') - -class Tegra(): - def check_devices(self): - print('- checking for devices:') - - def check(self): - print('%s detected' % self.__description__) - print('running tests:') - self.check_devices() - - def print(self, file = sys.stdout): - print('Board:', self.__description__, file = file) - -class Tegra20(Tegra): - __compatible__ = 'nvidia,tegra20' - -class Tegra30(Tegra): - __compatible__ = 'nvidia,tegra30' - -class Tegra114(Tegra): - __compatible__ = 'nvidia,tegra114' - -class Tegra124(Tegra): - __compatible__ = 'nvidia,tegra124' - -class JetsonTK1(Tegra124): - __compatible__ = 'nvidia,jetson-tk1' - __description__ = 'NVIDIA Jetson TK1' - - def check_devices_mmc(self): - check_device('/dev/mmcblk0') # eMMC - check_device('/dev/mmcblk1') # MMC/SD - - def check_devices(self): - super().check_devices() - self.check_devices_mmc() - -class Tegra132(Tegra): - __compatible__ = 'nvidia,tegra132' - -socs = [ - Tegra20, - Tegra30, - Tegra114, - Tegra124, - Tegra132, - ] - -boards = [ - JetsonTK1, - ] - -def test_show_info(): - print('System information:') - print('-------------------') - - (system, hostname, release, version, machine) = os.uname() - print('OS:', system, release, version) - print('Hostname:', hostname) - print('Machine:', machine) - - board = detect() - board.print() - - cpus = system.CPUSet() - cpus.print() - -def test_kmsg(): - with kmsg.open('/dev/kmsg') as log: - for entry in log: - print(entry) - -''' -Provides access to a realtime clock device in the system. -''' -class RTC: - def __init__(self, name = 'rtc0'): - self.alarm = '/sys/class/rtc/%s/wakealarm' % name - - ''' - Set the RTC to raise an alarm a given number of seconds from now. - ''' - def set_alarm_relative(self, alarm): - alarm = int(time.time()) + alarm - - with io.open(self.alarm, 'w') as rtc: - rtc.write('%u' % alarm) - -def test_suspend(rtc = 'rtc0'): - print('Testing suspend/resume') - - rtc = RTC(rtc) - rtc.set_alarm_relative(10) - - sys = system.System() - sys.suspend() - -def test_cpu_hotplug(): - print('Testing CPU hotplugging') - - cpus = system.CPUSet() - - for cpu in cpus: - print('CPU#%u: mask:' % cpu.num, 1 << cpu.num) - - masks = cpus.generate_masks() - - # go through all combinations once - for mask in masks: - cpus.apply_mask(mask) - - # select random combinations - for i in range(0, 100): - mask = random.choice(masks) - cpus.apply_mask(mask) - - # bring all CPUs online - cpus.online() - -def test_watchdog(): - print('Testing watchdog') - - wdt = watchdog.Watchdog('/dev/watchdog') - wdt.set_timeout(30) - wdt.enable() - -class UnsupportedBoard(Exception): - pass - -''' -Detect the type of board by looking at the compatible string of the device -tree's root node. -''' -def detect(): - with io.open('/sys/firmware/devicetree/base/compatible', 'r') as file: - line = file.read() - if line: - # remove the last, empty element - values = line.split('\0')[:-1] - - name = values[0] - soc = values[-1] - - for board in boards: - if name == board.__compatible__: - return board() - - raise UnsupportedBoardException('SoC: %s, Board: %s' % (soc, name)) - - raise IOError - -def test_board(): - board = detect() - board.check() - - -def test_drm(): - context = pyudev.Context() - devices = context.list_devices(subsystem = 'drm') - - print('DRM devices:') - - for device in devices: - if not device.device_node: - continue - - if 'seat' not in device.tags: - continue - - devno = device.device_number - - print(' %s (%u, %u)' % (device.device_node, os.major(devno), - os.minor(devno))) - - dev = drm.open(device.device_node) - version = dev.GetVersion() - print(' %s (%u.%u.%u, %s, %s)' % (version.name, version.major, - version.minor, version.patch, - version.date, - version.description)) - -if __name__ == '__main__': - if len(sys.argv) > 1: - if sys.argv[1] == 'info': - test_show_info() - - if sys.argv[1] == 'kmsg': - test_kmsg() - - if sys.argv[1] == 'suspend': - test_suspend() - - if sys.argv[1] == 'cpu-hotplug': - test_cpu_hotplug() - - if sys.argv[1] == 'watchdog': - test_watchdog() - - if sys.argv[1] == 'board': - test_board() - - if sys.argv[1] == 'drm': - test_drm() - - else: - test_show_info() diff --git a/tegra/__init__.py b/tegra/__init__.py new file mode 100644 index 0000000..ab4dbb6 --- /dev/null +++ b/tegra/__init__.py @@ -0,0 +1,51 @@ +import importlib +import os.path +import sys + +class UnsupportedSoCException(Exception): + pass + +class SoC: + compatible = 'nvidia,tegra' + name = 'NVIDIA Tegra' + +''' +Detect the type of SoC by looking at the compatible string of the device +tree's root node. +''' +def detect(): + with open('/sys/firmware/devicetree/base/compatible', 'r') as file: + line = file.read() + if line: + # remove the last, empty element + values = line.split('\0')[:-1] + compatible = values[-1] + + for soc in socs: + if compatible == soc.compatible: + return soc() + + raise UnsupportedSoCException('SoC: %s' % (compatible)) + + raise IOError + +# +# initialization +# +socs = [] + +path = os.path.dirname(__file__) +sys.path.append(path) + +paths = sorted(os.listdir(path)) +for path in paths: + if not path.endswith('.py'): + continue + + if path == '__init__.py': + continue + + name = os.path.splitext(path)[0] + module = importlib.import_module(name) + + socs.append(module.SoC) diff --git a/tegra/tegra114.py b/tegra/tegra114.py new file mode 100644 index 0000000..93c454a --- /dev/null +++ b/tegra/tegra114.py @@ -0,0 +1,8 @@ +import tegra + +class SoC(tegra.SoC): + compatible = 'nvidia,tegra114' + name = 'NVIDIA Tegra114' + + def __init__(self): + self.num_cpus = 4 diff --git a/tegra/tegra124.py b/tegra/tegra124.py new file mode 100644 index 0000000..2bfe3b7 --- /dev/null +++ b/tegra/tegra124.py @@ -0,0 +1,8 @@ +import tegra + +class SoC(tegra.SoC): + compatible = 'nvidia,tegra124' + name = 'NVIDIA Tegra124' + + def __init__(self): + self.num_cpus = 4 diff --git a/tegra/tegra132.py b/tegra/tegra132.py new file mode 100644 index 0000000..a273b8d --- /dev/null +++ b/tegra/tegra132.py @@ -0,0 +1,8 @@ +import tegra + +class SoC(tegra.SoC): + compatible = 'nvidia,tegra132' + name = 'NVIDIA Tegra132' + + def __init__(self): + self.num_cpus = 2 diff --git a/tegra/tegra20.py b/tegra/tegra20.py new file mode 100644 index 0000000..120a00e --- /dev/null +++ b/tegra/tegra20.py @@ -0,0 +1,8 @@ +import tegra + +class SoC(tegra.SoC): + compatible = 'nvidia,tegra20' + name = 'NVIDIA Tegra20' + + def __init__(self): + self.num_cpus = 2 diff --git a/tegra/tegra210.py b/tegra/tegra210.py new file mode 100644 index 0000000..c5fe191 --- /dev/null +++ b/tegra/tegra210.py @@ -0,0 +1,8 @@ +import tegra + +class SoC(tegra.SoC): + compatible = 'nvidia,tegra210' + name = 'NVIDIA Tegra210' + + def __init__(self): + self.num_cpus = 4 diff --git a/tegra/tegra30.py b/tegra/tegra30.py new file mode 100644 index 0000000..43e79d0 --- /dev/null +++ b/tegra/tegra30.py @@ -0,0 +1,8 @@ +import tegra + +class SoC(tegra.SoC): + compatible = 'nvidia,tegra30' + name = 'NVIDIA Tegra30' + + def __init__(self): + self.num_cpus = 4 diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/cpufreq.py b/tests/cpufreq.py new file mode 100644 index 0000000..6bd1bcf --- /dev/null +++ b/tests/cpufreq.py @@ -0,0 +1,126 @@ +#!/usr/bin/python3 + +import unittest + +from linux import log, sysfs, system + +def cpufreq_supported(): + return sysfs.exists('devices/system/cpu/cpu0/cpufreq') + +class CPU: + def __init__(self, num): + self.sysfs = sysfs.Object('devices/system/cpu/cpu%u/cpufreq' % num) + self.supported_governors = [] + self.supported_rates = [] + + with self.sysfs.open('scaling_available_governors', 'r') as f: + for line in f: + self.supported_governors.extend(line.split()) + + with self.sysfs.open('scaling_available_frequencies', 'r') as f: + for line in f: + self.supported_rates.extend(line.split()) + + def __getattr__(self, name): + if name == 'governor': + with self.sysfs.open('scaling_governor') as file: + return file.read().strip() + + if name == 'rate': + with self.sysfs.open('scaling_cur_freq') as file: + return file.read().strip() + + return super.__getattr__(self, name) + + def __setattr__(self, name, value): + if name == 'governor': + with self.sysfs.open('scaling_governor', 'w') as file: + file.write(value) + + return + + if name == 'rate': + with self.sysfs.open('scaling_setspeed', 'w') as file: + file.write(value) + + with self.sysfs.open('scaling_cur_freq', 'r') as file: + rate = file.read().strip() + + if rate != value: + raise Exception + + return + + return super.__setattr__(self, name, value) + +@unittest.skipUnless(cpufreq_supported(), 'CPUfreq not supported') +class cpufreq(unittest.TestCase): + def test_list_governors(self): + cpuset = system.CPUSet() + + for cpu in cpuset: + print('- CPU#%u:' % cpu.num) + + cpu = CPU(cpu.num) + + print(' - supported governors:') + + for governor in cpu.supported_governors: + if governor == cpu.governor: + print(' - %s *' % governor) + else: + print(' - %s' % governor) + + def test_list_rates(self): + cpuset = system.CPUSet() + + for cpu in cpuset: + print('- CPU#%u:' % cpu.num) + cpu = CPU(cpu.num) + + width = max([len(x) for x in cpu.supported_rates]) + + print(' - supported rates:') + + for rate in cpu.supported_rates: + if rate == cpu.rate: + print(' - %*s' % (width, rate), '*') + else: + print(' - %*s' % (width, rate)) + + def test_set_rates(self): + cpuset = system.CPUSet() + + for cpu in cpuset: + print('- CPU#%u' % cpu.num) + cpu = CPU(cpu.num) + + log.begin(' - switching to userspace governor') + governor = cpu.governor + + try: + cpu.governor = 'userspace' + except Exception as e: + log.end(e) + continue + else: + log.end() + + for rate in cpu.supported_rates: + log.begin(' - setting rate %s' % rate) + + try: + cpu.rate = rate + except Exception as e: + log.end(e) + else: + log.end() + + log.begin(' - restoring %s governor' % governor) + + try: + cpu.governor = governor + except Exception as e: + log.end(e) + else: + log.end() diff --git a/tests/drm.py b/tests/drm.py new file mode 100644 index 0000000..3dcd26b --- /dev/null +++ b/tests/drm.py @@ -0,0 +1,47 @@ +#!/usr/bin/python3 + +import os +import pyudev +import sys +import unittest +import subprocess + +import linux.drm + +class drm(unittest.TestCase): + def test_list_devices(self): + context = pyudev.Context() + devices = context.list_devices(subsystem = 'drm') + + print('DRM devices:') + + for device in devices: + if not device.device_node: + continue + + if 'seat' not in device.tags: + continue + + devno = device.device_number + + print(' %s (%u, %u)' % (device.device_node, os.major(devno), + os.minor(devno))) + + with linux.drm.DRM(device.device_node) as dev: + version = dev.GetVersion() + print(' Driver:', version.name) + print(' Description:', version.description) + print(' Version: %u.%u.%u (%s)' % (version.major, + version.minor, + version.patch, + version.date)) + +submit_vic_path = '/root/drm/tests/tegra/submit_vic' +def submit_vic_exists(): + return os.path.exists(submit_vic_path) + +@unittest.skipUnless(submit_vic_exists(), 'submit_vic DRM test not present') +class submitvic(unittest.TestCase): + def test(self): + subprocess.check_call(submit_vic_path, stdout = subprocess.DEVNULL, + stderr = subprocess.DEVNULL) diff --git a/tests/emc.py b/tests/emc.py new file mode 100644 index 0000000..8a85b2b --- /dev/null +++ b/tests/emc.py @@ -0,0 +1,66 @@ +#!/usr/bin/python3 + +import unittest + +from linux import debugfs, log + +def emc_supported(): + return debugfs.exists('emc') + +def emc_set_rate(rate): + with debugfs.open('emc/rate', 'w') as f: + f.write(rate) + + with debugfs.open('emc/rate', 'r') as f: + rate = f.read().strip(); + + return rate + +@unittest.skipUnless(emc_supported(), 'EMC frequency scaling not supported') +class emc(unittest.TestCase): + def test_set_rates(self): + supported_rates = [] + errors = 0 + + print('EMC frequency scaling:') + print('======================') + + with debugfs.open('emc/supported_rates') as f: + for line in f: + supported_rates.extend(line.split()) + + width = max([len(x) for x in supported_rates]) + + with debugfs.open('emc/rate') as f: + current = f.read().strip() + + print('- supported rates: (* = current)'); + + for rate in supported_rates: + if rate == current: + print(' - %*s' % (width, rate), '*') + else: + print(' - %*s' % (width, rate)) + + print('- testing:') + + for rate in supported_rates: + log.begin(' - %*s...' % (width, rate)) + + actual = emc_set_rate(rate) + if actual != rate: + log.end('reported %s' % actual) + errors += 1 + else: + log.end() + + log.begin('- resetting to old rate: %s...' % current) + + actual = emc_set_rate(current) + if actual != current: + log.end('reported %s' % actual) + else: + log.end() + + self.longMessage = False + self.assertEqual(errors, 0, 'not all rate changes succeeded') diff --git a/tests/system.py b/tests/system.py new file mode 100644 index 0000000..815e20e --- /dev/null +++ b/tests/system.py @@ -0,0 +1,50 @@ +import os.path +import random +import unittest + +from linux import system + +class suspend(unittest.TestCase): + def test(self, rtc = 'rtc0'): + print('Testing suspend/resume') + + rtc = system.RTC(rtc) + rtc.set_alarm_relative(2) + + sys = system.System() + sys.suspend() + +class cpuhotplug(unittest.TestCase): + def test(self): + print('Testing CPU hotplugging') + + cpus = system.CPUSet() + + for cpu in cpus: + print('CPU#%u: mask:' % cpu.num, 1 << cpu.num) + + masks = cpus.generate_masks() + + # go through all combinations once + for mask in masks: + cpus.apply_mask(mask) + + # select random combinations + for i in range(0, 100): + mask = random.choice(masks) + cpus.apply_mask(mask) + + # bring all CPUs online + cpus.online() + +def watchdog_supported(): + return os.path.exists('/dev/watchdog') + +@unittest.skipUnless(watchdog_supported(), 'Watchdog not supported') +class watchdog(unittest.TestCase): + def test(self): + print('Testing watchdog') + + with system.Watchdog('/dev/watchdog') as wdt: + wdt.set_timeout(30) + wdt.enable()