diff --git a/.github/workflows/install.yaml b/.github/workflows/install.yaml index 7d965b2..e879179 100644 --- a/.github/workflows/install.yaml +++ b/.github/workflows/install.yaml @@ -11,15 +11,15 @@ jobs: fail-fast: false matrix: os: [ubuntu-latest] - python-version: ["3.7", "3.8", "3.9", "3.10", "3.11", "3.12"] + python-version: ["3.9", "3.10", "3.11", "3.12", "3.13", "3.14"] include: - os: ubuntu-latest - python-version: "3.7" + python-version: "3.9" steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v6 - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v5 + uses: actions/setup-python@v6 with: python-version: ${{ matrix.python-version }} diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml new file mode 100644 index 0000000..20d254b --- /dev/null +++ b/.github/workflows/lint.yml @@ -0,0 +1,27 @@ +name: Lint + +on: + - push + - pull_request + +jobs: + pylint: + runs-on: ${{ matrix.os }} + strategy: + fail-fast: false + matrix: + os: [ubuntu-latest] + python-version: ["3.14"] + + steps: + - uses: actions/checkout@v6 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v6 + with: + python-version: ${{ matrix.python-version }} + + - name: Check for pylint errors + run: | + python -m pip install pylint setuptools + python setup.py build + python -m pylint --verbose -E build/lib*/evdev diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml new file mode 100644 index 0000000..073d524 --- /dev/null +++ b/.github/workflows/test.yml @@ -0,0 +1,28 @@ +name: Test + +on: + - push + - pull_request + +jobs: + test: + runs-on: ${{ matrix.os }} + strategy: + fail-fast: false + matrix: + os: [ubuntu-latest] + python-version: ["3.14"] + + steps: + - uses: actions/checkout@v6 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v6 + with: + python-version: ${{ matrix.python-version }} + + - name: Run pytest tests + # sudo required to write to uinputs + run: | + sudo python -m pip install pytest setuptools + sudo python -m pip install -e . + sudo python -m pytest tests diff --git a/.gitignore b/.gitignore index 329a06d..70ac303 100644 --- a/.gitignore +++ b/.gitignore @@ -15,11 +15,14 @@ TAGS .#* __pycache__ .pytest_cache +.ruff_cache +.venv +uv.lock -evdev/*.so -evdev/ecodes.c -evdev/iprops.c +src/evdev/*.so +src/evdev/ecodes.c +src/evdev/ecodes.pyi docs/_build -evdev/_ecodes.py -evdev/_input.py -evdev/_uinput.py +src/evdev/_ecodes.py +src/evdev/_input.py +src/evdev/_uinput.py diff --git a/LICENSE b/LICENSE index 5600871..8482b07 100644 --- a/LICENSE +++ b/LICENSE @@ -1,4 +1,4 @@ -Copyright (c) 2012-2023 Georgi Valkov. All rights reserved. +Copyright (c) 2012-2025 Georgi Valkov. All rights reserved. Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are diff --git a/MANIFEST.in b/MANIFEST.in index 435d617..be2be3d 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,4 +1,5 @@ # The _ecodes extension module source file needs to be generated against the # evdev headers of the running kernel. Refer to the 'build_ecodes' distutils # command in setup.py. -exclude evdev/ecodes.c +exclude src/evdev/ecodes.c +include src/evdev/ecodes.py diff --git a/docs/changelog.rst b/docs/changelog.rst index c14026a..bcf1636 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -1,12 +1,69 @@ Changelog --------- -1.8.0 (Unreleased) -================== +1.9.3 (Feb 05, 2025) +==================== + +- Fix several memory leaks in ``input.c``. + +- Raise the minimum supported Python version to 3.9 and the setuptools version to 77.0. + + +1.9.2 (May 01, 2025) +==================== + +- Add the "--reproducible" build option which removes the build date and used headers from the + generated ``ecodes.c``. Example usage:: + + python -m build --config-setting=--build-option='build_ecodes --reproducible' -n -- Binary wheels are now provided by the `evdev-binary http://pypi.python.org/pypi/evdev-binary`_ package. +- Use ``Generic`` to set precise type for ``InputDevice.path``. + + +1.9.1 (Feb 22, 2025) +==================== + +- Fix fox missing ``UI_FF`` constants in generated ``ecodes.py``. + +- More type annotations. + + +1.9.0 (Feb 08, 2025) +==================== + +- Fix for ``CPATH/C_INCLUDE_PATH`` being ignored during build. + +- Slightly faster reading of events in ``device.read()`` and ``device.read_one()``. + +- Fix FreeBSD support. + +- Drop deprecated ``InputDevice.fn`` (use ``InputDevice.path`` instead). + +- Improve type hint coverage and add a ``py.typed`` file to the sdist. + + +1.8.0 (Jan 25, 2025) +==================== + +- Binary wheels are now provided by the `evdev-binary `_ package. The package is compiled on manylinux_2_28 against kernel 4.18. +- The ``evdev.ecodes`` module is now generated at install time and contains only constants. This allows type + checking and introspection of the ``evdev.ecodes`` module, without having to execute it first. The old + module is available as ``evdev.ecodes_runtime``. In case generation of the static ``ecodes.py`` fails, the + install process falls back to using ``ecodes_runtime.py`` as ``ecodes.py``. + +- Reverse mappings in ``evdev.ecodes`` that point to more than one value are now tuples instead of lists. For example:: + + >>> ecodes.KEY[153] + ('KEY_DIRECTION', 'KEY_ROTATE_DISPLAY') + +- Raise the minimum supported Python version to 3.8. + +- Fix keyboard delay and repeat being swapped (#227). + +- Move the ``syn()`` convenience method from ``InputDevice`` to ``EventIO`` (#224). + 1.7.1 (May 8, 2024) ==================== @@ -25,7 +82,7 @@ Changelog - Add the uniq address to the string representation of ``InputDevice``. -- Improved method for finding the device node corresponding to a uinput device (`#206 https://github.com/gvalkov/python-evdev/pull/206`_). +- Improved method for finding the device node corresponding to a uinput device (`#206 `_). - Repository TLC (reformatted with ruff, fixed linting warnings, moved packaging metadata to ``pyproject.toml`` etc.). diff --git a/docs/conf.py b/docs/conf.py index bf03b42..0be06b3 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -1,5 +1,3 @@ -# -*- coding: utf-8 -*- - import os import sys import sphinx_rtd_theme @@ -13,7 +11,7 @@ # Trick autodoc into running without having built the extension modules. if on_rtd: - with open("../evdev/_ecodes.py", "w") as fh: + with open("../src/evdev/_ecodes.py", "w") as fh: fh.write( """ KEY = ABS = REL = SW = MSC = LED = REP = SND = SYN = FF = FF_STATUS = BTN_A = KEY_A = 1 @@ -22,9 +20,9 @@ KEY_MAX, KEY_CNT = 1, 2""" ) - with open("../evdev/_input.py", "w"): + with open("../src/evdev/_input.py", "w"): pass - with open("../evdev/_uinput.py", "w"): + with open("../src/evdev/_uinput.py", "w"): pass @@ -60,14 +58,14 @@ # General information about the project. project = "python-evdev" -copyright = "2012-2024, Georgi Valkov and contributors" +copyright = "2012-2025, Georgi Valkov and contributors" # The version info for the project you're documenting, acts as replacement for # |version| and |release|, also used in various other places throughout the # built documents. # # The full version, including alpha/beta/rc tags. -release = "1.7.1" +release = "1.9.3" # The short X.Y version. version = release diff --git a/evdev/__init__.py b/evdev/__init__.py deleted file mode 100644 index 36b330c..0000000 --- a/evdev/__init__.py +++ /dev/null @@ -1,10 +0,0 @@ -# -------------------------------------------------------------------------- -# Gather everything into a single, convenient namespace. -# -------------------------------------------------------------------------- - -from evdev.device import DeviceInfo, InputDevice, AbsInfo, EvdevError -from evdev.events import InputEvent, KeyEvent, RelEvent, SynEvent, AbsEvent, event_factory -from evdev.uinput import UInput, UInputError -from evdev.util import list_devices, categorize, resolve_ecodes, resolve_ecodes_dict -from evdev import ecodes -from evdev import ff diff --git a/evdev/genecodes.py b/evdev/genecodes.py deleted file mode 100644 index ce9939e..0000000 --- a/evdev/genecodes.py +++ /dev/null @@ -1,95 +0,0 @@ -""" -Generate a Python extension module with the constants defined in linux/input.h. -""" - -import os, sys, re - - -# ----------------------------------------------------------------------------- -# The default header file locations to try. -headers = [ - "/usr/include/linux/input.h", - "/usr/include/linux/input-event-codes.h", - "/usr/include/linux/uinput.h", -] - -if sys.argv[1:]: - headers = sys.argv[1:] - - -# ----------------------------------------------------------------------------- -macro_regex = r"#define +((?:KEY|ABS|REL|SW|MSC|LED|BTN|REP|SND|ID|EV|BUS|SYN|FF|UI_FF|INPUT_PROP)_\w+)" -macro_regex = re.compile(macro_regex) - -# Uname without hostname. -uname = list(os.uname()) -uname = " ".join((uname[0], *uname[2:])) - - -# ----------------------------------------------------------------------------- -template = r""" -#include -#ifdef __FreeBSD__ -#include -#else -#include -#include -#endif - -/* Automatically generated by evdev.genecodes */ -/* Generated on %s */ - -#define MODULE_NAME "_ecodes" -#define MODULE_HELP "linux/input.h macros" - -static PyMethodDef MethodTable[] = { - { NULL, NULL, 0, NULL} -}; - -static struct PyModuleDef moduledef = { - PyModuleDef_HEAD_INIT, - MODULE_NAME, - MODULE_HELP, - -1, /* m_size */ - MethodTable, /* m_methods */ - NULL, /* m_reload */ - NULL, /* m_traverse */ - NULL, /* m_clear */ - NULL, /* m_free */ -}; - -PyMODINIT_FUNC -PyInit__ecodes(void) -{ - PyObject* m = PyModule_Create(&moduledef); - if (m == NULL) return NULL; - -%s - - return m; -} -""" - - -def parse_header(header): - for line in open(header): - macro = macro_regex.search(line) - if macro: - yield " PyModule_AddIntMacro(m, %s);" % macro.group(1) - - -all_macros = [] -for header in headers: - try: - fh = open(header) - except (IOError, OSError): - continue - all_macros += parse_header(header) - -if not all_macros: - print("no input macros found in: %s" % " ".join(headers), file=sys.stderr) - sys.exit(1) - - -macros = os.linesep.join(all_macros) -print(template % (uname, macros)) diff --git a/pyproject.toml b/pyproject.toml index 9ba60ff..d0b4f7c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,15 +1,15 @@ [build-system] -requires = ["setuptools>=61.0"] +requires = ["setuptools>=77.0"] build-backend = "setuptools.build_meta" [project] name = "evdev" -version = "1.7.1" +version = "1.9.3" description = "Bindings to the Linux input handling subsystem" keywords = ["evdev", "input", "uinput"] readme = "README.md" -license = {file = "LICENSE"} -requires-python = ">=3.6" +license = "BSD-3-Clause" +requires-python = ">=3.9" authors = [ { name="Georgi Valkov", email="georgi.t.valkov@gmail.com" }, ] @@ -22,16 +22,12 @@ classifiers = [ "Operating System :: POSIX :: Linux", "Intended Audience :: Developers", "Topic :: Software Development :: Libraries", - "License :: OSI Approved :: BSD License", "Programming Language :: Python :: Implementation :: CPython", ] [project.urls] "Homepage" = "https://github.com/gvalkov/python-evdev" -[tool.setuptools] -packages = ["evdev"] - [tool.ruff] line-length = 120 @@ -39,7 +35,7 @@ line-length = 120 ignore = ["E265", "E241", "F403", "F401", "E401", "E731"] [tool.bumpversion] -current_version = "1.7.1" +current_version = "1.9.3" commit = true tag = true allow_dirty = true @@ -49,3 +45,12 @@ filename = "pyproject.toml" [[tool.bumpversion.files]] filename = "docs/conf.py" + +[tool.pylint.'MESSAGES CONTROL'] +disable = """ + no-member, +""" + +[tool.pylint.typecheck] +generated-members = ["evdev.ecodes.*"] +ignored-modules= ["evdev._*"] diff --git a/requirements-dev.txt b/requirements-dev.txt index 96366e6..725ad7f 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -7,3 +7,4 @@ bump-my-version ~= 0.17.4 build twine cibuildwheel +setuptools diff --git a/setup.py b/setup.py index 6781527..1f6eaac 100755 --- a/setup.py +++ b/setup.py @@ -1,17 +1,20 @@ import os import sys +import shutil import textwrap +import platform from pathlib import Path +from subprocess import run from setuptools import setup, Extension, Command from setuptools.command import build_ext as _build_ext curdir = Path(__file__).resolve().parent -ecodes_path = curdir / "evdev/ecodes.c" +ecodes_c_path = curdir / "src/evdev/ecodes.c" -def create_ecodes(headers=None): +def create_ecodes(headers=None, reproducible=False): if not headers: include_paths = set() cpath = os.environ.get("CPATH", "").strip() @@ -23,7 +26,11 @@ def create_ecodes(headers=None): include_paths.update(c_inc_path.split(":")) include_paths.add("/usr/include") - files = ["linux/input.h", "linux/input-event-codes.h", "linux/uinput.h"] + if platform.system().lower() == "freebsd": + files = ["dev/evdev/input.h", "dev/evdev/input-event-codes.h", "dev/evdev/uinput.h"] + else: + files = ["linux/input.h", "linux/input-event-codes.h", "linux/uinput.h"] + headers = [os.path.join(path, file) for path in include_paths for file in files] headers = [header for header in headers if os.path.isfile(header)] @@ -48,7 +55,7 @@ def create_ecodes(headers=None): build_ext --include-dirs path/ \\ install - If you prefer to avoid building this package from source, then please consider + If you want to avoid building this package from source, then please consider installing the `evdev-binary` package instead. Keep in mind that it may not be fully compatible with, or support all the features of your current kernel. """ @@ -56,11 +63,12 @@ def create_ecodes(headers=None): sys.stderr.write(textwrap.dedent(msg)) sys.exit(1) - from subprocess import run - - print("writing %s (using %s)" % (ecodes_path, " ".join(headers))) - with ecodes_path.open("w") as fh: - cmd = [sys.executable, "evdev/genecodes.py", *headers] + print("writing %s (using %s)" % (ecodes_c_path, " ".join(headers))) + with ecodes_c_path.open("w") as fh: + cmd = [sys.executable, "src/evdev/genecodes_c.py"] + if reproducible: + cmd.append("--reproducible") + cmd.extend(["--ecodes", *headers]) run(cmd, check=True, stdout=fh) @@ -69,29 +77,46 @@ class build_ecodes(Command): user_options = [ ("evdev-headers=", None, "colon-separated paths to input subsystem headers"), + ("reproducible", None, "hide host details (host/paths) to create a reproducible output"), ] def initialize_options(self): self.evdev_headers = None + self.reproducible = False def finalize_options(self): if self.evdev_headers: self.evdev_headers = self.evdev_headers.split(":") + if self.reproducible is None: + self.reproducible = False def run(self): - create_ecodes(self.evdev_headers) + create_ecodes(self.evdev_headers, reproducible=self.reproducible) class build_ext(_build_ext.build_ext): def has_ecodes(self): - if ecodes_path.exists(): + if ecodes_c_path.exists(): print("ecodes.c already exists ... skipping build_ecodes") - return not ecodes_path.exists() + return False + return True + + def generate_ecodes_py(self): + ecodes_py = Path(self.build_lib) / "evdev/ecodes.py" + print(f"writing {ecodes_py}") + with ecodes_py.open("w") as fh: + cmd = [sys.executable, "-B", "src/evdev/genecodes_py.py"] + res = run(cmd, env={"PYTHONPATH": self.build_lib}, stdout=fh) + + if res.returncode != 0: + print(f"failed to generate static {ecodes_py} - will use ecodes_runtime.py") + shutil.copy("src/evdev/ecodes_runtime.py", ecodes_py) def run(self): for cmd_name in self.get_sub_commands(): self.run_command(cmd_name) _build_ext.build_ext.run(self) + self.generate_ecodes_py() sub_commands = [("build_ecodes", has_ecodes)] + _build_ext.build_ext.sub_commands @@ -99,9 +124,9 @@ def run(self): cflags = ["-std=c99", "-Wno-error=declaration-after-statement"] setup( ext_modules=[ - Extension("evdev._input", sources=["evdev/input.c"], extra_compile_args=cflags), - Extension("evdev._uinput", sources=["evdev/uinput.c"], extra_compile_args=cflags), - Extension("evdev._ecodes", sources=["evdev/ecodes.c"], extra_compile_args=cflags), + Extension("evdev._input", sources=["src/evdev/input.c"], extra_compile_args=cflags), + Extension("evdev._uinput", sources=["src/evdev/uinput.c"], extra_compile_args=cflags), + Extension("evdev._ecodes", sources=["src/evdev/ecodes.c"], extra_compile_args=cflags), ], cmdclass={ "build_ext": build_ext, diff --git a/src/evdev/__init__.py b/src/evdev/__init__.py new file mode 100644 index 0000000..bae0fec --- /dev/null +++ b/src/evdev/__init__.py @@ -0,0 +1,39 @@ +# -------------------------------------------------------------------------- +# Gather everything into a single, convenient namespace. +# -------------------------------------------------------------------------- + +# The superfluous "import name as name" syntax is here to satisfy mypy's attrs-defined rule. +# Alternatively all exported objects can be listed in __all__. + +from . import ( + ecodes as ecodes, + ff as ff, +) + +from .device import ( + AbsInfo as AbsInfo, + DeviceInfo as DeviceInfo, + EvdevError as EvdevError, + InputDevice as InputDevice, +) + +from .events import ( + AbsEvent as AbsEvent, + InputEvent as InputEvent, + KeyEvent as KeyEvent, + RelEvent as RelEvent, + SynEvent as SynEvent, + event_factory as event_factory, +) + +from .uinput import ( + UInput as UInput, + UInputError as UInputError, +) + +from .util import ( + categorize as categorize, + list_devices as list_devices, + resolve_ecodes as resolve_ecodes, + resolve_ecodes_dict as resolve_ecodes_dict, +) diff --git a/evdev/device.py b/src/evdev/device.py similarity index 80% rename from evdev/device.py rename to src/evdev/device.py index cde168e..a7f9b92 100644 --- a/evdev/device.py +++ b/src/evdev/device.py @@ -1,31 +1,21 @@ -# encoding: utf-8 - -import os -import warnings import contextlib -import collections +import os +from typing import Dict, Generic, Iterator, List, Literal, NamedTuple, Tuple, TypeVar, Union, overload -from evdev import _input, ecodes, util -from evdev.events import InputEvent +from . import _input, ecodes, util try: - from evdev.eventio_async import EventIO, EvdevError + from .eventio_async import EvdevError, EventIO except ImportError: - from evdev.eventio import EventIO, EvdevError - - -# -------------------------------------------------------------------------- -_AbsInfo = collections.namedtuple("AbsInfo", ["value", "min", "max", "fuzz", "flat", "resolution"]) + from .eventio import EvdevError, EventIO -_KbdInfo = collections.namedtuple("KbdInfo", ["repeat", "delay"]) +_AnyStr = TypeVar("_AnyStr", str, bytes) -_DeviceInfo = collections.namedtuple("DeviceInfo", ["bustype", "vendor", "product", "version"]) - -class AbsInfo(_AbsInfo): +class AbsInfo(NamedTuple): """Absolute axis information. - A ``namedtuple`` used for storing absolute axis information - + A ``namedtuple`` with absolute axis information - corresponds to the ``input_absinfo`` struct: Attributes @@ -61,28 +51,38 @@ class AbsInfo(_AbsInfo): """ + value: int + min: int + max: int + fuzz: int + flat: int + resolution: int + def __str__(self): - return "val {}, min {}, max {}, fuzz {}, flat {}, res {}".format(*self) + return "value {}, min {}, max {}, fuzz {}, flat {}, res {}".format(*self) # pylint: disable=not-an-iterable -class KbdInfo(_KbdInfo): +class KbdInfo(NamedTuple): """Keyboard repeat rate. Attributes ---------- - repeat - Keyboard repeat rate in characters per second. - delay Amount of time that a key must be depressed before it will start to repeat (in milliseconds). + + repeat + Keyboard repeat rate in characters per second. """ + delay: int + repeat: int + def __str__(self): - return "repeat {}, delay {}".format(*self) + return "delay {}, repeat {}".format(self.delay, self.repeat) -class DeviceInfo(_DeviceInfo): +class DeviceInfo(NamedTuple): """ Attributes ---------- @@ -92,19 +92,24 @@ class DeviceInfo(_DeviceInfo): version """ - def __str__(self): + bustype: int + vendor: int + product: int + version: int + + def __str__(self) -> str: msg = "bus: {:04x}, vendor {:04x}, product {:04x}, version {:04x}" - return msg.format(*self) + return msg.format(*self) # pylint: disable=not-an-iterable -class InputDevice(EventIO): +class InputDevice(EventIO, Generic[_AnyStr]): """ A linux input device from which input events can be read. """ __slots__ = ("path", "fd", "info", "name", "phys", "uniq", "_rawcapabilities", "version", "ff_effects_count") - def __init__(self, dev): + def __init__(self, dev: Union[_AnyStr, "os.PathLike[_AnyStr]"]): """ Arguments --------- @@ -113,17 +118,16 @@ def __init__(self, dev): """ #: Path to input device. - self.path = dev if not hasattr(dev, "__fspath__") else dev.__fspath__() + self.path: _AnyStr = dev if not hasattr(dev, "__fspath__") else dev.__fspath__() - # Certain operations are possible only when the device is opened in - # read-write mode. + # Certain operations are possible only when the device is opened in read-write mode. try: fd = os.open(dev, os.O_RDWR | os.O_NONBLOCK) except OSError: fd = os.open(dev, os.O_RDONLY | os.O_NONBLOCK) #: A non-blocking file descriptor to the device file. - self.fd = fd + self.fd: int = fd # Returns (bustype, vendor, product, version, name, phys, capabilities). info_res = _input.ioctl_devinfo(self.fd) @@ -132,16 +136,16 @@ def __init__(self, dev): self.info = DeviceInfo(*info_res[:4]) #: The name of the event device. - self.name = info_res[4] + self.name: str = info_res[4] #: The physical topology of the device. - self.phys = info_res[5] + self.phys: str = info_res[5] #: The unique identifier of the device. - self.uniq = info_res[6] + self.uniq: str = info_res[6] #: The evdev protocol version. - self.version = _input.ioctl_EVIOCGVERSION(self.fd) + self.version: int = _input.ioctl_EVIOCGVERSION(self.fd) #: The raw dictionary of device capabilities - see `:func:capabilities()`. self._rawcapabilities = _input.ioctl_capabilities(self.fd) @@ -149,14 +153,14 @@ def __init__(self, dev): #: The number of force feedback effects the device can keep in its memory. self.ff_effects_count = _input.ioctl_EVIOCGEFFECTS(self.fd) - def __del__(self): + def __del__(self) -> None: if hasattr(self, "fd") and self.fd is not None: try: self.close() except (OSError, ImportError, AttributeError): pass - def _capabilities(self, absinfo=True): + def _capabilities(self, absinfo: bool = True): res = {} for etype, _ecodes in self._rawcapabilities.items(): @@ -174,7 +178,13 @@ def _capabilities(self, absinfo=True): return res - def capabilities(self, verbose=False, absinfo=True): + @overload + def capabilities(self, verbose: Literal[False] = ..., absinfo: bool = ...) -> Dict[int, List[int]]: + ... + @overload + def capabilities(self, verbose: Literal[True], absinfo: bool = ...) -> Dict[Tuple[str, int], List[Tuple[str, int]]]: + ... + def capabilities(self, verbose: bool = False, absinfo: bool = True) -> Union[Dict[int, List[int]], Dict[Tuple[str, int], List[Tuple[str, int]]]]: """ Return the event types that this device supports as a mapping of supported event types to lists of handled event codes. @@ -219,7 +229,7 @@ def capabilities(self, verbose=False, absinfo=True): else: return self._capabilities(absinfo) - def input_props(self, verbose=False): + def input_props(self, verbose: bool = False): """ Get device properties and quirks. @@ -240,7 +250,7 @@ def input_props(self, verbose=False): return props - def leds(self, verbose=False): + def leds(self, verbose: bool = False): """ Return currently set LED keys. @@ -261,7 +271,7 @@ def leds(self, verbose=False): return leds - def set_led(self, led_num, value): + def set_led(self, led_num: int, value: int) -> None: """ Set the state of the selected LED. @@ -277,18 +287,18 @@ def __eq__(self, other): """ return isinstance(other, self.__class__) and self.info == other.info and self.path == other.path - def __str__(self): + def __str__(self) -> str: msg = 'device {}, name "{}", phys "{}", uniq "{}"' return msg.format(self.path, self.name, self.phys, self.uniq or "") - def __repr__(self): + def __repr__(self) -> str: msg = (self.__class__.__name__, self.path) return "{}({!r})".format(*msg) def __fspath__(self): return self.path - def close(self): + def close(self) -> None: if self.fd > -1: try: super().close() @@ -296,7 +306,7 @@ def close(self): finally: self.fd = -1 - def grab(self): + def grab(self) -> None: """ Grab input device using ``EVIOCGRAB`` - other applications will be unable to receive events until the device is released. Only @@ -309,7 +319,7 @@ def grab(self): _input.ioctl_EVIOCGRAB(self.fd, 1) - def ungrab(self): + def ungrab(self) -> None: """ Release device if it has been already grabbed (uses `EVIOCGRAB`). @@ -322,7 +332,7 @@ def ungrab(self): _input.ioctl_EVIOCGRAB(self.fd, 0) @contextlib.contextmanager - def grab_context(self): + def grab_context(self) -> Iterator[None]: """ A context manager for the duration of which only the current process will be able to receive events from the device. @@ -331,7 +341,7 @@ def grab_context(self): yield self.ungrab() - def upload_effect(self, effect): + def upload_effect(self, effect: "ff.Effect"): """ Upload a force feedback effect to a force feedback device. """ @@ -340,7 +350,7 @@ def upload_effect(self, effect): ff_id = _input.upload_effect(self.fd, data) return ff_id - def erase_effect(self, ff_id): + def erase_effect(self, ff_id) -> None: """ Erase a force effect from a force feedback device. This also stops the effect. @@ -358,10 +368,10 @@ def repeat(self): return KbdInfo(*_input.ioctl_EVIOCGREP(self.fd)) @repeat.setter - def repeat(self, value): + def repeat(self, value: Tuple[int, int]): return _input.ioctl_EVIOCSREP(self.fd, *value) - def active_keys(self, verbose=False): + def active_keys(self, verbose: bool = False): """ Return currently active keys. @@ -384,13 +394,7 @@ def active_keys(self, verbose=False): return active_keys - @property - def fn(self): - msg = "Please use {0}.path instead of {0}.fn".format(self.__class__.__name__) - warnings.warn(msg, DeprecationWarning, stacklevel=2) - return self.path - - def absinfo(self, axis_num): + def absinfo(self, axis_num: int): """ Return current :class:`AbsInfo` for input device axis @@ -406,7 +410,7 @@ def absinfo(self, axis_num): """ return AbsInfo(*_input.ioctl_EVIOCGABS(self.fd, axis_num)) - def set_absinfo(self, axis_num, value=None, min=None, max=None, fuzz=None, flat=None, resolution=None): + def set_absinfo(self, axis_num: int, value=None, min=None, max=None, fuzz=None, flat=None, resolution=None) -> None: """ Update :class:`AbsInfo` values. Only specified values will be overwritten. diff --git a/src/evdev/ecodes.py b/src/evdev/ecodes.py new file mode 100644 index 0000000..fd4afc4 --- /dev/null +++ b/src/evdev/ecodes.py @@ -0,0 +1,5 @@ +# When installed, this module is replaced by an ecodes.py generated at +# build time by genecodes_py.py (see build_ext in setup.py). + +# This stub exists to make development of evdev itself more convenient. +from .ecodes_runtime import * diff --git a/evdev/ecodes.py b/src/evdev/ecodes_runtime.py similarity index 85% rename from evdev/ecodes.py rename to src/evdev/ecodes_runtime.py index 3562368..47f3b23 100644 --- a/evdev/ecodes.py +++ b/src/evdev/ecodes_runtime.py @@ -1,3 +1,4 @@ +# pylint: disable=undefined-variable """ This modules exposes the integer constants defined in ``linux/input.h`` and ``linux/input-event-codes.h``. @@ -32,26 +33,26 @@ codes. For example:: >>> evdev.ecodes.FF[80] - ['FF_EFFECT_MIN', 'FF_RUMBLE'] + ('FF_EFFECT_MIN', 'FF_RUMBLE') >>> evdev.ecodes.FF[81] 'FF_PERIODIC' """ from inspect import getmembers -from evdev import _ecodes +from . import _ecodes #: Mapping of names to values. ecodes = {} -prefixes = "KEY ABS REL SW MSC LED BTN REP SND ID EV BUS SYN FF_STATUS FF INPUT_PROP" +prefixes = "KEY ABS REL SW MSC LED BTN REP SND ID EV BUS SYN FF_STATUS FF INPUT_PROP UI_FF".split() prev_prefix = "" g = globals() # eg. code: 'REL_Z', val: 2 for code, val in getmembers(_ecodes): - for prefix in prefixes.split(): # eg. 'REL' + for prefix in prefixes: # eg. 'REL' if code.startswith(prefix): ecodes[code] = val # FF_STATUS codes should not appear in the FF reverse mapping @@ -70,6 +71,15 @@ prev_prefix = prefix + +# Convert lists to tuples. +k, v = None, None +for prefix in prefixes: + for k, v in g[prefix].items(): + if isinstance(v, list): + g[prefix][k] = tuple(v) + + #: Keys are a combination of all BTN and KEY codes. keys = {} keys.update(BTN) @@ -98,4 +108,4 @@ from evdev._ecodes import * # cheaper than whitelisting in an __all__ -del code, val, prefix, getmembers, g, d, prefixes, prev_prefix +del code, val, prefix, getmembers, g, d, k, v, prefixes, prev_prefix diff --git a/evdev/eventio.py b/src/evdev/eventio.py similarity index 89% rename from evdev/eventio.py rename to src/evdev/eventio.py index 415e2e8..bdb91a4 100644 --- a/evdev/eventio.py +++ b/src/evdev/eventio.py @@ -1,10 +1,11 @@ -import os import fcntl -import select import functools +import os +import select +from typing import Iterator, Union -from evdev import _input, _uinput, ecodes, util -from evdev.events import InputEvent +from . import _input, _uinput, ecodes +from .events import InputEvent # -------------------------------------------------------------------------- @@ -35,7 +36,7 @@ def fileno(self): """ return self.fd - def read_loop(self): + def read_loop(self) -> Iterator[InputEvent]: """ Enter an endless :func:`select.select()` loop that yields input events. """ @@ -45,7 +46,7 @@ def read_loop(self): for event in self.read(): yield event - def read_one(self): + def read_one(self) -> Union[InputEvent, None]: """ Read and return a single input event as an instance of :class:`InputEvent `. @@ -59,19 +60,20 @@ def read_one(self): if event: return InputEvent(*event) - def read(self): + def read(self) -> Iterator[InputEvent]: """ Read multiple input events from device. Return a generator object that yields :class:`InputEvent ` instances. Raises `BlockingIOError` if there are no available events at the moment. """ - # events -> [(sec, usec, type, code, val), ...] + # events -> ((sec, usec, type, code, val), ...) events = _input.device_read_many(self.fd) for event in events: yield InputEvent(*event) + # pylint: disable=no-self-argument def need_write(func): """ Decorator that raises :class:`EvdevError` if there is no write access to the @@ -82,6 +84,7 @@ def need_write(func): def wrapper(*args): fd = args[0].fd if fcntl.fcntl(fd, fcntl.F_GETFL) & os.O_RDWR: + # pylint: disable=not-callable return func(*args) msg = 'no write access to device "%s"' % args[0].path raise EvdevError(msg) @@ -112,7 +115,7 @@ def write_event(self, event): self.write(event.type, event.code, event.value) @need_write - def write(self, etype, code, value): + def write(self, etype: int, code: int, value: int): """ Inject an input event into the input subsystem. Events are queued until a synchronization event is received. diff --git a/evdev/eventio_async.py b/src/evdev/eventio_async.py similarity index 86% rename from evdev/eventio_async.py rename to src/evdev/eventio_async.py index e89765e..4af1aab 100644 --- a/evdev/eventio_async.py +++ b/src/evdev/eventio_async.py @@ -1,10 +1,56 @@ import asyncio import select +import sys -from evdev import eventio +from . import eventio +from .events import InputEvent # needed for compatibility -from evdev.eventio import EvdevError +from .eventio import EvdevError + +if sys.version_info >= (3, 11): + from typing import Self +else: + from typing import Any as Self + + +class ReadIterator: + def __init__(self, device): + self.current_batch = iter(()) + self.device = device + + # Standard iterator protocol. + def __iter__(self) -> Self: + return self + + def __next__(self) -> InputEvent: + try: + # Read from the previous batch of events. + return next(self.current_batch) + except StopIteration: + r, w, x = select.select([self.device.fd], [], []) + self.current_batch = self.device.read() + return next(self.current_batch) + + def __aiter__(self) -> Self: + return self + + def __anext__(self) -> "asyncio.Future[InputEvent]": + future = asyncio.Future() + try: + # Read from the previous batch of events. + future.set_result(next(self.current_batch)) + except StopIteration: + + def next_batch_ready(batch): + try: + self.current_batch = batch.result() + future.set_result(next(self.current_batch)) + except Exception as e: + future.set_exception(e) + + self.device.async_read().add_done_callback(next_batch_ready) + return future class EventIO(eventio.EventIO): @@ -42,7 +88,7 @@ def async_read(self): self._do_when_readable(lambda: self._set_result(future, self.read)) return future - def async_read_loop(self): + def async_read_loop(self) -> ReadIterator: """ Return an iterator that yields input events. This iterator is compatible with the ``async for`` syntax. @@ -58,42 +104,3 @@ def close(self): # no event loop present, so there is nothing to # remove the reader from. Ignore pass - - -class ReadIterator: - def __init__(self, device): - self.current_batch = iter(()) - self.device = device - - # Standard iterator protocol. - def __iter__(self): - return self - - def __next__(self): - try: - # Read from the previous batch of events. - return next(self.current_batch) - except StopIteration: - r, w, x = select.select([self.device.fd], [], []) - self.current_batch = self.device.read() - return next(self.current_batch) - - def __aiter__(self): - return self - - def __anext__(self): - future = asyncio.Future() - try: - # Read from the previous batch of events. - future.set_result(next(self.current_batch)) - except StopIteration: - - def next_batch_ready(batch): - try: - self.current_batch = batch.result() - future.set_result(next(self.current_batch)) - except Exception as e: - future.set_exception(e) - - self.device.async_read().add_done_callback(next_batch_ready) - return future diff --git a/evdev/events.py b/src/evdev/events.py similarity index 76% rename from evdev/events.py rename to src/evdev/events.py index 104b563..922bfe6 100644 --- a/evdev/events.py +++ b/src/evdev/events.py @@ -37,7 +37,9 @@ # event type descriptions have been taken mot-a-mot from: # http://www.kernel.org/doc/Documentation/input/event-codes.txt -from evdev.ecodes import keys, KEY, SYN, REL, ABS, EV_KEY, EV_REL, EV_ABS, EV_SYN +# pylint: disable=no-name-in-module +from typing import Final +from .ecodes import ABS, EV_ABS, EV_KEY, EV_REL, EV_SYN, KEY, REL, SYN, keys class InputEvent: @@ -47,50 +49,50 @@ class InputEvent: def __init__(self, sec, usec, type, code, value): #: Time in seconds since epoch at which event occurred. - self.sec = sec + self.sec: int = sec #: Microsecond portion of the timestamp. - self.usec = usec + self.usec: int = usec #: Event type - one of ``ecodes.EV_*``. - self.type = type + self.type: int = type #: Event code related to the event type. - self.code = code + self.code: int = code #: Event value related to the event type. - self.value = value + self.value: int = value - def timestamp(self): + def timestamp(self) -> float: """Return event timestamp as a float.""" return self.sec + (self.usec / 1000000.0) - def __str__(s): + def __str__(self): msg = "event at {:f}, code {:02d}, type {:02d}, val {:02d}" - return msg.format(s.timestamp(), s.code, s.type, s.value) + return msg.format(self.timestamp(), self.code, self.type, self.value) - def __repr__(s): + def __repr__(self): msg = "{}({!r}, {!r}, {!r}, {!r}, {!r})" - return msg.format(s.__class__.__name__, s.sec, s.usec, s.type, s.code, s.value) + return msg.format(self.__class__.__name__, self.sec, self.usec, self.type, self.code, self.value) class KeyEvent: """An event generated by a keyboard, button or other key-like devices.""" - key_up = 0x0 - key_down = 0x1 - key_hold = 0x2 + key_up: Final[int] = 0x0 + key_down: Final[int] = 0x1 + key_hold: Final[int] = 0x2 __slots__ = "scancode", "keycode", "keystate", "event" - def __init__(self, event, allow_unknown=False): + def __init__(self, event: InputEvent, allow_unknown: bool = False): """ The ``allow_unknown`` argument determines what to do in the event of an event code for which a key code cannot be found. If ``False`` a ``KeyError`` will be raised. If ``True`` the keycode will be set to the hex value of the event code. """ - self.scancode = event.code + self.scancode: int = event.code if event.value == 0: self.keystate = KeyEvent.key_up @@ -108,7 +110,7 @@ def __init__(self, event, allow_unknown=False): raise #: Reference to an :class:`InputEvent` instance. - self.event = event + self.event: InputEvent = event def __str__(self): try: @@ -119,8 +121,8 @@ def __str__(self): msg = "key event at {:f}, {} ({}), {}" return msg.format(self.event.timestamp(), self.scancode, self.keycode, ks) - def __repr__(s): - return "{}({!r})".format(s.__class__.__name__, s.event) + def __repr__(self): + return "{}({!r})".format(self.__class__.__name__, self.event) class RelEvent: @@ -128,16 +130,16 @@ class RelEvent: __slots__ = "event" - def __init__(self, event): + def __init__(self, event: InputEvent): #: Reference to an :class:`InputEvent` instance. - self.event = event + self.event: InputEvent = event def __str__(self): msg = "relative axis event at {:f}, {}" return msg.format(self.event.timestamp(), REL[self.event.code]) - def __repr__(s): - return "{}({!r})".format(s.__class__.__name__, s.event) + def __repr__(self): + return "{}({!r})".format(self.__class__.__name__, self.event) class AbsEvent: @@ -145,16 +147,16 @@ class AbsEvent: __slots__ = "event" - def __init__(self, event): + def __init__(self, event: InputEvent): #: Reference to an :class:`InputEvent` instance. - self.event = event + self.event: InputEvent = event def __str__(self): msg = "absolute axis event at {:f}, {}" return msg.format(self.event.timestamp(), ABS[self.event.code]) - def __repr__(s): - return "{}({!r})".format(s.__class__.__name__, s.event) + def __repr__(self): + return "{}({!r})".format(self.__class__.__name__, self.event) class SynEvent: @@ -165,16 +167,16 @@ class SynEvent: __slots__ = "event" - def __init__(self, event): + def __init__(self, event: InputEvent): #: Reference to an :class:`InputEvent` instance. - self.event = event + self.event: InputEvent = event def __str__(self): msg = "synchronization event at {:f}, {}" return msg.format(self.event.timestamp(), SYN[self.event.code]) - def __repr__(s): - return "{}({!r})".format(s.__class__.__name__, s.event) + def __repr__(self): + return "{}({!r})".format(self.__class__.__name__, self.event) #: A mapping of event types to :class:`InputEvent` sub-classes. Used diff --git a/evdev/evtest.py b/src/evdev/evtest.py similarity index 93% rename from evdev/evtest.py rename to src/evdev/evtest.py index b61f093..6ea3bb5 100644 --- a/evdev/evtest.py +++ b/src/evdev/evtest.py @@ -16,20 +16,14 @@ evtest /dev/input/event0 /dev/input/event1 """ - +import atexit +import optparse import re -import sys import select -import atexit +import sys import termios -import optparse - -try: - input = raw_input -except NameError: - pass -from evdev import ecodes, list_devices, AbsInfo, InputDevice +from . import AbsInfo, InputDevice, ecodes, list_devices def parseopt(): @@ -154,9 +148,11 @@ def print_capabilities(device): def print_event(e): if e.type == ecodes.EV_SYN: if e.code == ecodes.SYN_MT_REPORT: - msg = "time {:<16} +++++++++ {} ++++++++" + msg = "time {:<17} +++++++++++++ {} +++++++++++++" + elif e.code == ecodes.SYN_DROPPED: + msg = "time {:<17} !!!!!!!!!!!!! {} !!!!!!!!!!!!!" else: - msg = "time {:<16} --------- {} --------" + msg = "time {:<17} ------------- {} -------------" print(msg.format(e.timestamp(), ecodes.SYN[e.code])) else: if e.type in ecodes.bytype: @@ -164,7 +160,7 @@ def print_event(e): else: codename = "?" - evfmt = "time {:<16} type {} ({}), code {:<4} ({}), value {}" + evfmt = "time {:<17} type {} ({}), code {:<4} ({}), value {}" print(evfmt.format(e.timestamp(), e.type, ecodes.EV[e.type], e.code, codename, e.value)) diff --git a/evdev/ff.py b/src/evdev/ff.py similarity index 99% rename from evdev/ff.py rename to src/evdev/ff.py index edb5ff2..260c362 100644 --- a/evdev/ff.py +++ b/src/evdev/ff.py @@ -1,6 +1,6 @@ import ctypes -from evdev import ecodes +from . import ecodes _u8 = ctypes.c_uint8 _u16 = ctypes.c_uint16 diff --git a/src/evdev/genecodes_c.py b/src/evdev/genecodes_c.py new file mode 100644 index 0000000..15a6693 --- /dev/null +++ b/src/evdev/genecodes_c.py @@ -0,0 +1,147 @@ +""" +Generate a Python extension module with the constants defined in linux/input.h. +""" + +import getopt +import os +import re +import sys + +# ----------------------------------------------------------------------------- +# The default header file locations to try. +headers = [ + "/usr/include/linux/input.h", + "/usr/include/linux/input-event-codes.h", + "/usr/include/linux/uinput.h", +] + +opts, args = getopt.getopt(sys.argv[1:], "", ["ecodes", "stubs", "reproducible"]) +if not opts: + print("usage: genecodes.py [--ecodes|--stubs] [--reproducible] ") + exit(2) + +if args: + headers = args + +reproducible = ("--reproducible", "") in opts + + +# ----------------------------------------------------------------------------- +macro_regex = r"#define\s+((?:KEY|ABS|REL|SW|MSC|LED|BTN|REP|SND|ID|EV|BUS|SYN|FF|UI_FF|INPUT_PROP)_\w+)" +macro_regex = re.compile(macro_regex) + +if reproducible: + uname = "hidden for reproducibility" +else: + # Uname without hostname. + uname = list(os.uname()) + uname = " ".join((uname[0], *uname[2:])) + + +# ----------------------------------------------------------------------------- +template_ecodes = r""" +#include +#ifdef __FreeBSD__ +#include +#include +#else +#include +#include +#endif + +/* Automatically generated by evdev.genecodes */ +/* Generated on %s */ +/* Generated from %s */ + +#define MODULE_NAME "_ecodes" +#define MODULE_HELP "linux/input.h macros" + +static PyMethodDef MethodTable[] = { + { NULL, NULL, 0, NULL} +}; + +static struct PyModuleDef moduledef = { + PyModuleDef_HEAD_INIT, + MODULE_NAME, + MODULE_HELP, + -1, /* m_size */ + MethodTable, /* m_methods */ + NULL, /* m_reload */ + NULL, /* m_traverse */ + NULL, /* m_clear */ + NULL, /* m_free */ +}; + +PyMODINIT_FUNC +PyInit__ecodes(void) +{ + PyObject* m = PyModule_Create(&moduledef); + if (m == NULL) return NULL; + +%s + + return m; +} +""" + + +template_stubs = r""" +# Automatically generated by evdev.genecodes +# Generated on %s +# Generated from %s + +# pylint: skip-file + +ecodes: dict[str, int] +keys: dict[int, str|list[str]] +bytype: dict[int, dict[int, str|list[str]]] + +KEY: dict[int, str|list[str]] +ABS: dict[int, str|list[str]] +REL: dict[int, str|list[str]] +SW: dict[int, str|list[str]] +MSC: dict[int, str|list[str]] +LED: dict[int, str|list[str]] +BTN: dict[int, str|list[str]] +REP: dict[int, str|list[str]] +SND: dict[int, str|list[str]] +ID: dict[int, str|list[str]] +EV: dict[int, str|list[str]] +BUS: dict[int, str|list[str]] +SYN: dict[int, str|list[str]] +FF_STATUS: dict[int, str|list[str]] +FF_INPUT_PROP: dict[int, str|list[str]] + +%s +""" + + +def parse_headers(headers=headers): + for header in headers: + try: + fh = open(header) + except (IOError, OSError): + continue + + for line in fh: + macro = macro_regex.search(line) + if macro: + yield macro.group(1) + + +all_macros = list(parse_headers()) +if not all_macros: + print("no input macros found in: %s" % " ".join(headers), file=sys.stderr) + sys.exit(1) + +# pylint: disable=possibly-used-before-assignment, used-before-assignment +if ("--ecodes", "") in opts: + body = (" PyModule_AddIntMacro(m, %s);" % macro for macro in all_macros) + template = template_ecodes +elif ("--stubs", "") in opts: + body = ("%s: int" % macro for macro in all_macros) + template = template_stubs + +body = os.linesep.join(body) +text = template % (uname, headers if not reproducible else ["hidden for reproducibility"], body) +print(text.strip()) diff --git a/src/evdev/genecodes_py.py b/src/evdev/genecodes_py.py new file mode 100644 index 0000000..f00020c --- /dev/null +++ b/src/evdev/genecodes_py.py @@ -0,0 +1,54 @@ +import sys +from unittest import mock +from pprint import PrettyPrinter + +sys.modules["evdev.ecodes"] = mock.Mock() +from evdev import ecodes_runtime as ecodes + +pprint = PrettyPrinter(indent=2, sort_dicts=True, width=120).pprint + + +print("# Automatically generated by evdev.genecodes_py") +print() +print('"""') +print(ecodes.__doc__.strip()) +print('"""') + +print() +print("from typing import Final, Dict, Tuple, Union") +print() + +for name, value in ecodes.ecodes.items(): + print(f"{name}: Final[int] = {value}") +print() + +entries = [ + ("ecodes", "Dict[str, int]", "#: Mapping of names to values."), + ("bytype", "Dict[int, Dict[int, Union[str, Tuple[str]]]]", "#: Mapping of event types to other value/name mappings."), + ("keys", "Dict[int, Union[str, Tuple[str]]]", "#: Keys are a combination of all BTN and KEY codes."), + ("KEY", "Dict[int, Union[str, Tuple[str]]]", None), + ("ABS", "Dict[int, Union[str, Tuple[str]]]", None), + ("REL", "Dict[int, Union[str, Tuple[str]]]", None), + ("SW", "Dict[int, Union[str, Tuple[str]]]", None), + ("MSC", "Dict[int, Union[str, Tuple[str]]]", None), + ("LED", "Dict[int, Union[str, Tuple[str]]]", None), + ("BTN", "Dict[int, Union[str, Tuple[str]]]", None), + ("REP", "Dict[int, Union[str, Tuple[str]]]", None), + ("SND", "Dict[int, Union[str, Tuple[str]]]", None), + ("ID", "Dict[int, Union[str, Tuple[str]]]", None), + ("EV", "Dict[int, Union[str, Tuple[str]]]", None), + ("BUS", "Dict[int, Union[str, Tuple[str]]]", None), + ("SYN", "Dict[int, Union[str, Tuple[str]]]", None), + ("FF", "Dict[int, Union[str, Tuple[str]]]", None), + ("UI_FF", "Dict[int, Union[str, Tuple[str]]]", None), + ("FF_STATUS", "Dict[int, Union[str, Tuple[str]]]", None), + ("INPUT_PROP", "Dict[int, Union[str, Tuple[str]]]", None) +] + +for key, annotation, doc in entries: + if doc: + print(doc) + + print(f"{key}: {annotation} = ", end="") + pprint(getattr(ecodes, key)) + print() \ No newline at end of file diff --git a/evdev/input.c b/src/evdev/input.c similarity index 88% rename from evdev/input.c rename to src/evdev/input.c index 0256745..894db22 100644 --- a/evdev/input.c +++ b/src/evdev/input.c @@ -46,12 +46,10 @@ int test_bit(const char* bitmask, int bit) { static PyObject * device_read(PyObject *self, PyObject *args) { - int fd; struct input_event event; // get device file descriptor (O_RDONLY|O_NONBLOCK) - if (PyArg_ParseTuple(args, "i", &fd) < 0) - return NULL; + int fd = (int)PyLong_AsLong(PyTuple_GET_ITEM(args, 0)); int n = read(fd, &event, sizeof(event)); @@ -65,15 +63,12 @@ device_read(PyObject *self, PyObject *args) return NULL; } - PyObject* sec = PyLong_FromLong(event.input_event_sec); - PyObject* usec = PyLong_FromLong(event.input_event_usec); - PyObject* val = PyLong_FromLong(event.value); - PyObject* py_input_event = NULL; - - py_input_event = Py_BuildValue("OOhhO", sec, usec, event.type, event.code, val); - Py_DECREF(sec); - Py_DECREF(usec); - Py_DECREF(val); + PyObject *py_input_event = PyTuple_New(5); + PyTuple_SET_ITEM(py_input_event, 0, PyLong_FromLong(event.input_event_sec)); + PyTuple_SET_ITEM(py_input_event, 1, PyLong_FromLong(event.input_event_usec)); + PyTuple_SET_ITEM(py_input_event, 2, PyLong_FromLong(event.type)); + PyTuple_SET_ITEM(py_input_event, 3, PyLong_FromLong(event.code)); + PyTuple_SET_ITEM(py_input_event, 4, PyLong_FromLong(event.value)); return py_input_event; } @@ -83,17 +78,8 @@ device_read(PyObject *self, PyObject *args) static PyObject * device_read_many(PyObject *self, PyObject *args) { - int fd; - // get device file descriptor (O_RDONLY|O_NONBLOCK) - int ret = PyArg_ParseTuple(args, "i", &fd); - if (!ret) return NULL; - - PyObject* event_list = PyList_New(0); - PyObject* py_input_event = NULL; - PyObject* sec = NULL; - PyObject* usec = NULL; - PyObject* val = NULL; + int fd = (int)PyLong_AsLong(PyTuple_GET_ITEM(args, 0)); struct input_event event[64]; @@ -102,26 +88,24 @@ device_read_many(PyObject *self, PyObject *args) if (nread < 0) { PyErr_SetFromErrno(PyExc_OSError); - Py_DECREF(event_list); return NULL; } - // Construct a list of event tuples, which we'll make sense of in Python - for (unsigned i = 0 ; i < nread/event_size ; i++) { - sec = PyLong_FromLong(event[i].input_event_sec); - usec = PyLong_FromLong(event[i].input_event_usec); - val = PyLong_FromLong(event[i].value); - - py_input_event = Py_BuildValue("OOhhO", sec, usec, event[i].type, event[i].code, val); - PyList_Append(event_list, py_input_event); - - Py_DECREF(py_input_event); - Py_DECREF(sec); - Py_DECREF(usec); - Py_DECREF(val); + // Construct a tuple of event tuples. Each tuple is the arguments to InputEvent. + size_t num_events = nread / event_size; + + PyObject* events = PyTuple_New(num_events); + for (size_t i = 0 ; i < num_events; i++) { + PyObject *py_input_event = PyTuple_New(5); + PyTuple_SET_ITEM(py_input_event, 0, PyLong_FromLong(event[i].input_event_sec)); + PyTuple_SET_ITEM(py_input_event, 1, PyLong_FromLong(event[i].input_event_usec)); + PyTuple_SET_ITEM(py_input_event, 2, PyLong_FromLong(event[i].type)); + PyTuple_SET_ITEM(py_input_event, 3, PyLong_FromLong(event[i].code)); + PyTuple_SET_ITEM(py_input_event, 4, PyLong_FromLong(event[i].value)); + PyTuple_SET_ITEM(events, i, py_input_event); } - return event_list; + return events; } @@ -208,6 +192,11 @@ ioctl_capabilities(PyObject *self, PyObject *args) return capabilities; on_err: + Py_XDECREF(capabilities); + Py_XDECREF(eventcodes); + Py_XDECREF(capability); + Py_XDECREF(py_absinfo); + Py_XDECREF(absitem); PyErr_SetFromErrno(PyExc_OSError); return NULL; } @@ -309,7 +298,7 @@ static PyObject * ioctl_EVIOCGREP(PyObject *self, PyObject *args) { int fd, ret; - unsigned int rep[2] = {0}; + unsigned int rep[REP_CNT] = {0}; ret = PyArg_ParseTuple(args, "i", &fd); if (!ret) return NULL; @@ -317,7 +306,7 @@ ioctl_EVIOCGREP(PyObject *self, PyObject *args) if (ret == -1) return NULL; - return Py_BuildValue("(ii)", rep[0], rep[1]); + return Py_BuildValue("(ii)", rep[REP_DELAY], rep[REP_PERIOD]); } @@ -325,7 +314,7 @@ static PyObject * ioctl_EVIOCSREP(PyObject *self, PyObject *args) { int fd, ret; - unsigned int rep[2] = {0}; + unsigned int rep[REP_CNT] = {0}; ret = PyArg_ParseTuple(args, "iii", &fd, &rep[0], &rep[1]); if (!ret) return NULL; @@ -416,7 +405,9 @@ ioctl_EVIOCG_bits(PyObject *self, PyObject *args) PyObject* res = PyList_New(0); for (int i=0; i<=max; i++) { if (test_bit(bytes, i)) { - PyList_Append(res, Py_BuildValue("i", i)); + PyObject *val = PyLong_FromLong(i); + PyList_Append(res, val); + Py_DECREF(val); } } @@ -531,7 +522,9 @@ ioctl_EVIOCGPROP(PyObject *self, PyObject *args) PyObject* res = PyList_New(0); for (int i=0; i= 3 static struct PyModuleDef moduledef = { PyModuleDef_HEAD_INIT, - MODULE_NAME, - MODULE_HELP, + "_input", + "Python bindings to certain linux input subsystem functions", -1, /* m_size */ MethodTable, /* m_methods */ NULL, /* m_reload */ @@ -590,19 +578,3 @@ PyInit__input(void) { return moduleinit(); } - -#else -static PyObject * -moduleinit(void) -{ - PyObject* m = Py_InitModule3(MODULE_NAME, MethodTable, MODULE_HELP); - if (m == NULL) return NULL; - return m; -} - -PyMODINIT_FUNC -init_input(void) -{ - moduleinit(); -} -#endif diff --git a/src/evdev/py.typed b/src/evdev/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/evdev/uinput.c b/src/evdev/uinput.c similarity index 95% rename from evdev/uinput.c rename to src/evdev/uinput.c index 3494705..8d2c096 100644 --- a/evdev/uinput.c +++ b/src/evdev/uinput.c @@ -356,8 +356,6 @@ int _uinput_end_erase(int fd, struct uinput_ff_erase *upload) return ioctl(fd, UI_END_FF_ERASE, upload); } -#define MODULE_NAME "_uinput" -#define MODULE_HELP "Python bindings for parts of linux/uinput.c" static PyMethodDef MethodTable[] = { { "open", uinput_open, METH_VARARGS, @@ -390,11 +388,10 @@ static PyMethodDef MethodTable[] = { { NULL, NULL, 0, NULL} }; -#if PY_MAJOR_VERSION >= 3 static struct PyModuleDef moduledef = { PyModuleDef_HEAD_INIT, - MODULE_NAME, - MODULE_HELP, + "_uinput", + "Python bindings for parts of linux/uinput.c", -1, /* m_size */ MethodTable, /* m_methods */ NULL, /* m_reload */ @@ -418,21 +415,3 @@ PyInit__uinput(void) { return moduleinit(); } - -#else -static PyObject * -moduleinit(void) -{ - PyObject* m = Py_InitModule3(MODULE_NAME, MethodTable, MODULE_HELP); - if (m == NULL) return NULL; - - PyModule_AddIntConstant(m, "maxnamelen", UINPUT_MAX_NAME_SIZE); - return m; -} - -PyMODINIT_FUNC -init_uinput(void) -{ - moduleinit(); -} -#endif diff --git a/evdev/uinput.py b/src/evdev/uinput.py similarity index 85% rename from evdev/uinput.py rename to src/evdev/uinput.py index c4225d8..2c69c2b 100644 --- a/evdev/uinput.py +++ b/src/evdev/uinput.py @@ -1,15 +1,15 @@ +import ctypes import os import platform import re import stat import time from collections import defaultdict +from typing import Union, Tuple, Dict, Sequence, Optional -from evdev import _uinput -from evdev import ecodes, util, device -from evdev.events import InputEvent -import evdev.ff as ff -import ctypes +from . import _uinput, ecodes, ff, util +from .device import InputDevice, AbsInfo +from .events import InputEvent try: from evdev.eventio_async import EventIO @@ -40,7 +40,12 @@ class UInput(EventIO): ) @classmethod - def from_device(cls, *devices, filtered_types=(ecodes.EV_SYN, ecodes.EV_FF), **kwargs): + def from_device( + cls, + *devices: Union[InputDevice, Union[str, bytes, os.PathLike]], + filtered_types: Tuple[int] = (ecodes.EV_SYN, ecodes.EV_FF), + **kwargs, + ): """ Create an UInput device with the capabilities of one or more input devices. @@ -59,8 +64,8 @@ def from_device(cls, *devices, filtered_types=(ecodes.EV_SYN, ecodes.EV_FF), **k device_instances = [] for dev in devices: - if not isinstance(dev, device.InputDevice): - dev = device.InputDevice(str(dev)) + if not isinstance(dev, InputDevice): + dev = InputDevice(str(dev)) device_instances.append(dev) all_capabilities = defaultdict(set) @@ -81,14 +86,14 @@ def from_device(cls, *devices, filtered_types=(ecodes.EV_SYN, ecodes.EV_FF), **k def __init__( self, - events=None, - name="py-evdev-uinput", - vendor=0x1, - product=0x1, - version=0x1, - bustype=0x3, - devnode="/dev/uinput", - phys="py-evdev-uinput", + events: Optional[Dict[int, Sequence[int]]] = None, + name: str = "py-evdev-uinput", + vendor: int = 0x1, + product: int = 0x1, + version: int = 0x1, + bustype: int = 0x3, + devnode: str = "/dev/uinput", + phys: str = "py-evdev-uinput", input_props=None, # CentOS 7 has sufficiently old headers that FF_MAX_EFFECTS is not defined there, # which causes the whole module to fail loading. Fallback on a hardcoded value of @@ -133,13 +138,13 @@ def __init__( to inject only ``KEY_*`` and ``BTN_*`` event codes. """ - self.name = name #: Uinput device name. - self.vendor = vendor #: Device vendor identifier. - self.product = product #: Device product identifier. - self.version = version #: Device version identifier. - self.bustype = bustype #: Device bustype - e.g. ``BUS_USB``. - self.phys = phys #: Uinput device physical path. - self.devnode = devnode #: Uinput device node - e.g. ``/dev/uinput/``. + self.name: str = name #: Uinput device name. + self.vendor: int = vendor #: Device vendor identifier. + self.product: int = product #: Device product identifier. + self.version: int = version #: Device version identifier. + self.bustype: int = bustype #: Device bustype - e.g. ``BUS_USB``. + self.phys: str = phys #: Uinput device physical path. + self.devnode: str = devnode #: Uinput device node - e.g. ``/dev/uinput/``. if not events: events = {ecodes.EV_KEY: ecodes.keys.keys()} @@ -175,7 +180,7 @@ def __init__( #: An :class:`InputDevice ` instance #: for the fake input device. ``None`` if the device cannot be #: opened for reading and writing. - self.device = self._find_device(self.fd) + self.device: InputDevice = self._find_device(self.fd) def _prepare_events(self, events): """Prepare events for passing to _uinput.enable and _uinput.setup""" @@ -183,7 +188,7 @@ def _prepare_events(self, events): for etype, codes in events.items(): for code in codes: # Handle max, min, fuzz, flat. - if isinstance(code, (tuple, list, device.AbsInfo)): + if isinstance(code, (tuple, list, AbsInfo)): # Flatten (ABS_Y, (0, 255, 0, 0, 0, 0)) to (ABS_Y, 0, 255, 0, 0, 0, 0). f = [code[0]] f.extend(code[1]) @@ -208,7 +213,7 @@ def __repr__(self): return "{}({})".format(self.__class__.__name__, ", ".join(v)) def __str__(self): - msg = 'name "{}", bus "{}", vendor "{:04x}", product "{:04x}", version "{:04x}", phys "{}"\n' "event types: {}" + msg = 'name "{}", bus "{}", vendor "{:04x}", product "{:04x}", version "{:04x}", phys "{}"\nevent types: {}' evtypes = [i[0] for i in self.capabilities(True).keys()] msg = msg.format( @@ -227,7 +232,7 @@ def close(self): _uinput.close(self.fd) self.fd = -1 - def capabilities(self, verbose=False, absinfo=True): + def capabilities(self, verbose: bool = False, absinfo: bool = True): """See :func:`capabilities `.""" if self.device is None: raise UInputError("input device not opened - cannot read capabilities") @@ -268,13 +273,11 @@ def _verify(self): Verify that an uinput device exists and is readable and writable by the current process. """ - try: m = os.stat(self.devnode)[stat.ST_MODE] - if not stat.S_ISCHR(m): - raise - except (IndexError, OSError): - msg = '"{}" does not exist or is not a character device file ' "- verify that the uinput module is loaded" + assert stat.S_ISCHR(m) + except (IndexError, OSError, AssertionError): + msg = '"{}" does not exist or is not a character device file - verify that the uinput module is loaded' raise UInputError(msg.format(self.devnode)) if not os.access(self.devnode, os.W_OK): @@ -285,7 +288,7 @@ def _verify(self): msg = "uinput device name must not be longer than {} characters" raise UInputError(msg.format(_uinput.maxnamelen)) - def _find_device(self, fd): + def _find_device(self, fd: int) -> InputDevice: """ Tries to find the device node. Will delegate this task to one of several platform-specific functions. @@ -303,7 +306,7 @@ def _find_device(self, fd): # use the generic fallback method. return self._find_device_fallback() - def _find_device_linux(self, sysname): + def _find_device_linux(self, sysname: str) -> InputDevice: """ Tries to find the device node when running on Linux. """ @@ -331,15 +334,15 @@ def _find_device_linux(self, sysname): # device to show up or the permissions to be set. for attempt in range(19): try: - return device.InputDevice(device_path) + return InputDevice(device_path) except (FileNotFoundError, PermissionError): time.sleep(0.1) # Last attempt. If this fails, whatever exception the last attempt raises # shall be the exception that this function raises. - return device.InputDevice(device_path) + return InputDevice(device_path) - def _find_device_fallback(self): + def _find_device_fallback(self) -> Union[InputDevice, None]: """ Tries to find the device node when UI_GET_SYSNAME is not available or we're running on a system sufficiently exotic that we do not know how @@ -367,6 +370,6 @@ def _find_device_fallback(self): path_number_pairs.sort(key=lambda pair: pair[1], reverse=True) for path, _ in path_number_pairs: - d = device.InputDevice(path) + d = InputDevice(path) if d.name == self.name: return d diff --git a/evdev/util.py b/src/evdev/util.py similarity index 90% rename from evdev/util.py rename to src/evdev/util.py index 7209f4b..db89a22 100644 --- a/evdev/util.py +++ b/src/evdev/util.py @@ -1,23 +1,22 @@ -import re +import collections +import glob import os +import re import stat -import glob -import collections +from typing import Union, List -from evdev import ecodes -from evdev.events import event_factory +from . import ecodes +from .events import InputEvent, event_factory, KeyEvent, RelEvent, AbsEvent, SynEvent -def list_devices(input_device_dir="/dev/input"): +def list_devices(input_device_dir: Union[str, bytes, os.PathLike] = "/dev/input") -> List[str]: """List readable character devices in ``input_device_dir``.""" fns = glob.glob("{}/event*".format(input_device_dir)) - fns = list(filter(is_device, fns)) - - return fns + return list(filter(is_device, fns)) -def is_device(fn): +def is_device(fn: Union[str, bytes, os.PathLike]) -> bool: """Check if ``fn`` is a readable and writable character device.""" if not os.path.exists(fn): @@ -33,7 +32,7 @@ def is_device(fn): return True -def categorize(event): +def categorize(event: InputEvent) -> Union[InputEvent, KeyEvent, RelEvent, AbsEvent, SynEvent]: """ Categorize an event according to its type. diff --git a/tests/test_ecodes.py b/tests/test_ecodes.py index c810b4f..5c3e38d 100644 --- a/tests/test_ecodes.py +++ b/tests/test_ecodes.py @@ -1,9 +1,8 @@ -# encoding: utf-8 - from evdev import ecodes +from evdev import ecodes_runtime -prefixes = "KEY ABS REL SW MSC LED BTN REP SND ID EV BUS SYN FF_STATUS FF" +prefixes = "KEY ABS REL SW MSC LED BTN REP SND ID EV BUS SYN FF_STATUS FF UI_FF" def to_tuples(val): @@ -29,3 +28,14 @@ def test_overlap(): vals_ff = set(to_tuples(ecodes.FF.values())) vals_ff_status = set(to_tuples(ecodes.FF_STATUS.values())) assert bool(vals_ff & vals_ff_status) is False + + +def test_generated(): + e_run = vars(ecodes_runtime) + e_gen = vars(ecodes) + + def keys(v): + res = {k for k in v.keys() if not k.startswith("_") and not k[1].islower()} + return res + + assert keys(e_run) == keys(e_gen) \ No newline at end of file diff --git a/tests/test_uinput.py b/tests/test_uinput.py index 2bf3dc1..666361f 100644 --- a/tests/test_uinput.py +++ b/tests/test_uinput.py @@ -1,10 +1,13 @@ # encoding: utf-8 - +import os +import stat from select import select -from pytest import raises, fixture +from unittest.mock import patch -from evdev import uinput, ecodes, events, device, util +import pytest +from pytest import raises, fixture +from evdev import uinput, ecodes, device, UInputError # ----------------------------------------------------------------------------- uinput_options = { @@ -66,12 +69,12 @@ def test_enable_events(c): def test_abs_values(c): e = ecodes - c["events"] = { + c = { e.EV_KEY: [e.KEY_A, e.KEY_B], - e.EV_ABS: [(e.ABS_X, (0, 255, 0, 0)), (e.ABS_Y, device.AbsInfo(0, 255, 5, 10, 0, 0))], + e.EV_ABS: [(e.ABS_X, (0, 0, 255, 0, 0)), (e.ABS_Y, device.AbsInfo(0, 0, 255, 5, 10, 0))], } - with uinput.UInput(**c) as ui: + with uinput.UInput(events=c) as ui: c = ui.capabilities() abs = device.AbsInfo(value=0, min=0, max=255, fuzz=0, flat=0, resolution=0) assert c[e.EV_ABS][0] == (0, abs) @@ -114,3 +117,21 @@ def test_write(c): assert evs[3].code == ecodes.KEY_A and evs[3].value == 2 assert evs[4].code == ecodes.KEY_A and evs[4].value == 0 break + + +@patch.object(stat, 'S_ISCHR', return_value=False) +def test_not_a_character_device(ischr_mock, c): + with pytest.raises(UInputError, match='not a character device file'): + uinput.UInput(**c) + +@patch.object(stat, 'S_ISCHR', return_value=True) +@patch.object(os, 'stat', side_effect=OSError()) +def test_not_a_character_device_2(stat_mock, ischr_mock, c): + with pytest.raises(UInputError, match='not a character device file'): + uinput.UInput(**c) + +@patch.object(stat, 'S_ISCHR', return_value=True) +@patch.object(os, 'stat', return_value=[]) +def test_not_a_character_device_3(stat_mock, ischr_mock, c): + with pytest.raises(UInputError, match='not a character device file'): + uinput.UInput(**c) diff --git a/tests/test_util.py b/tests/test_util.py index 5a979df..7112927 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -6,7 +6,7 @@ def test_match_ecodes_a(): assert res == {1: [372, 418, 419, 420]} assert dict(util.resolve_ecodes_dict(res)) == { ("EV_KEY", 1): [ - (["KEY_FULL_SCREEN", "KEY_ZOOM"], 372), + (("KEY_FULL_SCREEN", "KEY_ZOOM"), 372), ("KEY_ZOOMIN", 418), ("KEY_ZOOMOUT", 419), ("KEY_ZOOMRESET", 420),