diff --git a/.github/workflows/install.yaml b/.github/workflows/install.yaml new file mode 100644 index 0000000..e879179 --- /dev/null +++ b/.github/workflows/install.yaml @@ -0,0 +1,29 @@ +name: Test install + +on: + - push + - pull_request + +jobs: + build: + runs-on: ${{ matrix.os }} + strategy: + fail-fast: false + matrix: + os: [ubuntu-latest] + python-version: ["3.9", "3.10", "3.11", "3.12", "3.13", "3.14"] + include: + - os: ubuntu-latest + python-version: "3.9" + + steps: + - uses: actions/checkout@v6 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v6 + with: + python-version: ${{ matrix.python-version }} + + - name: Install python-evdev + run: | + python -m pip install -v . + (cd /tmp && python -c "import evdev.ecodes; print(evdev.ecodes)") diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml new file mode 100644 index 0000000..20d254b --- /dev/null +++ b/.github/workflows/lint.yml @@ -0,0 +1,27 @@ +name: Lint + +on: + - push + - pull_request + +jobs: + pylint: + runs-on: ${{ matrix.os }} + strategy: + fail-fast: false + matrix: + os: [ubuntu-latest] + python-version: ["3.14"] + + steps: + - uses: actions/checkout@v6 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v6 + with: + python-version: ${{ matrix.python-version }} + + - name: Check for pylint errors + run: | + python -m pip install pylint setuptools + python setup.py build + python -m pylint --verbose -E build/lib*/evdev diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml new file mode 100644 index 0000000..073d524 --- /dev/null +++ b/.github/workflows/test.yml @@ -0,0 +1,28 @@ +name: Test + +on: + - push + - pull_request + +jobs: + test: + runs-on: ${{ matrix.os }} + strategy: + fail-fast: false + matrix: + os: [ubuntu-latest] + python-version: ["3.14"] + + steps: + - uses: actions/checkout@v6 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v6 + with: + python-version: ${{ matrix.python-version }} + + - name: Run pytest tests + # sudo required to write to uinputs + run: | + sudo python -m pip install pytest setuptools + sudo python -m pip install -e . + sudo python -m pytest tests diff --git a/.gitignore b/.gitignore index 8fc000b..70ac303 100644 --- a/.gitignore +++ b/.gitignore @@ -5,20 +5,24 @@ develop-eggs/ dist/ build/ +wheelhouse/ dropin.cache pip-log.txt .installed.cfg .coverage tags TAGS -evdev/*.so -evdev/ecodes.c -evdev/iprops.c -docs/_build .#* __pycache__ .pytest_cache +.ruff_cache +.venv +uv.lock -evdev/_ecodes.py -evdev/_input.py -evdev/_uinput.py +src/evdev/*.so +src/evdev/ecodes.c +src/evdev/ecodes.pyi +docs/_build +src/evdev/_ecodes.py +src/evdev/_input.py +src/evdev/_uinput.py diff --git a/.readthedocs.yaml b/.readthedocs.yaml new file mode 100644 index 0000000..4a50dff --- /dev/null +++ b/.readthedocs.yaml @@ -0,0 +1,15 @@ +# https://docs.readthedocs.io/en/stable/config-file/v2.html +version: 2 + +build: + os: ubuntu-22.04 + tools: + python: "3.12" + +sphinx: + configuration: docs/conf.py + +python: + install: + - requirements: requirements-dev.txt + - path: . \ No newline at end of file diff --git a/LICENSE b/LICENSE index ce3a1f7..8482b07 100644 --- a/LICENSE +++ b/LICENSE @@ -1,4 +1,4 @@ -Copyright (c) 2012-2016 Georgi Valkov. All rights reserved. +Copyright (c) 2012-2025 Georgi Valkov. All rights reserved. Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are diff --git a/MANIFEST.in b/MANIFEST.in index 7066730..be2be3d 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,7 +1,5 @@ -include README.rst -include LICENSE - # The _ecodes extension module source file needs to be generated against the # evdev headers of the running kernel. Refer to the 'build_ecodes' distutils # command in setup.py. -exclude evdev/ecodes.c +exclude src/evdev/ecodes.c +include src/evdev/ecodes.py diff --git a/README.md b/README.md new file mode 100644 index 0000000..9746040 --- /dev/null +++ b/README.md @@ -0,0 +1,28 @@ +# evdev + +

+ pypi version + License + Packaging status +

+ +This package provides bindings to the generic input event interface in Linux. +The *evdev* interface serves the purpose of passing events generated in the +kernel directly to userspace through character devices that are typically +located in `/dev/input/`. + +This package also comes with bindings to *uinput*, the userspace input +subsystem. *Uinput* allows userspace programs to create and handle input devices +that can inject events directly into the input subsystem. + +***Documentation:*** +https://python-evdev.readthedocs.io/en/latest/ + +***Development:*** +https://github.com/gvalkov/python-evdev + +***Package:*** +https://pypi.python.org/pypi/evdev + +***Changelog:*** +https://python-evdev.readthedocs.io/en/latest/changelog.html \ No newline at end of file diff --git a/README.rst b/README.rst deleted file mode 100644 index 207bc39..0000000 --- a/README.rst +++ /dev/null @@ -1,24 +0,0 @@ -*evdev* -------- - -This package provides bindings to the generic input event interface in -Linux. The *evdev* interface serves the purpose of passing events -generated in the kernel directly to userspace through character -devices that are typically located in ``/dev/input/``. - -This package also comes with bindings to *uinput*, the userspace input -subsystem. *Uinput* allows userspace programs to create and handle -input devices that can inject events directly into the input -subsystem. - -Documentation: - http://python-evdev.readthedocs.io/en/latest/ - -Development: - https://github.com/gvalkov/python-evdev - -Package: - http://pypi.python.org/pypi/evdev - -Changelog: - http://python-evdev.readthedocs.io/en/latest/changelog.html diff --git a/docs/_static/evdev-logo-small.png b/docs/_static/evdev-logo-small.png deleted file mode 100644 index 95c6491..0000000 Binary files a/docs/_static/evdev-logo-small.png and /dev/null differ diff --git a/docs/_static/evdev-logo.png b/docs/_static/evdev-logo.png deleted file mode 100644 index 168a90a..0000000 Binary files a/docs/_static/evdev-logo.png and /dev/null differ diff --git a/docs/_static/github-logo.png b/docs/_static/github-logo.png deleted file mode 100644 index e03d8dd..0000000 Binary files a/docs/_static/github-logo.png and /dev/null differ diff --git a/docs/_static/pacifica-icon-set/distributor-logo-archlinux.png b/docs/_static/pacifica-icon-set/distributor-logo-archlinux.png deleted file mode 100644 index e1e7b54..0000000 Binary files a/docs/_static/pacifica-icon-set/distributor-logo-archlinux.png and /dev/null differ diff --git a/docs/_static/pacifica-icon-set/distributor-logo-debian.png b/docs/_static/pacifica-icon-set/distributor-logo-debian.png deleted file mode 100644 index 42ae247..0000000 Binary files a/docs/_static/pacifica-icon-set/distributor-logo-debian.png and /dev/null differ diff --git a/docs/_static/pacifica-icon-set/distributor-logo-fedora.png b/docs/_static/pacifica-icon-set/distributor-logo-fedora.png deleted file mode 100644 index 74bc18f..0000000 Binary files a/docs/_static/pacifica-icon-set/distributor-logo-fedora.png and /dev/null differ diff --git a/docs/_static/pacifica-icon-set/distributor-logo-linux-mint.png b/docs/_static/pacifica-icon-set/distributor-logo-linux-mint.png deleted file mode 100644 index 2f822dd..0000000 Binary files a/docs/_static/pacifica-icon-set/distributor-logo-linux-mint.png and /dev/null differ diff --git a/docs/_static/pacifica-icon-set/distributor-logo-opensuse.png b/docs/_static/pacifica-icon-set/distributor-logo-opensuse.png deleted file mode 100644 index cc34ddd..0000000 Binary files a/docs/_static/pacifica-icon-set/distributor-logo-opensuse.png and /dev/null differ diff --git a/docs/_static/pacifica-icon-set/distributor-logo-raspbian.png b/docs/_static/pacifica-icon-set/distributor-logo-raspbian.png deleted file mode 100644 index b55ce5b..0000000 Binary files a/docs/_static/pacifica-icon-set/distributor-logo-raspbian.png and /dev/null differ diff --git a/docs/_static/pacifica-icon-set/distributor-logo-ubuntu.png b/docs/_static/pacifica-icon-set/distributor-logo-ubuntu.png deleted file mode 100644 index fc15ab6..0000000 Binary files a/docs/_static/pacifica-icon-set/distributor-logo-ubuntu.png and /dev/null differ diff --git a/docs/changelog.rst b/docs/changelog.rst index e7b3af4..bcf1636 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -1,14 +1,106 @@ Changelog --------- +1.9.3 (Feb 05, 2025) +==================== + +- Fix several memory leaks in ``input.c``. + +- Raise the minimum supported Python version to 3.9 and the setuptools version to 77.0. + + +1.9.2 (May 01, 2025) +==================== + +- Add the "--reproducible" build option which removes the build date and used headers from the + generated ``ecodes.c``. Example usage:: + + python -m build --config-setting=--build-option='build_ecodes --reproducible' -n + +- Use ``Generic`` to set precise type for ``InputDevice.path``. + + +1.9.1 (Feb 22, 2025) +==================== + +- Fix fox missing ``UI_FF`` constants in generated ``ecodes.py``. + +- More type annotations. + + +1.9.0 (Feb 08, 2025) +==================== + +- Fix for ``CPATH/C_INCLUDE_PATH`` being ignored during build. + +- Slightly faster reading of events in ``device.read()`` and ``device.read_one()``. + +- Fix FreeBSD support. + +- Drop deprecated ``InputDevice.fn`` (use ``InputDevice.path`` instead). + +- Improve type hint coverage and add a ``py.typed`` file to the sdist. + + +1.8.0 (Jan 25, 2025) +==================== + +- Binary wheels are now provided by the `evdev-binary `_ package. + The package is compiled on manylinux_2_28 against kernel 4.18. + +- The ``evdev.ecodes`` module is now generated at install time and contains only constants. This allows type + checking and introspection of the ``evdev.ecodes`` module, without having to execute it first. The old + module is available as ``evdev.ecodes_runtime``. In case generation of the static ``ecodes.py`` fails, the + install process falls back to using ``ecodes_runtime.py`` as ``ecodes.py``. + +- Reverse mappings in ``evdev.ecodes`` that point to more than one value are now tuples instead of lists. For example:: + + >>> ecodes.KEY[153] + ('KEY_DIRECTION', 'KEY_ROTATE_DISPLAY') + +- Raise the minimum supported Python version to 3.8. + +- Fix keyboard delay and repeat being swapped (#227). + +- Move the ``syn()`` convenience method from ``InputDevice`` to ``EventIO`` (#224). + + +1.7.1 (May 8, 2024) +==================== + +- Provide fallback value for ``FF_MAX_EFFECTS``, which fixes the build on EL 7 (#219). + +- Add ``#ifdef`` guards around ``UI_GET_SYSNAME`` to improve kernel compatibility (#218) . + +- Wait up to two seconds for uinput devices to appear. (#215) + + +1.7.0 (Feb 18, 2024) +==================== + +- Respect the ``CPATH/C_INCLUDE_PATH`` environment variables during install. + +- Add the uniq address to the string representation of ``InputDevice``. + +- Improved method for finding the device node corresponding to a uinput device (`#206 `_). + +- Repository TLC (reformatted with ruff, fixed linting warnings, moved packaging metadata to ``pyproject.toml`` etc.). + + +1.6.1 (Jan 20, 2023) +==================== + +- Fix generation of ``ecodes.c`` when the path to ``sys.executable`` contains spaces. + + 1.6.0 (Jul 17, 2022) -================== +==================== -- Fix Python 3.11 compatibility (`#174 `_) +- Fix Python 3.11 compatibility (`#174 `_). 1.5.0 (Mar 24, 2022) -================== +==================== - Fix documentation (`#163 `_, `#160 `_). diff --git a/docs/conf.py b/docs/conf.py index 8dc3ae9..0be06b3 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -1,10 +1,9 @@ -# -*- coding: utf-8 -*- - -import sys, os +import os +import sys import sphinx_rtd_theme # Check if readthedocs is building us -on_rtd = os.environ.get('READTHEDOCS', None) == 'True' +on_rtd = os.environ.get("READTHEDOCS", None) == "True" # If extensions (or modules to document with autodoc) are in another directory, # add these directories to sys.path here. If the directory is relative to the @@ -12,93 +11,98 @@ # Trick autodoc into running without having built the extension modules. if on_rtd: - with open('../evdev/_ecodes.py', 'w') as fh: - fh.write(''' + with open("../src/evdev/_ecodes.py", "w") as fh: + fh.write( + """ KEY = ABS = REL = SW = MSC = LED = REP = SND = SYN = FF = FF_STATUS = BTN_A = KEY_A = 1 EV_KEY = EV_ABS = EV_REL = EV_SW = EV_MSC = EV_LED = EV_REP = 1 EV_SND = EV_SYN = EV_FF = EV_FF_STATUS = FF_STATUS = 1 -KEY_MAX, KEY_CNT = 1, 2''') +KEY_MAX, KEY_CNT = 1, 2""" + ) - with open('../evdev/_input.py', 'w'): pass - with open('../evdev/_uinput.py', 'w'): pass + with open("../src/evdev/_input.py", "w"): + pass + with open("../src/evdev/_uinput.py", "w"): + pass # -- General configuration ----------------------------------------------------- # If your documentation needs a minimal Sphinx version, state it here. -#needs_sphinx = '1.0' +# needs_sphinx = '1.0' # Add any Sphinx extension module names here, as strings. They can be extensions # coming with Sphinx (named 'sphinx.ext.*') or your custom ones. extensions = [ - 'sphinx.ext.autodoc', - 'sphinx.ext.viewcode', - 'sphinx.ext.intersphinx', - 'sphinx.ext.napoleon', - 'sphinx_copybutton', + "sphinx.ext.autodoc", + "sphinx.ext.viewcode", + "sphinx.ext.intersphinx", + "sphinx.ext.napoleon", + "sphinx_rtd_theme", + "sphinx_copybutton", ] -autodoc_member_order = 'bysource' +autodoc_member_order = "bysource" # Add any paths that contain templates here, relative to this directory. -templates_path = ['_templates'] +templates_path = ["_templates"] # The suffix of source filenames. -source_suffix = '.rst' +source_suffix = ".rst" # The encoding of source files. -#source_encoding = 'utf-8-sig' +# source_encoding = 'utf-8-sig' # The master toctree document. -master_doc = 'index' +master_doc = "index" # General information about the project. -project = u'python-evdev' -copyright = u'2012-2022, Georgi Valkov' +project = "python-evdev" +copyright = "2012-2025, Georgi Valkov and contributors" # The version info for the project you're documenting, acts as replacement for # |version| and |release|, also used in various other places throughout the # built documents. # # The full version, including alpha/beta/rc tags. -release = '1.6.0' +release = "1.9.3" # The short X.Y version. version = release # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages. -#language = None +# language = None # There are two options for replacing |today|: either, you set today to some # non-false value, then it is used: -#today = '' +# today = '' # Else, today_fmt is used as the format for a strftime call. -#today_fmt = '%B %d, %Y' +# today_fmt = '%B %d, %Y' # List of patterns, relative to source directory, that match files and # directories to ignore when looking for source files. -exclude_patterns = ['_build'] +exclude_patterns = ["_build"] # The reST default role (used for this markup: `text`) to use for all documents. -#default_role = None +# default_role = None # If true, '()' will be appended to :func: etc. cross-reference text. -#add_function_parentheses = True +# add_function_parentheses = True # If true, the current module name will be prepended to all description # unit titles (such as .. function::). -#add_module_names = True +# add_module_names = True # If true, sectionauthor and moduleauthor directives will be shown in the # output. They are ignored by default. -#show_authors = False +# show_authors = False # The name of the Pygments (syntax highlighting) style to use. -#pygments_style = 'sphinx' +# pygments_style = 'sphinx' # A list of ignored prefixes for module index sorting. -#modindex_common_prefix = [] +# modindex_common_prefix = [] # -- Options for HTML output --------------------------------------------------- @@ -106,10 +110,7 @@ # The theme to use for HTML and HTML Help pages. See the documentation for # a list of builtin themes. -if not on_rtd: - import sphinx_rtd_theme - html_theme = 'sphinx_rtd_theme' - html_theme_path = [sphinx_rtd_theme.get_html_theme_path()] +html_theme = "sphinx_rtd_theme" # Theme options are theme-specific and customize the look and feel of a theme # further. For a list of options available for each theme, see the @@ -120,122 +121,116 @@ # The name for this set of Sphinx documents. If None, it defaults to # " v documentation". -html_title = 'Python-evdev' +html_title = "Python-evdev" # A shorter title for the navigation bar. Default is the same as html_title. -html_short_title = 'evdev' +html_short_title = "evdev" # The name of an image file (relative to this directory) to place at the top # of the sidebar. -#html_logo = '_static/evdev-logo-small.png' +# html_logo = '' # The name of an image file (within the static path) to use as favicon of the # docs. This file should be a Windows icon file (.ico) being 16x16 or 32x32 # pixels large. -#html_favicon = None +# html_favicon = None # Add any paths that contain custom static files (such as style sheets here, # relative to this directory. They are copied after the builtin static files, # so a file named "default.css" will overwrite the builtin "default.css". -html_static_path = ['_static'] +html_static_path = ["_static"] # If not '', a 'Last updated on:' timestamp is inserted at every page bottom, # using the given strftime format. -#html_last_updated_fmt = '%b %d, %Y' +# html_last_updated_fmt = '%b %d, %Y' # If true, SmartyPants will be used to convert quotes and dashes to # typographically correct entities. -#html_use_smartypants = True +# html_use_smartypants = True # Custom sidebar templates, maps document names to template names. -#html_sidebars = {} +# html_sidebars = {} # Additional templates that should be rendered to pages, maps page names to # template names. -#html_additional_pages = {} +# html_additional_pages = {} # If false, no module index is generated. -#html_domain_indices = True +# html_domain_indices = True # If false, no index is generated. -#html_use_index = True +# html_use_index = True # If true, the index is split into individual pages for each letter. -#html_split_index = False +# html_split_index = False # If true, links to the reST sources are added to the pages. html_show_sourcelink = True # If true, "Created using Sphinx" is shown in the HTML footer. Default is True. -#html_show_sphinx = True +# html_show_sphinx = True # If true, "(C) Copyright ..." is shown in the HTML footer. Default is True. -#html_show_copyright = True +# html_show_copyright = True # If true, an OpenSearch description file will be output, and all pages will # contain a tag referring to it. The value of this option must be the # base URL from which the finished HTML is served. -#html_use_opensearch = '' +# html_use_opensearch = '' # This is the file name suffix for HTML files (e.g. ".xhtml"). -#html_file_suffix = None +# html_file_suffix = None # Output file base name for HTML help builder. -htmlhelp_basename = 'python-evdev-doc' +htmlhelp_basename = "python-evdev-doc" # -- Options for LaTeX output -------------------------------------------------- latex_elements = { -# The paper size ('letterpaper' or 'a4paper'). -#'papersize': 'letterpaper', - -# The font size ('10pt', '11pt' or '12pt'). -#'pointsize': '10pt', - -# Additional stuff for the LaTeX preamble. -#'preamble': '', + # The paper size ('letterpaper' or 'a4paper'). + #'papersize': 'letterpaper', + # The font size ('10pt', '11pt' or '12pt'). + #'pointsize': '10pt', + # Additional stuff for the LaTeX preamble. + #'preamble': '', } # Grouping the document tree into LaTeX files. List of tuples # (source start file, target name, title, author, documentclass [howto/manual]). latex_documents = [ - ('index', 'python-evdev.tex', u'evdev documentation', - u'Georgi Valkov', 'manual'), + ("index", "python-evdev.tex", "evdev documentation", "Georgi Valkov", "manual"), ] # The name of an image file (relative to this directory) to place at the top of # the title page. -#latex_logo = None +# latex_logo = None # For "manual" documents, if this is true, then toplevel headings are parts, # not chapters. -#latex_use_parts = False +# latex_use_parts = False # If true, show page references after internal links. -#latex_show_pagerefs = False +# latex_show_pagerefs = False # If true, show URL addresses after external links. -#latex_show_urls = False +# latex_show_urls = False # Documents to append as an appendix to all manuals. -#latex_appendices = [] +# latex_appendices = [] # If false, no module index is generated. -#latex_domain_indices = True +# latex_domain_indices = True # -- Options for manual page output -------------------------------------------- # One entry per manual page. List of tuples # (source start file, name, description, authors, manual section). -man_pages = [ - ('index', 'python-evdev', u'python-evdev Documentation', - [u'Georgi Valkov'], 1) -] +man_pages = [("index", "python-evdev", "python-evdev Documentation", ["Georgi Valkov"], 1)] # If true, show URL addresses after external links. -#man_show_urls = False +# man_show_urls = False # -- Options for Texinfo output ------------------------------------------------ @@ -244,23 +239,29 @@ # (source start file, target name, title, author, # dir menu entry, description, category) texinfo_documents = [ - ('index', 'python-evdev', u'python-evdev Documentation', - u'Georgi Valkov', 'evdev', 'Bindings for the linux input handling subsystem.', - 'Miscellaneous'), + ( + "index", + "python-evdev", + "python-evdev Documentation", + "Georgi Valkov", + "evdev", + "Bindings for the linux input handling subsystem.", + "Miscellaneous", + ), ] -intersphinx_mapping = {'python': ('http://docs.python.org/3', None)} +intersphinx_mapping = {"python": ("http://docs.python.org/3", None)} # Documents to append as an appendix to all manuals. -#texinfo_appendices = [] +# texinfo_appendices = [] # If false, no module index is generated. -#texinfo_domain_indices = True +# texinfo_domain_indices = True # How to display URL addresses: 'footnote', 'no', or 'inline'. -#texinfo_show_urls = 'footnote' +# texinfo_show_urls = 'footnote' # Copybutton config -#copybutton_prompt_text = r">>> " -#copybutton_prompt_is_regexp = True -#copybutton_only_copy_prompt_lines = True +# copybutton_prompt_text = r">>> " +# copybutton_prompt_is_regexp = True +# copybutton_only_copy_prompt_lines = True diff --git a/docs/install.rst b/docs/install.rst index 6055f80..f93e0b8 100644 --- a/docs/install.rst +++ b/docs/install.rst @@ -1,31 +1,13 @@ -From a binary package -===================== - -Python-evdev has been packaged for the following GNU/Linux distributions: +From an OS package +================== +Python-evdev has been packaged for the following distributions: .. raw:: html - Consult the documentation of your OS package manager for installation instructions. @@ -34,23 +16,23 @@ From source =========== The latest stable version of *python-evdev* can be installed from pypi_, -provided that you have gcc/clang, pip_ and the Python and Linux development -headers installed on your system. Installing them is distribution specific and -typically falls in one of the following categories: +provided that you have a compiler, pip_ and the Python and Linux development +headers installed on your system. Installing these is distribution specific and +typically falls into one of the following: On a Debian compatible OS: .. code-block:: bash - $ apt-get install python-dev python-pip gcc - $ apt-get install linux-headers-$(uname -r) + $ apt install python-dev python-pip gcc + $ apt install linux-headers-$(uname -r) On a Redhat compatible OS: .. code-block:: bash - $ yum install python-devel python-pip gcc - $ yum install kernel-headers-$(uname -r) + $ dnf install python-devel python-pip gcc + $ dnf install kernel-headers-$(uname -r) On Arch Linux and derivatives: @@ -58,22 +40,31 @@ On Arch Linux and derivatives: $ pacman -S core/linux-api-headers python-pip gcc -Once all dependencies are available, you may install *python-evdev* using pip_: +Once all OS dependencies are available, you may install *python-evdev* using +pip_, preferably in a [virtualenv]_: .. code-block:: bash - $ sudo pip install evdev # available globally - $ pip install --user evdev # available to the current user + # Install globally (not recommended). + $ sudo python3 -m pip install evdev + + # Install for the current user. + $ python3 -m pip install --user evdev + + # Install in a virtual environment. + $ python3 -m venv abc + $ source abc/bin/activate + $ python3 -m pip install evdev Specifying header locations -=========================== +--------------------------- By default, the setup script will look for the ``input.h`` and ``input-event-codes.h`` [#f1]_ header files ``/usr/include/linux``. You may use the ``--evdev-headers`` option to the ``build_ext`` setuptools -command to specify the location of these header files. It accepts one or more +command to the location of these header files. It accepts one or more colon-separated paths. For example: .. code-block:: bash @@ -83,11 +74,27 @@ colon-separated paths. For example: --include-dirs buildroot/ \ install # or any other command (e.g. develop, bdist, bdist_wheel) -.. [#f1] ``input-event-codes.h`` is found only in more recent kernel versions. + +From a binary package +===================== + +You may choose to install a precompiled version of *python-evdev* from pypi. The +`evdev-binary`_ package provides binary wheels that have been compiled on EL8 +against the 4.18.0 kernel headers. + +.. code-block:: bash + + $ python3 -m pip install evdev-binary + +While the evdev interface is stable, the precompiled version may not be fully +compatible or expose all the features of your running kernel. For best results, +it is recommended to use an OS package or to install from source. +.. [#f1] ``input-event-codes.h`` is found only in recent kernel versions. .. _pypi: http://pypi.python.org/pypi/evdev +.. _evdev-binary: http://pypi.python.org/pypi/evdev-binary .. _github: https://github.com/gvalkov/python-evdev .. _pip: http://pip.readthedocs.org/en/latest/installing.html .. _example: https://github.com/gvalkov/python-evdev/tree/master/examples -.. _`async/await`: https://docs.python.org/3/library/asyncio-task.html +.. _virtualenv: https://docs.python.org/3/library/venv.html diff --git a/docs/tutorial.rst b/docs/tutorial.rst index 7416181..04ae42f 100644 --- a/docs/tutorial.rst +++ b/docs/tutorial.rst @@ -445,11 +445,64 @@ Injecting an FF-event into first FF-capable device found ecodes.FF_RUMBLE, -1, 0, ff.Trigger(0, 0), ff.Replay(duration_ms, 0), - ff.EffectType(ff_rumble_effect=rumble) + effect_type ) repeat_count = 1 effect_id = dev.upload_effect(effect) dev.write(ecodes.EV_FF, effect_id, repeat_count) - time.sleep(duration_ms) + time.sleep(duration_ms / 1000) dev.erase_effect(effect_id) + + +Forwarding force-feedback from uinput to a real device +====================================================== + +:: + + import evdev + from evdev import ecodes as e + + # Find first EV_FF capable event device (that we have permissions to use). + for name in evdev.list_devices(): + dev = evdev.InputDevice(name) + if e.EV_FF in dev.capabilities(): + break + # To ensure forwarding works correctly it is important that `max_effects` + # of the uinput device is <= `dev.ff_effects_count`. + # `from_device()` will do this automatically, but in some situations you may + # want to set the `max_effects` parameter manually, such as when using `Uinput()`. + # `filtered_types` is specified as by default EV_FF events are filtered + uinput = evdev.UInput.from_device(dev, filtered_types=[e.EV_SYN]) + + # Keeps track of which effects have been uploaded to the device + effects = set() + + for event in uinput.read_loop(): + + # Handle the special uinput events + if event.type == e.EV_UINPUT: + + if event.code == e.UI_FF_UPLOAD: + upload = uinput.begin_upload(event.value) + + # Checks if this is a new effect + if upload.effect.id not in effects: + effects.add(upload.effect.id) + # Setting id to 1 indicates that a new effect must be allocated + upload.effect.id = -1 + + dev.upload_effect(upload.effect) + upload.retval = 0 + uinput.end_upload(upload) + + elif event.code == e.UI_FF_ERASE: + erase = uinput.begin_erase(event.value) + erase.retval = 0 + dev.erase_effect(erase.effect_id) + effects.remove(erase.effect_id) + uinput.end_erase(erase) + + # Forward writes to actual rumble device. + elif event.type == e.EV_FF: + dev.write(event.type, event.code, event.value) diff --git a/evdev/__init__.py b/evdev/__init__.py deleted file mode 100644 index 797623b..0000000 --- a/evdev/__init__.py +++ /dev/null @@ -1,10 +0,0 @@ -#-------------------------------------------------------------------------- -# Gather everything into a single, convenient namespace. -#-------------------------------------------------------------------------- - -from evdev.device import DeviceInfo, InputDevice, AbsInfo, EvdevError -from evdev.events import InputEvent, KeyEvent, RelEvent, SynEvent, AbsEvent, event_factory -from evdev.uinput import UInput, UInputError -from evdev.util import list_devices, categorize, resolve_ecodes, resolve_ecodes_dict -from evdev import ecodes -from evdev import ff diff --git a/evdev/genecodes.py b/evdev/genecodes.py deleted file mode 100644 index 04c10e1..0000000 --- a/evdev/genecodes.py +++ /dev/null @@ -1,116 +0,0 @@ -# -*- coding: utf-8; -*- - -''' -Generate a Python extension module with the constants defined in linux/input.h. -''' - -from __future__ import print_function -import os, sys, re - - -#----------------------------------------------------------------------------- -# The default header file locations to try. -headers = [ - '/usr/include/linux/input.h', - '/usr/include/linux/input-event-codes.h', -] - -if sys.argv[1:]: - headers = sys.argv[1:] - - -#----------------------------------------------------------------------------- -macro_regex = r'#define +((?:KEY|ABS|REL|SW|MSC|LED|BTN|REP|SND|ID|EV|BUS|SYN|FF|UI_FF|INPUT_PROP)_\w+)' -macro_regex = re.compile(macro_regex) - -uname = list(os.uname()); del uname[1] -uname = ' '.join(uname) - - -#----------------------------------------------------------------------------- -template = r''' -#include -#ifdef __FreeBSD__ -#include -#else -#include -#include -#endif - -/* Automatically generated by evdev.genecodes */ -/* Generated on %s */ - -#define MODULE_NAME "_ecodes" -#define MODULE_HELP "linux/input.h macros" - -static PyMethodDef MethodTable[] = { - { NULL, NULL, 0, NULL} -}; - -#if PY_MAJOR_VERSION >= 3 -static struct PyModuleDef moduledef = { - PyModuleDef_HEAD_INIT, - MODULE_NAME, - MODULE_HELP, - -1, /* m_size */ - MethodTable, /* m_methods */ - NULL, /* m_reload */ - NULL, /* m_traverse */ - NULL, /* m_clear */ - NULL, /* m_free */ -}; -#endif - -static PyObject * -moduleinit(void) -{ - -#if PY_MAJOR_VERSION >= 3 - PyObject* m = PyModule_Create(&moduledef); -#else - PyObject* m = Py_InitModule3(MODULE_NAME, MethodTable, MODULE_HELP); -#endif - - if (m == NULL) return NULL; - -%s - - return m; -} - -#if PY_MAJOR_VERSION >= 3 -PyMODINIT_FUNC -PyInit__ecodes(void) -{ - return moduleinit(); -} -#else -PyMODINIT_FUNC -init_ecodes(void) -{ - moduleinit(); -} -#endif -''' - -def parse_header(header): - for line in open(header): - macro = macro_regex.search(line) - if macro: - yield ' PyModule_AddIntMacro(m, %s);' % macro.group(1) - -all_macros = [] -for header in headers: - try: - fh = open(header) - except (IOError, OSError): - continue - all_macros += parse_header(header) - -if not all_macros: - print('no input macros found in: %s' % ' '.join(headers), file=sys.stderr) - sys.exit(1) - - -macros = os.linesep.join(all_macros) -print(template % (uname, macros)) diff --git a/evdev/uinput.py b/evdev/uinput.py deleted file mode 100644 index 4f6148a..0000000 --- a/evdev/uinput.py +++ /dev/null @@ -1,284 +0,0 @@ -# encoding: utf-8 - -import os -import stat -import time -from collections import defaultdict - -from evdev import _uinput -from evdev import ecodes, util, device -from evdev.events import InputEvent -import evdev.ff as ff -import ctypes - -try: - from evdev.eventio_async import EventIO -except ImportError: - from evdev.eventio import EventIO - - - -class UInputError(Exception): - pass - - -class UInput(EventIO): - ''' - A userland input device and that can inject input events into the - linux input subsystem. - ''' - - __slots__ = ( - 'name', 'vendor', 'product', 'version', 'bustype', - 'events', 'devnode', 'fd', 'device', - ) - - @classmethod - def from_device(cls, *devices, filtered_types=(ecodes.EV_SYN, ecodes.EV_FF), **kwargs): - ''' - Create an UInput device with the capabilities of one or more input - devices. - - Arguments - --------- - devices : InputDevice|str - Varargs of InputDevice instances or paths to input devices. - - filtered_types : Tuple[event type codes] - Event types to exclude from the capabilities of the uinput device. - - **kwargs - Keyword arguments to UInput constructor (i.e. name, vendor etc.). - ''' - - device_instances = [] - for dev in devices: - if not isinstance(dev, device.InputDevice): - dev = device.InputDevice(str(dev)) - device_instances.append(dev) - - all_capabilities = defaultdict(set) - - # Merge the capabilities of all devices into one dictionary. - for dev in device_instances: - for ev_type, ev_codes in dev.capabilities().items(): - all_capabilities[ev_type].update(ev_codes) - - for evtype in filtered_types: - if evtype in all_capabilities: - del all_capabilities[evtype] - - return cls(events=all_capabilities, **kwargs) - - def __init__(self, - events=None, - name='py-evdev-uinput', - vendor=0x1, product=0x1, version=0x1, bustype=0x3, - devnode='/dev/uinput', phys='py-evdev-uinput', input_props=None): - ''' - Arguments - --------- - events : dict - Dictionary of event types mapping to lists of event codes. The - event types and codes that the uinput device will be able to - inject - defaults to all key codes. - - name - The name of the input device. - - vendor - Vendor identifier. - - product - Product identifier. - - version - Version identifier. - - bustype - Bustype identifier. - - phys - Physical path. - - input_props - Input properties and quirks. - - Note - ---- - If you do not specify any events, the uinput device will be able - to inject only ``KEY_*`` and ``BTN_*`` event codes. - ''' - - self.name = name #: Uinput device name. - self.vendor = vendor #: Device vendor identifier. - self.product = product #: Device product identifier. - self.version = version #: Device version identifier. - self.bustype = bustype #: Device bustype - e.g. ``BUS_USB``. - self.phys = phys #: Uinput device physical path. - self.devnode = devnode #: Uinput device node - e.g. ``/dev/uinput/``. - - if not events: - events = {ecodes.EV_KEY: ecodes.keys.keys()} - - self._verify() - - #: Write-only, non-blocking file descriptor to the uinput device node. - self.fd = _uinput.open(devnode) - - # Prepare the list of events for passing to _uinput.enable and _uinput.setup. - absinfo, prepared_events = self._prepare_events(events) - - # Set phys name - _uinput.set_phys(self.fd, phys) - - # Set properties - input_props = input_props or [] - for prop in input_props: - _uinput.set_prop(self.fd, prop) - - for etype, code in prepared_events: - _uinput.enable(self.fd, etype, code) - - _uinput.setup(self.fd, name, vendor, product, version, bustype, absinfo) - - # Create the uinput device. - _uinput.create(self.fd) - - self.dll = ctypes.CDLL(_uinput.__file__) - self.dll._uinput_begin_upload.restype = ctypes.c_int - self.dll._uinput_end_upload.restype = ctypes.c_int - - #: An :class:`InputDevice ` instance - #: for the fake input device. ``None`` if the device cannot be - #: opened for reading and writing. - self.device = self._find_device() - - def _prepare_events(self, events): - '''Prepare events for passing to _uinput.enable and _uinput.setup''' - absinfo, prepared_events = [], [] - for etype, codes in events.items(): - for code in codes: - # Handle max, min, fuzz, flat. - if isinstance(code, (tuple, list, device.AbsInfo)): - # Flatten (ABS_Y, (0, 255, 0, 0, 0, 0)) to (ABS_Y, 0, 255, 0, 0, 0, 0). - f = [code[0]] - f.extend(code[1]) - # Ensure the tuple is always 6 ints long, since uinput.c:uinput_create - # does little in the way of checking the length. - f.extend([0] * (6 - len(code[1]))) - absinfo.append(f) - code = code[0] - prepared_events.append((etype, code)) - return absinfo, prepared_events - - def __enter__(self): - return self - - def __exit__(self, type, value, tb): - if hasattr(self, 'fd'): - self.close() - - def __repr__(self): - # TODO: - v = (repr(getattr(self, i)) for i in - ('name', 'bustype', 'vendor', 'product', 'version', 'phys')) - return '{}({})'.format(self.__class__.__name__, ', '.join(v)) - - def __str__(self): - msg = ('name "{}", bus "{}", vendor "{:04x}", product "{:04x}", version "{:04x}", phys "{}"\n' - 'event types: {}') - - evtypes = [i[0] for i in self.capabilities(True).keys()] - msg = msg.format(self.name, ecodes.BUS[self.bustype], - self.vendor, self.product, - self.version, self.phys, ' '.join(evtypes)) - - return msg - - def close(self): - # Close the associated InputDevice, if it was previously opened. - if self.device is not None: - self.device.close() - - # Destroy the uinput device. - if self.fd > -1: - _uinput.close(self.fd) - self.fd = -1 - - def syn(self): - ''' - Inject a ``SYN_REPORT`` event into the input subsystem. Events - queued by :func:`write()` will be fired. If possible, events - will be merged into an 'atomic' event. - ''' - - _uinput.write(self.fd, ecodes.EV_SYN, ecodes.SYN_REPORT, 0) - - def capabilities(self, verbose=False, absinfo=True): - '''See :func:`capabilities `.''' - if self.device is None: - raise UInputError('input device not opened - cannot read capabilities') - - return self.device.capabilities(verbose, absinfo) - - def begin_upload(self, effect_id): - upload = ff.UInputUpload() - upload.effect_id = effect_id - - ret = self.dll._uinput_begin_upload(self.fd, ctypes.byref(upload)) - if ret: - raise UInputError('Failed to begin uinput upload: ' + os.strerror(ret)) - - return upload - - def end_upload(self, upload): - ret = self.dll._uinput_end_upload(self.fd, ctypes.byref(upload)) - if ret: - raise UInputError('Failed to end uinput upload: ' + os.strerror(ret)) - - def begin_erase(self, effect_id): - erase = ff.UInputErase() - erase.effect_id = effect_id - - ret = self.dll._uinput_begin_erase(self.fd, ctypes.byref(erase)) - if ret: - raise UInputError('Failed to begin uinput erase: ' + os.strerror(ret)) - return erase - - def end_erase(self, erase): - ret = self.dll._uinput_end_erase(self.fd, ctypes.byref(erase)) - if ret: - raise UInputError('Failed to end uinput erase: ' + os.strerror(ret)) - - def _verify(self): - ''' - Verify that an uinput device exists and is readable and writable - by the current process. - ''' - - try: - m = os.stat(self.devnode)[stat.ST_MODE] - if not stat.S_ISCHR(m): - raise - except (IndexError, OSError): - msg = '"{}" does not exist or is not a character device file '\ - '- verify that the uinput module is loaded' - raise UInputError(msg.format(self.devnode)) - - if not os.access(self.devnode, os.W_OK): - msg = '"{}" cannot be opened for writing' - raise UInputError(msg.format(self.devnode)) - - if len(self.name) > _uinput.maxnamelen: - msg = 'uinput device name must not be longer than {} characters' - raise UInputError(msg.format(_uinput.maxnamelen)) - - def _find_device(self): - #:bug: the device node might not be immediately available - time.sleep(0.1) - - for path in util.list_devices('/dev/input/'): - d = device.InputDevice(path) - if d.name == self.name: - return d diff --git a/examples/udev-example.py b/examples/udev-example.py index 12a617c..8e827f6 100755 --- a/examples/udev-example.py +++ b/examples/udev-example.py @@ -1,9 +1,9 @@ #!/usr/bin/env python3 -''' +""" This is an example of using pyudev[1] alongside evdev. [1]: https://pyudev.readthedocs.org/ -''' +""" import functools import pyudev @@ -13,7 +13,7 @@ context = pyudev.Context() monitor = pyudev.Monitor.from_netlink(context) -monitor.filter_by(subsystem='input') +monitor.filter_by(subsystem="input") monitor.start() fds = {monitor.fileno(): monitor} @@ -32,16 +32,16 @@ break # find the device we're interested in and add it to fds - for name in (i['NAME'] for i in udev.ancestors if 'NAME' in i): + for name in (i["NAME"] for i in udev.ancestors if "NAME" in i): # I used a virtual input device for this test - you # should adapt this to your needs - if u'py-evdev-uinput' in name: - if udev.action == u'add': - print('Device added: %s' % udev) + if "py-evdev-uinput" in name: + if udev.action == "add": + print("Device added: %s" % udev) fds[dev.fd] = InputDevice(udev.device_node) break - if udev.action == u'remove': - print('Device removed: %s' % udev) + if udev.action == "remove": + print("Device removed: %s" % udev) def helper(): global fds diff --git a/packaging/python-evdev.spec b/packaging/python-evdev.spec deleted file mode 100644 index a3cfb14..0000000 --- a/packaging/python-evdev.spec +++ /dev/null @@ -1,78 +0,0 @@ -Name: python-evdev -Version: 0.6.1 -Release: 1%{?dist} -Summary: Python bindings for the Linux input handling subsystem - -License: BSD -URL: https://python-evdev.readthedocs.io -Source0: https://github.com/gvalkov/%{name}/archive/v%{version}.tar.gz#/%{name}-%{version}.tar.gz - -BuildRequires: kernel-headers -BuildRequires: python2-devel -BuildRequires: python3-devel -BuildRequires: python2-setuptools -BuildRequires: python3-setuptools - - -%global _description \ -This package provides python bindings to the generic input event interface in \ -Linux. The evdev interface serves the purpose of passing events generated in \ -the kernel directly to userspace through character devices that are typically \ -located in /dev/input/. \ - \ -This package also comes with bindings to uinput, the userspace input subsystem. \ -Uinput allows userspace programs to create and handle input devices that can \ -inject events directly into the input subsystem. \ - \ -In other words, python-evdev allows you to read and write input events on Linux. \ -An event can be a key or button press, a mouse movement or a tap on a \ -touchscreen. - - -%description %{_description} - - -%package -n python2-evdev -Summary: %{summary} -%{?python_provide:%python_provide python2-evdev} -%description -n python2-evdev %{_description} - - -%package -n python3-evdev -Summary: %{summary} -%{?python_provide:%python_provide python3-evdev} -%description -n python3-evdev %{_description} - - -#------------------------------------------------------------------------------ -%prep -%autosetup - -#------------------------------------------------------------------------------ -%build -%py2_build -%py3_build - -#------------------------------------------------------------------------------ -%install -%py2_install -%py3_install - -#------------------------------------------------------------------------------ -%files -n python2-evdev -%license LICENSE -%doc README.rst -%{python2_sitearch}/evdev/ -%{python2_sitearch}/evdev-%{version}-py%{python2_version}.egg-info/ - -%files -n python3-evdev -%license LICENSE -%doc README.rst -%{python3_sitearch}/evdev/ -%{python3_sitearch}/evdev-%{version}-py%{python3_version}.egg-info/ - - -#------------------------------------------------------------------------------ -%changelog -* Sun Jun 05 2016 Georgi Valkov - 0.6.1-1 -- Initial RPM Release diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..d0b4f7c --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,56 @@ +[build-system] +requires = ["setuptools>=77.0"] +build-backend = "setuptools.build_meta" + +[project] +name = "evdev" +version = "1.9.3" +description = "Bindings to the Linux input handling subsystem" +keywords = ["evdev", "input", "uinput"] +readme = "README.md" +license = "BSD-3-Clause" +requires-python = ">=3.9" +authors = [ + { name="Georgi Valkov", email="georgi.t.valkov@gmail.com" }, +] +maintainers = [ + { name="Tobi", email="proxima@sezanzeb.de" }, +] +classifiers = [ + "Development Status :: 5 - Production/Stable", + "Programming Language :: Python :: 3", + "Operating System :: POSIX :: Linux", + "Intended Audience :: Developers", + "Topic :: Software Development :: Libraries", + "Programming Language :: Python :: Implementation :: CPython", +] + +[project.urls] +"Homepage" = "https://github.com/gvalkov/python-evdev" + +[tool.ruff] +line-length = 120 + +[tool.ruff.lint] +ignore = ["E265", "E241", "F403", "F401", "E401", "E731"] + +[tool.bumpversion] +current_version = "1.9.3" +commit = true +tag = true +allow_dirty = true + +[[tool.bumpversion.files]] +filename = "pyproject.toml" + +[[tool.bumpversion.files]] +filename = "docs/conf.py" + +[tool.pylint.'MESSAGES CONTROL'] +disable = """ + no-member, +""" + +[tool.pylint.typecheck] +generated-members = ["evdev.ecodes.*"] +ignored-modules= ["evdev._*"] diff --git a/requirements-dev.txt b/requirements-dev.txt index c7eb2f2..725ad7f 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -1,6 +1,10 @@ -pytest ~= 7.1.0 -Sphinx ~= 4.4.0 -sphinx-copybutton ~= 0.5.0 -bump2version -sphinx_rtd_theme +pytest +Sphinx +sphinx-copybutton ~= 0.5.2 +sphinx-rtd-theme +ruff +bump-my-version ~= 0.17.4 +build twine +cibuildwheel +setuptools diff --git a/scripts/build-binary.sh b/scripts/build-binary.sh new file mode 100755 index 0000000..bbdae6c --- /dev/null +++ b/scripts/build-binary.sh @@ -0,0 +1,15 @@ +#!/usr/bin/env bash + +set -o allexport +set -o nounset + +CIBW_MANYLINUX_X86_64_IMAGE="manylinux_2_28" +CIBW_MANYLINUX_I686_IMAGE="manylinux_2_28" +CIBW_CONTAINER_ENGINE="podman" +CIBW_SKIP="cp36-*" +CIBW_ARCHS_LINUX="auto64" +CIBW_BEFORE_ALL_LINUX=./scripts/cibw-before.sh +CIBW_TEST_COMMAND="python -c 'import evdev; print(evdev)'" +CIBW_ENVIRONMENT="PACKAGE_NAME=evdev-binary" + +exec cibuildwheel \ No newline at end of file diff --git a/scripts/cibw-before.sh b/scripts/cibw-before.sh new file mode 100755 index 0000000..25220d4 --- /dev/null +++ b/scripts/cibw-before.sh @@ -0,0 +1,6 @@ +#!/usr/bin/env bash + + +if [ -n "$PACKAGE_NAME" ]; then + sed -i -re 's,^(name = ")evdev("),\1'${PACKAGE_NAME}'\2,' pyproject.toml +fi \ No newline at end of file diff --git a/setup.cfg b/setup.cfg deleted file mode 100644 index 9e2dc85..0000000 --- a/setup.cfg +++ /dev/null @@ -1,13 +0,0 @@ -[bumpversion] -current_version = 1.6.0 -message = Bump version: {current_version} -> {new_version} -commit = True -tag = True - -[flake8] -ignore = W191,E302,E265,E241,F403,E401 -max-line-length = 110 - -[bumpversion:file:setup.py] - -[bumpversion:file:docs/conf.py] diff --git a/setup.py b/setup.py index 847485e..1f6eaac 100755 --- a/setup.py +++ b/setup.py @@ -1,86 +1,46 @@ -#!/usr/bin/env python -# encoding: utf-8 - import os import sys +import shutil import textwrap +import platform +from pathlib import Path +from subprocess import run + +from setuptools import setup, Extension, Command +from setuptools.command import build_ext as _build_ext + + +curdir = Path(__file__).resolve().parent +ecodes_c_path = curdir / "src/evdev/ecodes.c" -from os.path import abspath, dirname, join as pjoin - -#----------------------------------------------------------------------------- -try: - from setuptools import setup, Extension, Command - from setuptools.command import build_ext as _build_ext -except ImportError: - from distutils.core import setup, Extension, Command - from distutils.command import build_ext as _build_ext - - -#----------------------------------------------------------------------------- -here = abspath(dirname(__file__)) - -#----------------------------------------------------------------------------- -classifiers = [ - 'Development Status :: 5 - Production/Stable', - 'Programming Language :: Python :: 3', - 'Programming Language :: Python :: 3.6', - 'Programming Language :: Python :: 3.7', - 'Programming Language :: Python :: 3.8', - 'Programming Language :: Python :: 3.9', - 'Programming Language :: Python :: 3.10', - 'Operating System :: POSIX :: Linux', - 'Intended Audience :: Developers', - 'Topic :: Software Development :: Libraries', - 'License :: OSI Approved :: BSD License', - 'Programming Language :: Python :: Implementation :: CPython', -] - -#----------------------------------------------------------------------------- -cflags = ['-std=c99', '-Wno-error=declaration-after-statement'] -input_c = Extension('evdev._input', sources=['evdev/input.c'], extra_compile_args=cflags) -uinput_c = Extension('evdev._uinput', sources=['evdev/uinput.c'], extra_compile_args=cflags) -ecodes_c = Extension('evdev._ecodes', sources=['evdev/ecodes.c'], extra_compile_args=cflags) - -#----------------------------------------------------------------------------- -kw = { - 'name': 'evdev', - 'version': '1.6.0', - - 'description': 'Bindings to the Linux input handling subsystem', - 'long_description': open(pjoin(here, 'README.rst')).read(), - - 'author': 'Georgi Valkov', - 'author_email': 'georgi.t.valkov@gmail.com', - 'license': 'Revised BSD License', - 'keywords': 'evdev input uinput', - 'url': 'https://github.com/gvalkov/python-evdev', - 'classifiers': classifiers, - - 'packages': ['evdev'], - 'ext_modules': [input_c, uinput_c, ecodes_c], - 'include_package_data': False, - 'zip_safe': True, - 'cmdclass': {}, -} - - -#----------------------------------------------------------------------------- -def create_ecodes(headers=None): + +def create_ecodes(headers=None, reproducible=False): if not headers: - headers = [ - '/usr/include/linux/input.h', - '/usr/include/linux/input-event-codes.h', - '/usr/include/linux/uinput.h', - ] + include_paths = set() + cpath = os.environ.get("CPATH", "").strip() + c_inc_path = os.environ.get("C_INCLUDE_PATH", "").strip() + + if cpath: + include_paths.update(cpath.split(":")) + if c_inc_path: + include_paths.update(c_inc_path.split(":")) + + include_paths.add("/usr/include") + if platform.system().lower() == "freebsd": + files = ["dev/evdev/input.h", "dev/evdev/input-event-codes.h", "dev/evdev/uinput.h"] + else: + files = ["linux/input.h", "linux/input-event-codes.h", "linux/uinput.h"] + + headers = [os.path.join(path, file) for path in include_paths for file in files] headers = [header for header in headers if os.path.isfile(header)] if not headers: - msg = '''\ + msg = """\ The 'linux/input.h' and 'linux/input-event-codes.h' include files are missing. You will have to install the kernel header files in order to continue: - yum install kernel-headers-$(uname -r) + dnf install kernel-headers-$(uname -r) apt-get install linux-headers-$(uname -r) emerge sys-kernel/linux-headers pacman -S kernel-headers @@ -92,60 +52,84 @@ def create_ecodes(headers=None): python setup.py \\ build \\ build_ecodes --evdev-headers path/input.h:path/input-event-codes.h \\ - build_ext --include-dirs path/ \\ + build_ext --include-dirs path/ \\ install - ''' + + If you want to avoid building this package from source, then please consider + installing the `evdev-binary` package instead. Keep in mind that it may not be + fully compatible with, or support all the features of your current kernel. + """ sys.stderr.write(textwrap.dedent(msg)) sys.exit(1) - from subprocess import check_call - - print('writing ecodes.c (using %s)' % ' '.join(headers)) - cmd = '%s genecodes.py %s > ecodes.c' % (sys.executable, ' '.join(headers)) - check_call(cmd, cwd="%s/evdev" % here, shell=True) + print("writing %s (using %s)" % (ecodes_c_path, " ".join(headers))) + with ecodes_c_path.open("w") as fh: + cmd = [sys.executable, "src/evdev/genecodes_c.py"] + if reproducible: + cmd.append("--reproducible") + cmd.extend(["--ecodes", *headers]) + run(cmd, check=True, stdout=fh) -#----------------------------------------------------------------------------- class build_ecodes(Command): - description = 'generate ecodes.c' + description = "generate ecodes.c" user_options = [ - ('evdev-headers=', None, 'colon-separated paths to input subsystem headers'), + ("evdev-headers=", None, "colon-separated paths to input subsystem headers"), + ("reproducible", None, "hide host details (host/paths) to create a reproducible output"), ] def initialize_options(self): self.evdev_headers = None + self.reproducible = False def finalize_options(self): if self.evdev_headers: - self.evdev_headers = self.evdev_headers.split(':') + self.evdev_headers = self.evdev_headers.split(":") + if self.reproducible is None: + self.reproducible = False def run(self): - create_ecodes(self.evdev_headers) + create_ecodes(self.evdev_headers, reproducible=self.reproducible) class build_ext(_build_ext.build_ext): def has_ecodes(self): - ecodes_path = os.path.join(here, 'evdev/ecodes.c') - res = os.path.exists(ecodes_path) - if res: - print('ecodes.c already exists ... skipping build_ecodes') - return not res + if ecodes_c_path.exists(): + print("ecodes.c already exists ... skipping build_ecodes") + return False + return True + + def generate_ecodes_py(self): + ecodes_py = Path(self.build_lib) / "evdev/ecodes.py" + print(f"writing {ecodes_py}") + with ecodes_py.open("w") as fh: + cmd = [sys.executable, "-B", "src/evdev/genecodes_py.py"] + res = run(cmd, env={"PYTHONPATH": self.build_lib}, stdout=fh) + + if res.returncode != 0: + print(f"failed to generate static {ecodes_py} - will use ecodes_runtime.py") + shutil.copy("src/evdev/ecodes_runtime.py", ecodes_py) def run(self): for cmd_name in self.get_sub_commands(): self.run_command(cmd_name) _build_ext.build_ext.run(self) - - sub_commands = [('build_ecodes', has_ecodes)] + _build_ext.build_ext.sub_commands - - -#----------------------------------------------------------------------------- -kw['cmdclass']['build_ext'] = build_ext -kw['cmdclass']['build_ecodes'] = build_ecodes - - -#----------------------------------------------------------------------------- -if __name__ == '__main__': - setup(**kw) + self.generate_ecodes_py() + + sub_commands = [("build_ecodes", has_ecodes)] + _build_ext.build_ext.sub_commands + + +cflags = ["-std=c99", "-Wno-error=declaration-after-statement"] +setup( + ext_modules=[ + Extension("evdev._input", sources=["src/evdev/input.c"], extra_compile_args=cflags), + Extension("evdev._uinput", sources=["src/evdev/uinput.c"], extra_compile_args=cflags), + Extension("evdev._ecodes", sources=["src/evdev/ecodes.c"], extra_compile_args=cflags), + ], + cmdclass={ + "build_ext": build_ext, + "build_ecodes": build_ecodes, + }, +) diff --git a/src/evdev/__init__.py b/src/evdev/__init__.py new file mode 100644 index 0000000..bae0fec --- /dev/null +++ b/src/evdev/__init__.py @@ -0,0 +1,39 @@ +# -------------------------------------------------------------------------- +# Gather everything into a single, convenient namespace. +# -------------------------------------------------------------------------- + +# The superfluous "import name as name" syntax is here to satisfy mypy's attrs-defined rule. +# Alternatively all exported objects can be listed in __all__. + +from . import ( + ecodes as ecodes, + ff as ff, +) + +from .device import ( + AbsInfo as AbsInfo, + DeviceInfo as DeviceInfo, + EvdevError as EvdevError, + InputDevice as InputDevice, +) + +from .events import ( + AbsEvent as AbsEvent, + InputEvent as InputEvent, + KeyEvent as KeyEvent, + RelEvent as RelEvent, + SynEvent as SynEvent, + event_factory as event_factory, +) + +from .uinput import ( + UInput as UInput, + UInputError as UInputError, +) + +from .util import ( + categorize as categorize, + list_devices as list_devices, + resolve_ecodes as resolve_ecodes, + resolve_ecodes_dict as resolve_ecodes_dict, +) diff --git a/evdev/device.py b/src/evdev/device.py similarity index 68% rename from evdev/device.py rename to src/evdev/device.py index dbfad5d..a7f9b92 100644 --- a/evdev/device.py +++ b/src/evdev/device.py @@ -1,34 +1,21 @@ -# encoding: utf-8 - -import os -import warnings import contextlib -import collections +import os +from typing import Dict, Generic, Iterator, List, Literal, NamedTuple, Tuple, TypeVar, Union, overload -from evdev import _input, ecodes, util -from evdev.events import InputEvent +from . import _input, ecodes, util try: - from evdev.eventio_async import EventIO, EvdevError + from .eventio_async import EvdevError, EventIO except ImportError: - from evdev.eventio import EventIO, EvdevError - - -#-------------------------------------------------------------------------- -_AbsInfo = collections.namedtuple( - 'AbsInfo', ['value', 'min', 'max', 'fuzz', 'flat', 'resolution']) + from .eventio import EvdevError, EventIO -_KbdInfo = collections.namedtuple( - 'KbdInfo', ['repeat', 'delay']) +_AnyStr = TypeVar("_AnyStr", str, bytes) -_DeviceInfo = collections.namedtuple( - 'DeviceInfo', ['bustype', 'vendor', 'product', 'version']) +class AbsInfo(NamedTuple): + """Absolute axis information. -class AbsInfo(_AbsInfo): - '''Absolute axis information. - - A ``namedtuple`` used for storing absolute axis information - + A ``namedtuple`` with absolute axis information - corresponds to the ``input_absinfo`` struct: Attributes @@ -62,72 +49,85 @@ class AbsInfo(_AbsInfo): The input core does not clamp reported values to the ``[minimum, maximum]`` limits, such task is left to userspace. - ''' + """ + + value: int + min: int + max: int + fuzz: int + flat: int + resolution: int def __str__(self): - return 'val {}, min {}, max {}, fuzz {}, flat {}, res {}'.format(*self) + return "value {}, min {}, max {}, fuzz {}, flat {}, res {}".format(*self) # pylint: disable=not-an-iterable -class KbdInfo(_KbdInfo): - '''Keyboard repeat rate. +class KbdInfo(NamedTuple): + """Keyboard repeat rate. Attributes ---------- - repeat - Keyboard repeat rate in characters per second. - delay Amount of time that a key must be depressed before it will start to repeat (in milliseconds). - ''' + + repeat + Keyboard repeat rate in characters per second. + """ + + delay: int + repeat: int def __str__(self): - return 'repeat {}, delay {}'.format(*self) + return "delay {}, repeat {}".format(self.delay, self.repeat) -class DeviceInfo(_DeviceInfo): - ''' +class DeviceInfo(NamedTuple): + """ Attributes ---------- bustype vendor product version - ''' + """ - def __str__(self): - msg = 'bus: {:04x}, vendor {:04x}, product {:04x}, version {:04x}' - return msg.format(*self) + bustype: int + vendor: int + product: int + version: int + def __str__(self) -> str: + msg = "bus: {:04x}, vendor {:04x}, product {:04x}, version {:04x}" + return msg.format(*self) # pylint: disable=not-an-iterable -class InputDevice(EventIO): - ''' + +class InputDevice(EventIO, Generic[_AnyStr]): + """ A linux input device from which input events can be read. - ''' + """ - __slots__ = ('path', 'fd', 'info', 'name', 'phys', 'uniq', '_rawcapabilities', - 'version', 'ff_effects_count') + __slots__ = ("path", "fd", "info", "name", "phys", "uniq", "_rawcapabilities", "version", "ff_effects_count") - def __init__(self, dev): - ''' + def __init__(self, dev: Union[_AnyStr, "os.PathLike[_AnyStr]"]): + """ Arguments --------- dev : str|bytes|PathLike Path to input device - ''' + """ #: Path to input device. - self.path = dev if not hasattr(dev, '__fspath__') else dev.__fspath__() + self.path: _AnyStr = dev if not hasattr(dev, "__fspath__") else dev.__fspath__() - # Certain operations are possible only when the device is opened in - # read-write mode. + # Certain operations are possible only when the device is opened in read-write mode. try: fd = os.open(dev, os.O_RDWR | os.O_NONBLOCK) except OSError: fd = os.open(dev, os.O_RDONLY | os.O_NONBLOCK) #: A non-blocking file descriptor to the device file. - self.fd = fd + self.fd: int = fd # Returns (bustype, vendor, product, version, name, phys, capabilities). info_res = _input.ioctl_devinfo(self.fd) @@ -136,16 +136,16 @@ def __init__(self, dev): self.info = DeviceInfo(*info_res[:4]) #: The name of the event device. - self.name = info_res[4] + self.name: str = info_res[4] #: The physical topology of the device. - self.phys = info_res[5] + self.phys: str = info_res[5] - #: The unique address of the device. - self.uniq = info_res[6] + #: The unique identifier of the device. + self.uniq: str = info_res[6] #: The evdev protocol version. - self.version = _input.ioctl_EVIOCGVERSION(self.fd) + self.version: int = _input.ioctl_EVIOCGVERSION(self.fd) #: The raw dictionary of device capabilities - see `:func:capabilities()`. self._rawcapabilities = _input.ioctl_capabilities(self.fd) @@ -153,18 +153,18 @@ def __init__(self, dev): #: The number of force feedback effects the device can keep in its memory. self.ff_effects_count = _input.ioctl_EVIOCGEFFECTS(self.fd) - def __del__(self): - if hasattr(self, 'fd') and self.fd is not None: + def __del__(self) -> None: + if hasattr(self, "fd") and self.fd is not None: try: self.close() except (OSError, ImportError, AttributeError): pass - def _capabilities(self, absinfo=True): + def _capabilities(self, absinfo: bool = True): res = {} - for etype, ecodes in self._rawcapabilities.items(): - for code in ecodes: + for etype, _ecodes in self._rawcapabilities.items(): + for code in _ecodes: l = res.setdefault(etype, []) if isinstance(code, tuple): if absinfo: @@ -178,8 +178,14 @@ def _capabilities(self, absinfo=True): return res - def capabilities(self, verbose=False, absinfo=True): - ''' + @overload + def capabilities(self, verbose: Literal[False] = ..., absinfo: bool = ...) -> Dict[int, List[int]]: + ... + @overload + def capabilities(self, verbose: Literal[True], absinfo: bool = ...) -> Dict[Tuple[str, int], List[Tuple[str, int]]]: + ... + def capabilities(self, verbose: bool = False, absinfo: bool = True) -> Union[Dict[int, List[int]], Dict[Tuple[str, int], List[Tuple[str, int]]]]: + """ Return the event types that this device supports as a mapping of supported event types to lists of handled event codes. @@ -216,15 +222,15 @@ def capabilities(self, verbose=False, absinfo=True): { ('EV_ABS', 3): [ (('ABS_X', 0), AbsInfo(min=0, max=255, fuzz=0, flat=0)), (('ABS_Y', 1), AbsInfo(min=0, max=255, fuzz=0, flat=0)) ]} - ''' + """ if verbose: return dict(util.resolve_ecodes_dict(self._capabilities(absinfo))) else: return self._capabilities(absinfo) - def input_props(self, verbose=False): - ''' + def input_props(self, verbose: bool = False): + """ Get device properties and quirks. Example @@ -237,15 +243,15 @@ def input_props(self, verbose=False): [('INPUT_PROP_POINTER', 0), ('INPUT_PROP_POINTING_STICK', 5)] - ''' + """ props = _input.ioctl_EVIOCGPROP(self.fd) if verbose: return util.resolve_ecodes(ecodes.INPUT_PROP, props) return props - def leds(self, verbose=False): - ''' + def leds(self, verbose: bool = False): + """ Return currently set LED keys. Example @@ -258,42 +264,41 @@ def leds(self, verbose=False): [('LED_NUML', 0), ('LED_CAPSL', 1), ('LED_MISC', 8), ('LED_MAIL', 9)] - ''' + """ leds = _input.ioctl_EVIOCG_bits(self.fd, ecodes.EV_LED) if verbose: return util.resolve_ecodes(ecodes.LED, leds) return leds - def set_led(self, led_num, value): - ''' + def set_led(self, led_num: int, value: int) -> None: + """ Set the state of the selected LED. Example ------- >>> device.set_led(ecodes.LED_NUML, 1) - ''' + """ self.write(ecodes.EV_LED, led_num, value) def __eq__(self, other): - ''' + """ Two devices are equal if their :data:`info` attributes are equal. - ''' - return isinstance(other, self.__class__) and self.info == other.info \ - and self.path == other.path + """ + return isinstance(other, self.__class__) and self.info == other.info and self.path == other.path - def __str__(self): - msg = 'device {}, name "{}", phys "{}"' - return msg.format(self.path, self.name, self.phys) + def __str__(self) -> str: + msg = 'device {}, name "{}", phys "{}", uniq "{}"' + return msg.format(self.path, self.name, self.phys, self.uniq or "") - def __repr__(self): + def __repr__(self) -> str: msg = (self.__class__.__name__, self.path) - return '{}({!r})'.format(*msg) + return "{}({!r})".format(*msg) def __fspath__(self): return self.path - def close(self): + def close(self) -> None: if self.fd > -1: try: super().close() @@ -301,73 +306,73 @@ def close(self): finally: self.fd = -1 - def grab(self): - ''' + def grab(self) -> None: + """ Grab input device using ``EVIOCGRAB`` - other applications will be unable to receive events until the device is released. Only one process can hold a ``EVIOCGRAB`` on a device. Warning ------- - Grabbing an already grabbed device will raise an ``IOError``. - ''' + Grabbing an already grabbed device will raise an ``OSError``. + """ _input.ioctl_EVIOCGRAB(self.fd, 1) - def ungrab(self): - ''' + def ungrab(self) -> None: + """ Release device if it has been already grabbed (uses `EVIOCGRAB`). Warning ------- Releasing an already released device will raise an - ``IOError('Invalid argument')``. - ''' + ``OSError('Invalid argument')``. + """ _input.ioctl_EVIOCGRAB(self.fd, 0) @contextlib.contextmanager - def grab_context(self): - ''' + def grab_context(self) -> Iterator[None]: + """ A context manager for the duration of which only the current process will be able to receive events from the device. - ''' + """ self.grab() yield self.ungrab() - def upload_effect(self, effect): - ''' + def upload_effect(self, effect: "ff.Effect"): + """ Upload a force feedback effect to a force feedback device. - ''' + """ data = memoryview(effect).tobytes() ff_id = _input.upload_effect(self.fd, data) return ff_id - def erase_effect(self, ff_id): - ''' + def erase_effect(self, ff_id) -> None: + """ Erase a force effect from a force feedback device. This also stops the effect. - ''' + """ _input.erase_effect(self.fd, ff_id) @property def repeat(self): - ''' + """ Get or set the keyboard repeat rate (in characters per minute) and delay (in milliseconds). - ''' + """ return KbdInfo(*_input.ioctl_EVIOCGREP(self.fd)) @repeat.setter - def repeat(self, value): + def repeat(self, value: Tuple[int, int]): return _input.ioctl_EVIOCSREP(self.fd, *value) - def active_keys(self, verbose=False): - ''' + def active_keys(self, verbose: bool = False): + """ Return currently active keys. Example @@ -382,21 +387,15 @@ def active_keys(self, verbose=False): [('KEY_ESC', 1), ('KEY_LEFTSHIFT', 42)] - ''' + """ active_keys = _input.ioctl_EVIOCG_bits(self.fd, ecodes.EV_KEY) if verbose: return util.resolve_ecodes(ecodes.KEY, active_keys) return active_keys - @property - def fn(self): - msg = 'Please use {0}.path instead of {0}.fn'.format(self.__class__.__name__) - warnings.warn(msg, DeprecationWarning, stacklevel=2) - return self.path - - def absinfo(self, axis_num): - ''' + def absinfo(self, axis_num: int): + """ Return current :class:`AbsInfo` for input device axis Arguments @@ -408,11 +407,11 @@ def absinfo(self, axis_num): ------- >>> device.absinfo(ecodes.ABS_X) AbsInfo(value=1501, min=-32768, max=32767, fuzz=0, flat=128, resolution=0) - ''' + """ return AbsInfo(*_input.ioctl_EVIOCGABS(self.fd, axis_num)) - def set_absinfo(self, axis_num, value=None, min=None, max=None, fuzz=None, flat=None, resolution=None): - ''' + def set_absinfo(self, axis_num: int, value=None, min=None, max=None, fuzz=None, flat=None, resolution=None) -> None: + """ Update :class:`AbsInfo` values. Only specified values will be overwritten. Arguments @@ -427,13 +426,15 @@ def set_absinfo(self, axis_num, value=None, min=None, max=None, fuzz=None, flat= You can also unpack AbsInfo tuple that will overwrite all values >>> device.set_absinfo(ecodes.ABS_Y, *AbsInfo(0, -2000, 2000, 0, 15, 0)) - ''' + """ cur_absinfo = self.absinfo(axis_num) - new_absinfo = AbsInfo(value if value is not None else cur_absinfo.value, - min if min is not None else cur_absinfo.min, - max if max is not None else cur_absinfo.max, - fuzz if fuzz is not None else cur_absinfo.fuzz, - flat if flat is not None else cur_absinfo.flat, - resolution if resolution is not None else cur_absinfo.resolution) + new_absinfo = AbsInfo( + value if value is not None else cur_absinfo.value, + min if min is not None else cur_absinfo.min, + max if max is not None else cur_absinfo.max, + fuzz if fuzz is not None else cur_absinfo.fuzz, + flat if flat is not None else cur_absinfo.flat, + resolution if resolution is not None else cur_absinfo.resolution, + ) _input.ioctl_EVIOCSABS(self.fd, axis_num, new_absinfo) diff --git a/src/evdev/ecodes.py b/src/evdev/ecodes.py new file mode 100644 index 0000000..fd4afc4 --- /dev/null +++ b/src/evdev/ecodes.py @@ -0,0 +1,5 @@ +# When installed, this module is replaced by an ecodes.py generated at +# build time by genecodes_py.py (see build_ext in setup.py). + +# This stub exists to make development of evdev itself more convenient. +from .ecodes_runtime import * diff --git a/evdev/ecodes.py b/src/evdev/ecodes_runtime.py similarity index 80% rename from evdev/ecodes.py rename to src/evdev/ecodes_runtime.py index e3e4ffc..47f3b23 100644 --- a/evdev/ecodes.py +++ b/src/evdev/ecodes_runtime.py @@ -1,6 +1,5 @@ -# encoding: utf-8 - -''' +# pylint: disable=undefined-variable +""" This modules exposes the integer constants defined in ``linux/input.h`` and ``linux/input-event-codes.h``. @@ -34,26 +33,26 @@ codes. For example:: >>> evdev.ecodes.FF[80] - ['FF_EFFECT_MIN', 'FF_RUMBLE'] + ('FF_EFFECT_MIN', 'FF_RUMBLE') >>> evdev.ecodes.FF[81] 'FF_PERIODIC' -''' +""" from inspect import getmembers -from evdev import _ecodes +from . import _ecodes #: Mapping of names to values. ecodes = {} -prefixes = 'KEY ABS REL SW MSC LED BTN REP SND ID EV BUS SYN FF_STATUS FF INPUT_PROP' -prev_prefix = '' +prefixes = "KEY ABS REL SW MSC LED BTN REP SND ID EV BUS SYN FF_STATUS FF INPUT_PROP UI_FF".split() +prev_prefix = "" g = globals() # eg. code: 'REL_Z', val: 2 for code, val in getmembers(_ecodes): - for prefix in prefixes.split(): # eg. 'REL' + for prefix in prefixes: # eg. 'REL' if code.startswith(prefix): ecodes[code] = val # FF_STATUS codes should not appear in the FF reverse mapping @@ -72,6 +71,15 @@ prev_prefix = prefix + +# Convert lists to tuples. +k, v = None, None +for prefix in prefixes: + for k, v in g[prefix].items(): + if isinstance(v, list): + g[prefix][k] = tuple(v) + + #: Keys are a combination of all BTN and KEY codes. keys = {} keys.update(BTN) @@ -87,17 +95,17 @@ _ecodes.EV_KEY: keys, _ecodes.EV_ABS: ABS, _ecodes.EV_REL: REL, - _ecodes.EV_SW: SW, + _ecodes.EV_SW: SW, _ecodes.EV_MSC: MSC, _ecodes.EV_LED: LED, _ecodes.EV_REP: REP, _ecodes.EV_SND: SND, _ecodes.EV_SYN: SYN, - _ecodes.EV_FF: FF, + _ecodes.EV_FF: FF, _ecodes.EV_FF_STATUS: FF_STATUS, } from evdev._ecodes import * # cheaper than whitelisting in an __all__ -del code, val, prefix, getmembers, g, d, prefixes, prev_prefix +del code, val, prefix, getmembers, g, d, k, v, prefixes, prev_prefix diff --git a/evdev/eventio.py b/src/evdev/eventio.py similarity index 76% rename from evdev/eventio.py rename to src/evdev/eventio.py index 8d84f55..bdb91a4 100644 --- a/evdev/eventio.py +++ b/src/evdev/eventio.py @@ -1,20 +1,20 @@ -# encoding: utf-8 - -import os import fcntl -import select import functools +import os +import select +from typing import Iterator, Union -from evdev import _input, _uinput, ecodes, util -from evdev.events import InputEvent +from . import _input, _uinput, ecodes +from .events import InputEvent -#-------------------------------------------------------------------------- + +# -------------------------------------------------------------------------- class EvdevError(Exception): pass class EventIO: - ''' + """ Base class for reading and writing input events. This class is used by :class:`InputDevice` and :class:`UInput`. @@ -26,33 +26,33 @@ class EventIO: - On, :class:`UInput` it used for writing user-generated events (e.g. key presses, mouse movements) and reading feedback events (e.g. leds, beeps). - ''' + """ def fileno(self): - ''' + """ Return the file descriptor to the open event device. This makes it possible to pass instances directly to :func:`select.select()` and :class:`asyncore.file_dispatcher`. - ''' + """ return self.fd - def read_loop(self): - ''' + def read_loop(self) -> Iterator[InputEvent]: + """ Enter an endless :func:`select.select()` loop that yields input events. - ''' + """ while True: r, w, x = select.select([self.fd], [], []) for event in self.read(): yield event - def read_one(self): - ''' + def read_one(self) -> Union[InputEvent, None]: + """ Read and return a single input event as an instance of :class:`InputEvent `. Return ``None`` if there are no pending input events. - ''' + """ # event -> (sec, usec, type, code, val) event = _input.device_read(self.fd) @@ -60,35 +60,39 @@ def read_one(self): if event: return InputEvent(*event) - def read(self): - ''' + def read(self) -> Iterator[InputEvent]: + """ Read multiple input events from device. Return a generator object that yields :class:`InputEvent ` instances. Raises `BlockingIOError` if there are no available events at the moment. - ''' + """ - # events -> [(sec, usec, type, code, val), ...] + # events -> ((sec, usec, type, code, val), ...) events = _input.device_read_many(self.fd) for event in events: yield InputEvent(*event) + # pylint: disable=no-self-argument def need_write(func): - ''' + """ Decorator that raises :class:`EvdevError` if there is no write access to the input device. - ''' + """ + @functools.wraps(func) def wrapper(*args): fd = args[0].fd if fcntl.fcntl(fd, fcntl.F_GETFL) & os.O_RDWR: + # pylint: disable=not-callable return func(*args) msg = 'no write access to device "%s"' % args[0].path raise EvdevError(msg) + return wrapper def write_event(self, event): - ''' + """ Inject an input event into the input subsystem. Events are queued until a synchronization event is received. @@ -103,16 +107,16 @@ def write_event(self, event): ------- >>> ev = InputEvent(1334414993, 274296, ecodes.EV_KEY, ecodes.KEY_A, 1) >>> ui.write_event(ev) - ''' + """ - if hasattr(event, 'event'): + if hasattr(event, "event"): event = event.event self.write(event.type, event.code, event.value) @need_write - def write(self, etype, code, value): - ''' + def write(self, etype: int, code: int, value: int): + """ Inject an input event into the input subsystem. Events are queued until a synchronization event is received. @@ -131,9 +135,18 @@ def write(self, etype, code, value): --------- >>> ui.write(e.EV_KEY, e.KEY_A, 1) # key A - down >>> ui.write(e.EV_KEY, e.KEY_A, 0) # key A - up - ''' + """ _uinput.write(self.fd, etype, code, value) + def syn(self): + """ + Inject a ``SYN_REPORT`` event into the input subsystem. Events + queued by :func:`write()` will be fired. If possible, events + will be merged into an 'atomic' event. + """ + + self.write(ecodes.EV_SYN, ecodes.SYN_REPORT, 0) + def close(self): pass diff --git a/evdev/eventio_async.py b/src/evdev/eventio_async.py similarity index 84% rename from evdev/eventio_async.py rename to src/evdev/eventio_async.py index 2d3468e..4af1aab 100644 --- a/evdev/eventio_async.py +++ b/src/evdev/eventio_async.py @@ -1,11 +1,56 @@ -# encoding: utf-8 - import asyncio import select +import sys + +from . import eventio +from .events import InputEvent -from evdev import eventio # needed for compatibility -from evdev.eventio import EvdevError +from .eventio import EvdevError + +if sys.version_info >= (3, 11): + from typing import Self +else: + from typing import Any as Self + + +class ReadIterator: + def __init__(self, device): + self.current_batch = iter(()) + self.device = device + + # Standard iterator protocol. + def __iter__(self) -> Self: + return self + + def __next__(self) -> InputEvent: + try: + # Read from the previous batch of events. + return next(self.current_batch) + except StopIteration: + r, w, x = select.select([self.device.fd], [], []) + self.current_batch = self.device.read() + return next(self.current_batch) + + def __aiter__(self) -> Self: + return self + + def __anext__(self) -> "asyncio.Future[InputEvent]": + future = asyncio.Future() + try: + # Read from the previous batch of events. + future.set_result(next(self.current_batch)) + except StopIteration: + + def next_batch_ready(batch): + try: + self.current_batch = batch.result() + future.set_result(next(self.current_batch)) + except Exception as e: + future.set_exception(e) + + self.device.async_read().add_done_callback(next_batch_ready) + return future class EventIO(eventio.EventIO): @@ -15,6 +60,7 @@ def _do_when_readable(self, callback): def ready(): loop.remove_reader(self.fileno()) callback() + loop.add_reader(self.fileno(), ready) def _set_result(self, future, cb): @@ -24,30 +70,30 @@ def _set_result(self, future, cb): future.set_exception(error) def async_read_one(self): - ''' + """ Asyncio coroutine to read and return a single input event as an instance of :class:`InputEvent `. - ''' + """ future = asyncio.Future() self._do_when_readable(lambda: self._set_result(future, self.read_one)) return future def async_read(self): - ''' + """ Asyncio coroutine to read multiple input events from device. Return a generator object that yields :class:`InputEvent ` instances. - ''' + """ future = asyncio.Future() self._do_when_readable(lambda: self._set_result(future, self.read)) return future - def async_read_loop(self): - ''' + def async_read_loop(self) -> ReadIterator: + """ Return an iterator that yields input events. This iterator is compatible with the ``async for`` syntax. - ''' + """ return ReadIterator(self) def close(self): @@ -58,40 +104,3 @@ def close(self): # no event loop present, so there is nothing to # remove the reader from. Ignore pass - - -class ReadIterator: - def __init__(self, device): - self.current_batch = iter(()) - self.device = device - - # Standard iterator protocol. - def __iter__(self): - return self - - def __next__(self): - try: - # Read from the previous batch of events. - return next(self.current_batch) - except StopIteration: - r, w, x = select.select([self.device.fd], [], []) - self.current_batch = self.device.read() - return next(self.current_batch) - - def __aiter__(self): - return self - - def __anext__(self): - future = asyncio.Future() - try: - # Read from the previous batch of events. - future.set_result(next(self.current_batch)) - except StopIteration: - def next_batch_ready(batch): - try: - self.current_batch = batch.result() - future.set_result(next(self.current_batch)) - except Exception as e: - future.set_exception(e) - self.device.async_read().add_done_callback(next_batch_ready) - return future diff --git a/evdev/events.py b/src/evdev/events.py similarity index 56% rename from evdev/events.py rename to src/evdev/events.py index af42386..922bfe6 100644 --- a/evdev/events.py +++ b/src/evdev/events.py @@ -1,6 +1,4 @@ -# encoding: utf-8 - -''' +""" This module provides the :class:`InputEvent` class, which closely resembles the ``input_event`` struct defined in ``linux/input.h``: @@ -34,66 +32,67 @@ key event at 1337197425.477835, 28 (KEY_ENTER), up >>> print(repr(key_event)) KeyEvent(InputEvent(1337197425L, 477835L, 1, 28, 0L)) -''' +""" # event type descriptions have been taken mot-a-mot from: # http://www.kernel.org/doc/Documentation/input/event-codes.txt -from evdev.ecodes import keys, KEY, SYN, REL, ABS, EV_KEY, EV_REL, EV_ABS, EV_SYN +# pylint: disable=no-name-in-module +from typing import Final +from .ecodes import ABS, EV_ABS, EV_KEY, EV_REL, EV_SYN, KEY, REL, SYN, keys class InputEvent: - '''A generic input event.''' + """A generic input event.""" - __slots__ = 'sec', 'usec', 'type', 'code', 'value' + __slots__ = "sec", "usec", "type", "code", "value" def __init__(self, sec, usec, type, code, value): #: Time in seconds since epoch at which event occurred. - self.sec = sec + self.sec: int = sec #: Microsecond portion of the timestamp. - self.usec = usec + self.usec: int = usec #: Event type - one of ``ecodes.EV_*``. - self.type = type + self.type: int = type #: Event code related to the event type. - self.code = code + self.code: int = code #: Event value related to the event type. - self.value = value + self.value: int = value - def timestamp(self): - '''Return event timestamp as a float.''' + def timestamp(self) -> float: + """Return event timestamp as a float.""" return self.sec + (self.usec / 1000000.0) - def __str__(s): - msg = 'event at {:f}, code {:02d}, type {:02d}, val {:02d}' - return msg.format(s.timestamp(), s.code, s.type, s.value) + def __str__(self): + msg = "event at {:f}, code {:02d}, type {:02d}, val {:02d}" + return msg.format(self.timestamp(), self.code, self.type, self.value) - def __repr__(s): - msg = '{}({!r}, {!r}, {!r}, {!r}, {!r})' - return msg.format(s.__class__.__name__, - s.sec, s.usec, s.type, s.code, s.value) + def __repr__(self): + msg = "{}({!r}, {!r}, {!r}, {!r}, {!r})" + return msg.format(self.__class__.__name__, self.sec, self.usec, self.type, self.code, self.value) class KeyEvent: - '''An event generated by a keyboard, button or other key-like devices.''' + """An event generated by a keyboard, button or other key-like devices.""" - key_up = 0x0 - key_down = 0x1 - key_hold = 0x2 + key_up: Final[int] = 0x0 + key_down: Final[int] = 0x1 + key_hold: Final[int] = 0x2 - __slots__ = 'scancode', 'keycode', 'keystate', 'event' + __slots__ = "scancode", "keycode", "keystate", "event" - def __init__(self, event, allow_unknown=False): - ''' + def __init__(self, event: InputEvent, allow_unknown: bool = False): + """ The ``allow_unknown`` argument determines what to do in the event of an event code for which a key code cannot be found. If ``False`` a ``KeyError`` will be raised. If ``True`` the keycode will be set to the hex value of the event code. - ''' + """ - self.scancode = event.code + self.scancode: int = event.code if event.value == 0: self.keystate = KeyEvent.key_up @@ -106,79 +105,78 @@ def __init__(self, event, allow_unknown=False): self.keycode = keys[event.code] except KeyError: if allow_unknown: - self.keycode = '0x{:02X}'.format(event.code) + self.keycode = "0x{:02X}".format(event.code) else: raise #: Reference to an :class:`InputEvent` instance. - self.event = event + self.event: InputEvent = event def __str__(self): try: - ks = ('up', 'down', 'hold')[self.keystate] + ks = ("up", "down", "hold")[self.keystate] except IndexError: - ks = 'unknown' + ks = "unknown" - msg = 'key event at {:f}, {} ({}), {}' - return msg.format(self.event.timestamp(), - self.scancode, self.keycode, ks) + msg = "key event at {:f}, {} ({}), {}" + return msg.format(self.event.timestamp(), self.scancode, self.keycode, ks) - def __repr__(s): - return '{}({!r})'.format(s.__class__.__name__, s.event) + def __repr__(self): + return "{}({!r})".format(self.__class__.__name__, self.event) class RelEvent: - '''A relative axis event (e.g moving the mouse 5 units to the left).''' + """A relative axis event (e.g moving the mouse 5 units to the left).""" - __slots__ = 'event' + __slots__ = "event" - def __init__(self, event): + def __init__(self, event: InputEvent): #: Reference to an :class:`InputEvent` instance. - self.event = event + self.event: InputEvent = event def __str__(self): - msg = 'relative axis event at {:f}, {}' + msg = "relative axis event at {:f}, {}" return msg.format(self.event.timestamp(), REL[self.event.code]) - def __repr__(s): - return '{}({!r})'.format(s.__class__.__name__, s.event) + def __repr__(self): + return "{}({!r})".format(self.__class__.__name__, self.event) class AbsEvent: - '''An absolute axis event (e.g the coordinates of a tap on a touchscreen).''' + """An absolute axis event (e.g the coordinates of a tap on a touchscreen).""" - __slots__ = 'event' + __slots__ = "event" - def __init__(self, event): + def __init__(self, event: InputEvent): #: Reference to an :class:`InputEvent` instance. - self.event = event + self.event: InputEvent = event def __str__(self): - msg = 'absolute axis event at {:f}, {}' + msg = "absolute axis event at {:f}, {}" return msg.format(self.event.timestamp(), ABS[self.event.code]) - def __repr__(s): - return '{}({!r})'.format(s.__class__.__name__, s.event) + def __repr__(self): + return "{}({!r})".format(self.__class__.__name__, self.event) class SynEvent: - ''' + """ A synchronization event. Used as markers to separate events. Events may be separated in time or in space, such as with the multitouch protocol. - ''' + """ - __slots__ = 'event' + __slots__ = "event" - def __init__(self, event): + def __init__(self, event: InputEvent): #: Reference to an :class:`InputEvent` instance. - self.event = event + self.event: InputEvent = event def __str__(self): - msg = 'synchronization event at {:f}, {}' + msg = "synchronization event at {:f}, {}" return msg.format(self.event.timestamp(), SYN[self.event.code]) - def __repr__(s): - return '{}({!r})'.format(s.__class__.__name__, s.event) + def __repr__(self): + return "{}({!r})".format(self.__class__.__name__, self.event) #: A mapping of event types to :class:`InputEvent` sub-classes. Used @@ -191,5 +189,4 @@ def __repr__(s): } -__all__ = ('InputEvent', 'KeyEvent', 'RelEvent', 'SynEvent', - 'AbsEvent', 'event_factory') +__all__ = ("InputEvent", "KeyEvent", "RelEvent", "SynEvent", "AbsEvent", "event_factory") diff --git a/evdev/evtest.py b/src/evdev/evtest.py similarity index 62% rename from evdev/evtest.py rename to src/evdev/evtest.py index da6c683..6ea3bb5 100644 --- a/evdev/evtest.py +++ b/src/evdev/evtest.py @@ -1,6 +1,4 @@ -# encoding: utf-8 - -''' +""" Usage: evtest [options] [, ...] Input device enumerator and event monitor. @@ -16,31 +14,23 @@ Examples: evtest /dev/input/event0 /dev/input/event1 -''' - - -from __future__ import print_function +""" +import atexit +import optparse import re -import sys import select -import atexit +import sys import termios -import optparse - -try: - input = raw_input -except NameError: - pass -from evdev import ecodes, list_devices, AbsInfo, InputDevice +from . import AbsInfo, InputDevice, ecodes, list_devices def parseopt(): parser = optparse.OptionParser(add_help_option=False) - parser.add_option('-h', '--help', action='store_true') - parser.add_option('-g', '--grab', action='store_true') - parser.add_option('-c', '--capabilities', action='store_true') + parser.add_option("-h", "--help", action="store_true") + parser.add_option("-g", "--grab", action="store_true") + parser.add_option("-c", "--capabilities", action="store_true") return parser.parse_args() @@ -69,7 +59,7 @@ def main(): toggle_tty_echo(sys.stdin, enable=False) atexit.register(toggle_tty_echo, sys.stdin, enable=True) - print('Listening for events (press ctrl-c to exit) ...') + print("Listening for events (press ctrl-c to exit) ...") fd_to_device = {dev.fd: dev for dev in devices} while True: r, w, e = select.select(fd_to_device, [], []) @@ -79,31 +69,31 @@ def main(): print_event(event) -def select_devices(device_dir='/dev/input'): - ''' +def select_devices(device_dir="/dev/input"): + """ Select one or more devices from a list of accessible input devices. - ''' + """ def devicenum(device_path): - digits = re.findall(r'\d+$', device_path) + digits = re.findall(r"\d+$", device_path) return [int(i) for i in digits] devices = sorted(list_devices(device_dir), key=devicenum) devices = [InputDevice(path) for path in devices] if not devices: - msg = 'error: no input devices found (do you have rw permission on %s/*?)' + msg = "error: no input devices found (do you have rw permission on %s/*?)" print(msg % device_dir, file=sys.stderr) sys.exit(1) - dev_format = '{0:<3} {1.path:<20} {1.name:<35} {1.phys:<35} {1.uniq:<4}' + dev_format = "{0:<3} {1.path:<20} {1.name:<35} {1.phys:<35} {1.uniq:<4}" dev_lines = [dev_format.format(num, dev) for num, dev in enumerate(devices)] - print('ID {:<20} {:<35} {:<35} {}'.format('Device', 'Name', 'Phys', 'Uniq')) - print('-' * len(max(dev_lines, key=len))) - print('\n'.join(dev_lines)) + print("ID {:<20} {:<35} {:<35} {}".format("Device", "Name", "Phys", "Uniq")) + print("-" * len(max(dev_lines, key=len))) + print("\n".join(dev_lines)) print() - choices = input('Select devices [0-%s]: ' % (len(dev_lines) - 1)) + choices = input("Select devices [0-%s]: " % (len(dev_lines) - 1)) try: choices = choices.split() @@ -112,7 +102,7 @@ def devicenum(device_path): choices = None if not choices: - msg = 'error: invalid input - please enter one or more numbers separated by spaces' + msg = "error: invalid input - please enter one or more numbers separated by spaces" print(msg, file=sys.stderr) sys.exit(1) @@ -123,52 +113,54 @@ def print_capabilities(device): capabilities = device.capabilities(verbose=True) input_props = device.input_props(verbose=True) - print('Device name: {.name}'.format(device)) - print('Device info: {.info}'.format(device)) - print('Repeat settings: {}\n'.format(device.repeat)) + print("Device name: {.name}".format(device)) + print("Device info: {.info}".format(device)) + print("Repeat settings: {}\n".format(device.repeat)) - if ('EV_LED', ecodes.EV_LED) in capabilities: - leds = ','.join(i[0] for i in device.leds(True)) - print('Active LEDs: %s' % leds) + if ("EV_LED", ecodes.EV_LED) in capabilities: + leds = ",".join(i[0] for i in device.leds(True)) + print("Active LEDs: %s" % leds) - active_keys = ','.join(k[0] for k in device.active_keys(True)) - print('Active keys: %s\n' % active_keys) + active_keys = ",".join(k[0] for k in device.active_keys(True)) + print("Active keys: %s\n" % active_keys) if input_props: - print('Input properties:') + print("Input properties:") for type, code in input_props: - print(' %s %s' % (type, code)) + print(" %s %s" % (type, code)) print() - print('Device capabilities:') + print("Device capabilities:") for type, codes in capabilities.items(): - print(' Type {} {}:'.format(*type)) + print(" Type {} {}:".format(*type)) for code in codes: # code <- ('BTN_RIGHT', 273) or (['BTN_LEFT', 'BTN_MOUSE'], 272) if isinstance(code[1], AbsInfo): - print(' Code {:<4} {}:'.format(*code[0])) - print(' {}'.format(code[1])) + print(" Code {:<4} {}:".format(*code[0])) + print(" {}".format(code[1])) else: # Multiple names may resolve to one value. - s = ', '.join(code[0]) if isinstance(code[0], list) else code[0] - print(' Code {:<4} {}'.format(s, code[1])) - print('') + s = ", ".join(code[0]) if isinstance(code[0], list) else code[0] + print(" Code {:<4} {}".format(s, code[1])) + print("") def print_event(e): if e.type == ecodes.EV_SYN: if e.code == ecodes.SYN_MT_REPORT: - msg = 'time {:<16} +++++++++ {} ++++++++' + msg = "time {:<17} +++++++++++++ {} +++++++++++++" + elif e.code == ecodes.SYN_DROPPED: + msg = "time {:<17} !!!!!!!!!!!!! {} !!!!!!!!!!!!!" else: - msg = 'time {:<16} --------- {} --------' + msg = "time {:<17} ------------- {} -------------" print(msg.format(e.timestamp(), ecodes.SYN[e.code])) else: if e.type in ecodes.bytype: codename = ecodes.bytype[e.type][e.code] else: - codename = '?' + codename = "?" - evfmt = 'time {:<16} type {} ({}), code {:<4} ({}), value {}' + evfmt = "time {:<17} type {} ({}), code {:<4} ({}), value {}" print(evfmt.format(e.timestamp(), e.type, ecodes.EV[e.type], e.code, codename, e.value)) @@ -181,7 +173,7 @@ def toggle_tty_echo(fh, enable=True): termios.tcsetattr(fh.fileno(), termios.TCSANOW, flags) -if __name__ == '__main__': +if __name__ == "__main__": try: ret = main() except (KeyboardInterrupt, EOFError): diff --git a/evdev/ff.py b/src/evdev/ff.py similarity index 66% rename from evdev/ff.py rename to src/evdev/ff.py index 0008906..260c362 100644 --- a/evdev/ff.py +++ b/src/evdev/ff.py @@ -1,43 +1,42 @@ -# encoding: utf-8 - import ctypes -from evdev import ecodes +from . import ecodes -_u8 = ctypes.c_uint8 +_u8 = ctypes.c_uint8 _u16 = ctypes.c_uint16 _u32 = ctypes.c_uint32 _s16 = ctypes.c_int16 _s32 = ctypes.c_int32 + class Replay(ctypes.Structure): - ''' + """ Defines scheduling of the force-feedback effect @length: duration of the effect @delay: delay before effect should start playing - ''' + """ _fields_ = [ - ('length', _u16), - ('delay', _u16), + ("length", _u16), + ("delay", _u16), ] class Trigger(ctypes.Structure): - ''' + """ Defines what triggers the force-feedback effect @button: number of the button triggering the effect @interval: controls how soon the effect can be re-triggered - ''' + """ _fields_ = [ - ('button', _u16), - ('interval', _u16), + ("button", _u16), + ("interval", _u16), ] class Envelope(ctypes.Structure): - ''' + """ Generic force-feedback effect envelope @attack_length: duration of the attack (ms) @attack_level: level at the beginning of the attack @@ -48,46 +47,46 @@ class Envelope(ctypes.Structure): envelope force-feedback core will convert to positive/negative value based on polarity of the default level of the effect. Valid range for the attack and fade levels is 0x0000 - 0x7fff - ''' + """ _fields_ = [ - ('attack_length', _u16), - ('attack_level', _u16), - ('fade_length', _u16), - ('fade_level', _u16), + ("attack_length", _u16), + ("attack_level", _u16), + ("fade_length", _u16), + ("fade_level", _u16), ] class Constant(ctypes.Structure): - ''' + """ Defines parameters of a constant force-feedback effect @level: strength of the effect; may be negative @envelope: envelope data - ''' + """ _fields_ = [ - ('level', _s16), - ('ff_envelope', Envelope), + ("level", _s16), + ("ff_envelope", Envelope), ] class Ramp(ctypes.Structure): - ''' + """ Defines parameters of a ramp force-feedback effect @start_level: beginning strength of the effect; may be negative @end_level: final strength of the effect; may be negative @envelope: envelope data - ''' + """ _fields_ = [ - ('start_level', _s16), - ('end_level', _s16), - ('ff_envelope', Envelope), + ("start_level", _s16), + ("end_level", _s16), + ("ff_envelope", Envelope), ] class Condition(ctypes.Structure): - ''' + """ Defines a spring or friction force-feedback effect @right_saturation: maximum level when joystick moved all way to the right @left_saturation: same for the left side @@ -95,20 +94,20 @@ class Condition(ctypes.Structure): @left_coeff: same for the left side @deadband: size of the dead zone, where no force is produced @center: position of the dead zone - ''' + """ _fields_ = [ - ('right_saturation', _u16), - ('left_saturation', _u16), - ('right_coeff', _s16), - ('left_coeff', _s16), - ('deadband', _u16), - ('center', _s16), + ("right_saturation", _u16), + ("left_saturation", _u16), + ("right_coeff", _s16), + ("left_coeff", _s16), + ("deadband", _u16), + ("center", _s16), ] class Periodic(ctypes.Structure): - ''' + """ Defines parameters of a periodic force-feedback effect @waveform: kind of the effect (wave) @period: period of the wave (ms) @@ -118,71 +117,74 @@ class Periodic(ctypes.Structure): @envelope: envelope data @custom_len: number of samples (FF_CUSTOM only) @custom_data: buffer of samples (FF_CUSTOM only) - ''' + """ _fields_ = [ - ('waveform', _u16), - ('period', _u16), - ('magnitude', _s16), - ('offset', _s16), - ('phase', _u16), - ('envelope', Envelope), - ('custom_len', _u32), - ('custom_data', ctypes.POINTER(_s16)), + ("waveform", _u16), + ("period", _u16), + ("magnitude", _s16), + ("offset", _s16), + ("phase", _u16), + ("envelope", Envelope), + ("custom_len", _u32), + ("custom_data", ctypes.POINTER(_s16)), ] class Rumble(ctypes.Structure): - ''' + """ Defines parameters of a periodic force-feedback effect @strong_magnitude: magnitude of the heavy motor @weak_magnitude: magnitude of the light one Some rumble pads have two motors of different weight. Strong_magnitude represents the magnitude of the vibration generated by the heavy one. - ''' + """ _fields_ = [ - ('strong_magnitude', _u16), - ('weak_magnitude', _u16), + ("strong_magnitude", _u16), + ("weak_magnitude", _u16), ] class EffectType(ctypes.Union): _fields_ = [ - ('ff_constant_effect', Constant), - ('ff_ramp_effect', Ramp), - ('ff_periodic_effect', Periodic), - ('ff_condition_effect', Condition * 2), # one for each axis - ('ff_rumble_effect', Rumble), + ("ff_constant_effect", Constant), + ("ff_ramp_effect", Ramp), + ("ff_periodic_effect", Periodic), + ("ff_condition_effect", Condition * 2), # one for each axis + ("ff_rumble_effect", Rumble), ] class Effect(ctypes.Structure): _fields_ = [ - ('type', _u16), - ('id', _s16), - ('direction', _u16), - ('ff_trigger', Trigger), - ('ff_replay', Replay), - ('u', EffectType) + ("type", _u16), + ("id", _s16), + ("direction", _u16), + ("ff_trigger", Trigger), + ("ff_replay", Replay), + ("u", EffectType), ] + class UInputUpload(ctypes.Structure): _fields_ = [ - ('request_id', _u32), - ('retval', _s32), - ('effect', Effect), - ('old', Effect), + ("request_id", _u32), + ("retval", _s32), + ("effect", Effect), + ("old", Effect), ] + class UInputErase(ctypes.Structure): _fields_ = [ - ('request_id', _u32), - ('retval', _s32), - ('effect_id', _u32), + ("request_id", _u32), + ("retval", _s32), + ("effect_id", _u32), ] + # ff_types = { # ecodes.FF_CONSTANT, # ecodes.FF_PERIODIC, diff --git a/src/evdev/genecodes_c.py b/src/evdev/genecodes_c.py new file mode 100644 index 0000000..15a6693 --- /dev/null +++ b/src/evdev/genecodes_c.py @@ -0,0 +1,147 @@ +""" +Generate a Python extension module with the constants defined in linux/input.h. +""" + +import getopt +import os +import re +import sys + +# ----------------------------------------------------------------------------- +# The default header file locations to try. +headers = [ + "/usr/include/linux/input.h", + "/usr/include/linux/input-event-codes.h", + "/usr/include/linux/uinput.h", +] + +opts, args = getopt.getopt(sys.argv[1:], "", ["ecodes", "stubs", "reproducible"]) +if not opts: + print("usage: genecodes.py [--ecodes|--stubs] [--reproducible] ") + exit(2) + +if args: + headers = args + +reproducible = ("--reproducible", "") in opts + + +# ----------------------------------------------------------------------------- +macro_regex = r"#define\s+((?:KEY|ABS|REL|SW|MSC|LED|BTN|REP|SND|ID|EV|BUS|SYN|FF|UI_FF|INPUT_PROP)_\w+)" +macro_regex = re.compile(macro_regex) + +if reproducible: + uname = "hidden for reproducibility" +else: + # Uname without hostname. + uname = list(os.uname()) + uname = " ".join((uname[0], *uname[2:])) + + +# ----------------------------------------------------------------------------- +template_ecodes = r""" +#include +#ifdef __FreeBSD__ +#include +#include +#else +#include +#include +#endif + +/* Automatically generated by evdev.genecodes */ +/* Generated on %s */ +/* Generated from %s */ + +#define MODULE_NAME "_ecodes" +#define MODULE_HELP "linux/input.h macros" + +static PyMethodDef MethodTable[] = { + { NULL, NULL, 0, NULL} +}; + +static struct PyModuleDef moduledef = { + PyModuleDef_HEAD_INIT, + MODULE_NAME, + MODULE_HELP, + -1, /* m_size */ + MethodTable, /* m_methods */ + NULL, /* m_reload */ + NULL, /* m_traverse */ + NULL, /* m_clear */ + NULL, /* m_free */ +}; + +PyMODINIT_FUNC +PyInit__ecodes(void) +{ + PyObject* m = PyModule_Create(&moduledef); + if (m == NULL) return NULL; + +%s + + return m; +} +""" + + +template_stubs = r""" +# Automatically generated by evdev.genecodes +# Generated on %s +# Generated from %s + +# pylint: skip-file + +ecodes: dict[str, int] +keys: dict[int, str|list[str]] +bytype: dict[int, dict[int, str|list[str]]] + +KEY: dict[int, str|list[str]] +ABS: dict[int, str|list[str]] +REL: dict[int, str|list[str]] +SW: dict[int, str|list[str]] +MSC: dict[int, str|list[str]] +LED: dict[int, str|list[str]] +BTN: dict[int, str|list[str]] +REP: dict[int, str|list[str]] +SND: dict[int, str|list[str]] +ID: dict[int, str|list[str]] +EV: dict[int, str|list[str]] +BUS: dict[int, str|list[str]] +SYN: dict[int, str|list[str]] +FF_STATUS: dict[int, str|list[str]] +FF_INPUT_PROP: dict[int, str|list[str]] + +%s +""" + + +def parse_headers(headers=headers): + for header in headers: + try: + fh = open(header) + except (IOError, OSError): + continue + + for line in fh: + macro = macro_regex.search(line) + if macro: + yield macro.group(1) + + +all_macros = list(parse_headers()) +if not all_macros: + print("no input macros found in: %s" % " ".join(headers), file=sys.stderr) + sys.exit(1) + +# pylint: disable=possibly-used-before-assignment, used-before-assignment +if ("--ecodes", "") in opts: + body = (" PyModule_AddIntMacro(m, %s);" % macro for macro in all_macros) + template = template_ecodes +elif ("--stubs", "") in opts: + body = ("%s: int" % macro for macro in all_macros) + template = template_stubs + +body = os.linesep.join(body) +text = template % (uname, headers if not reproducible else ["hidden for reproducibility"], body) +print(text.strip()) diff --git a/src/evdev/genecodes_py.py b/src/evdev/genecodes_py.py new file mode 100644 index 0000000..f00020c --- /dev/null +++ b/src/evdev/genecodes_py.py @@ -0,0 +1,54 @@ +import sys +from unittest import mock +from pprint import PrettyPrinter + +sys.modules["evdev.ecodes"] = mock.Mock() +from evdev import ecodes_runtime as ecodes + +pprint = PrettyPrinter(indent=2, sort_dicts=True, width=120).pprint + + +print("# Automatically generated by evdev.genecodes_py") +print() +print('"""') +print(ecodes.__doc__.strip()) +print('"""') + +print() +print("from typing import Final, Dict, Tuple, Union") +print() + +for name, value in ecodes.ecodes.items(): + print(f"{name}: Final[int] = {value}") +print() + +entries = [ + ("ecodes", "Dict[str, int]", "#: Mapping of names to values."), + ("bytype", "Dict[int, Dict[int, Union[str, Tuple[str]]]]", "#: Mapping of event types to other value/name mappings."), + ("keys", "Dict[int, Union[str, Tuple[str]]]", "#: Keys are a combination of all BTN and KEY codes."), + ("KEY", "Dict[int, Union[str, Tuple[str]]]", None), + ("ABS", "Dict[int, Union[str, Tuple[str]]]", None), + ("REL", "Dict[int, Union[str, Tuple[str]]]", None), + ("SW", "Dict[int, Union[str, Tuple[str]]]", None), + ("MSC", "Dict[int, Union[str, Tuple[str]]]", None), + ("LED", "Dict[int, Union[str, Tuple[str]]]", None), + ("BTN", "Dict[int, Union[str, Tuple[str]]]", None), + ("REP", "Dict[int, Union[str, Tuple[str]]]", None), + ("SND", "Dict[int, Union[str, Tuple[str]]]", None), + ("ID", "Dict[int, Union[str, Tuple[str]]]", None), + ("EV", "Dict[int, Union[str, Tuple[str]]]", None), + ("BUS", "Dict[int, Union[str, Tuple[str]]]", None), + ("SYN", "Dict[int, Union[str, Tuple[str]]]", None), + ("FF", "Dict[int, Union[str, Tuple[str]]]", None), + ("UI_FF", "Dict[int, Union[str, Tuple[str]]]", None), + ("FF_STATUS", "Dict[int, Union[str, Tuple[str]]]", None), + ("INPUT_PROP", "Dict[int, Union[str, Tuple[str]]]", None) +] + +for key, annotation, doc in entries: + if doc: + print(doc) + + print(f"{key}: {annotation} = ", end="") + pprint(getattr(ecodes, key)) + print() \ No newline at end of file diff --git a/evdev/input.c b/src/evdev/input.c similarity index 85% rename from evdev/input.c rename to src/evdev/input.c index b1497a4..894db22 100644 --- a/evdev/input.c +++ b/src/evdev/input.c @@ -46,12 +46,10 @@ int test_bit(const char* bitmask, int bit) { static PyObject * device_read(PyObject *self, PyObject *args) { - int fd; struct input_event event; // get device file descriptor (O_RDONLY|O_NONBLOCK) - if (PyArg_ParseTuple(args, "i", &fd) < 0) - return NULL; + int fd = (int)PyLong_AsLong(PyTuple_GET_ITEM(args, 0)); int n = read(fd, &event, sizeof(event)); @@ -61,19 +59,16 @@ device_read(PyObject *self, PyObject *args) return Py_None; } - PyErr_SetFromErrno(PyExc_IOError); + PyErr_SetFromErrno(PyExc_OSError); return NULL; } - PyObject* sec = PyLong_FromLong(event.input_event_sec); - PyObject* usec = PyLong_FromLong(event.input_event_usec); - PyObject* val = PyLong_FromLong(event.value); - PyObject* py_input_event = NULL; - - py_input_event = Py_BuildValue("OOhhO", sec, usec, event.type, event.code, val); - Py_DECREF(sec); - Py_DECREF(usec); - Py_DECREF(val); + PyObject *py_input_event = PyTuple_New(5); + PyTuple_SET_ITEM(py_input_event, 0, PyLong_FromLong(event.input_event_sec)); + PyTuple_SET_ITEM(py_input_event, 1, PyLong_FromLong(event.input_event_usec)); + PyTuple_SET_ITEM(py_input_event, 2, PyLong_FromLong(event.type)); + PyTuple_SET_ITEM(py_input_event, 3, PyLong_FromLong(event.code)); + PyTuple_SET_ITEM(py_input_event, 4, PyLong_FromLong(event.value)); return py_input_event; } @@ -83,17 +78,8 @@ device_read(PyObject *self, PyObject *args) static PyObject * device_read_many(PyObject *self, PyObject *args) { - int fd; - // get device file descriptor (O_RDONLY|O_NONBLOCK) - int ret = PyArg_ParseTuple(args, "i", &fd); - if (!ret) return NULL; - - PyObject* event_list = PyList_New(0); - PyObject* py_input_event = NULL; - PyObject* sec = NULL; - PyObject* usec = NULL; - PyObject* val = NULL; + int fd = (int)PyLong_AsLong(PyTuple_GET_ITEM(args, 0)); struct input_event event[64]; @@ -101,27 +87,25 @@ device_read_many(PyObject *self, PyObject *args) ssize_t nread = read(fd, event, event_size*64); if (nread < 0) { - PyErr_SetFromErrno(PyExc_IOError); - Py_DECREF(event_list); + PyErr_SetFromErrno(PyExc_OSError); return NULL; } - // Construct a list of event tuples, which we'll make sense of in Python - for (unsigned i = 0 ; i < nread/event_size ; i++) { - sec = PyLong_FromLong(event[i].input_event_sec); - usec = PyLong_FromLong(event[i].input_event_usec); - val = PyLong_FromLong(event[i].value); - - py_input_event = Py_BuildValue("OOhhO", sec, usec, event[i].type, event[i].code, val); - PyList_Append(event_list, py_input_event); - - Py_DECREF(py_input_event); - Py_DECREF(sec); - Py_DECREF(usec); - Py_DECREF(val); + // Construct a tuple of event tuples. Each tuple is the arguments to InputEvent. + size_t num_events = nread / event_size; + + PyObject* events = PyTuple_New(num_events); + for (size_t i = 0 ; i < num_events; i++) { + PyObject *py_input_event = PyTuple_New(5); + PyTuple_SET_ITEM(py_input_event, 0, PyLong_FromLong(event[i].input_event_sec)); + PyTuple_SET_ITEM(py_input_event, 1, PyLong_FromLong(event[i].input_event_usec)); + PyTuple_SET_ITEM(py_input_event, 2, PyLong_FromLong(event[i].type)); + PyTuple_SET_ITEM(py_input_event, 3, PyLong_FromLong(event[i].code)); + PyTuple_SET_ITEM(py_input_event, 4, PyLong_FromLong(event[i].value)); + PyTuple_SET_ITEM(events, i, py_input_event); } - return event_list; + return events; } @@ -208,7 +192,12 @@ ioctl_capabilities(PyObject *self, PyObject *args) return capabilities; on_err: - PyErr_SetFromErrno(PyExc_IOError); + Py_XDECREF(capabilities); + Py_XDECREF(eventcodes); + Py_XDECREF(capability); + Py_XDECREF(py_absinfo); + Py_XDECREF(absitem); + PyErr_SetFromErrno(PyExc_OSError); return NULL; } @@ -243,7 +232,7 @@ ioctl_devinfo(PyObject *self, PyObject *args) name, phys, uniq); on_err: - PyErr_SetFromErrno(PyExc_IOError); + PyErr_SetFromErrno(PyExc_OSError); return NULL; } @@ -261,7 +250,7 @@ ioctl_EVIOCGABS(PyObject *self, PyObject *args) memset(&absinfo, 0, sizeof(absinfo)); ret = ioctl(fd, EVIOCGABS(ev_code), &absinfo); if (ret == -1) { - PyErr_SetFromErrno(PyExc_IOError); + PyErr_SetFromErrno(PyExc_OSError); return NULL; } @@ -296,7 +285,7 @@ ioctl_EVIOCSABS(PyObject *self, PyObject *args) ret = ioctl(fd, EVIOCSABS(ev_code), &absinfo); if (ret == -1) { - PyErr_SetFromErrno(PyExc_IOError); + PyErr_SetFromErrno(PyExc_OSError); return NULL; } @@ -309,7 +298,7 @@ static PyObject * ioctl_EVIOCGREP(PyObject *self, PyObject *args) { int fd, ret; - unsigned int rep[2] = {0}; + unsigned int rep[REP_CNT] = {0}; ret = PyArg_ParseTuple(args, "i", &fd); if (!ret) return NULL; @@ -317,7 +306,7 @@ ioctl_EVIOCGREP(PyObject *self, PyObject *args) if (ret == -1) return NULL; - return Py_BuildValue("(ii)", rep[0], rep[1]); + return Py_BuildValue("(ii)", rep[REP_DELAY], rep[REP_PERIOD]); } @@ -325,7 +314,7 @@ static PyObject * ioctl_EVIOCSREP(PyObject *self, PyObject *args) { int fd, ret; - unsigned int rep[2] = {0}; + unsigned int rep[REP_CNT] = {0}; ret = PyArg_ParseTuple(args, "iii", &fd, &rep[0], &rep[1]); if (!ret) return NULL; @@ -362,7 +351,7 @@ ioctl_EVIOCGRAB(PyObject *self, PyObject *args) ret = ioctl(fd, EVIOCGRAB, (intptr_t)flag); if (ret != 0) { - PyErr_SetFromErrno(PyExc_IOError); + PyErr_SetFromErrno(PyExc_OSError); return NULL; } @@ -416,7 +405,9 @@ ioctl_EVIOCG_bits(PyObject *self, PyObject *args) PyObject* res = PyList_New(0); for (int i=0; i<=max; i++) { if (test_bit(bytes, i)) { - PyList_Append(res, Py_BuildValue("i", i)); + PyObject *val = PyLong_FromLong(i); + PyList_Append(res, val); + Py_DECREF(val); } } @@ -485,7 +476,7 @@ upload_effect(PyObject *self, PyObject *args) ret = ioctl(fd, EVIOCSFF, &effect); if (ret != 0) { - PyErr_SetFromErrno(PyExc_IOError); + PyErr_SetFromErrno(PyExc_OSError); return NULL; } @@ -504,7 +495,7 @@ erase_effect(PyObject *self, PyObject *args) long ff_id = PyLong_AsLong(ff_id_obj); ret = ioctl(fd, EVIOCRMFF, ff_id); if (ret != 0) { - PyErr_SetFromErrno(PyExc_IOError); + PyErr_SetFromErrno(PyExc_OSError); return NULL; } @@ -531,7 +522,9 @@ ioctl_EVIOCGPROP(PyObject *self, PyObject *args) PyObject* res = PyList_New(0); for (int i=0; i= 3 static struct PyModuleDef moduledef = { PyModuleDef_HEAD_INIT, - MODULE_NAME, - MODULE_HELP, + "_input", + "Python bindings to certain linux input subsystem functions", -1, /* m_size */ MethodTable, /* m_methods */ NULL, /* m_reload */ @@ -590,19 +578,3 @@ PyInit__input(void) { return moduleinit(); } - -#else -static PyObject * -moduleinit(void) -{ - PyObject* m = Py_InitModule3(MODULE_NAME, MethodTable, MODULE_HELP); - if (m == NULL) return NULL; - return m; -} - -PyMODINIT_FUNC -init_input(void) -{ - moduleinit(); -} -#endif diff --git a/src/evdev/py.typed b/src/evdev/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/evdev/uinput.c b/src/evdev/uinput.c similarity index 86% rename from evdev/uinput.c rename to src/evdev/uinput.c index 20318ce..8d2c096 100644 --- a/evdev/uinput.c +++ b/src/evdev/uinput.c @@ -49,7 +49,7 @@ uinput_open(PyObject *self, PyObject *args) int fd = open(devnode, O_RDWR | O_NONBLOCK); if (fd < 0) { - PyErr_SetString(PyExc_IOError, "could not open uinput device in write mode"); + PyErr_SetString(PyExc_OSError, "could not open uinput device in write mode"); return NULL; } @@ -73,7 +73,7 @@ uinput_set_phys(PyObject *self, PyObject *args) on_err: _uinput_close(fd); - PyErr_SetFromErrno(PyExc_IOError); + PyErr_SetFromErrno(PyExc_OSError); return NULL; } @@ -93,10 +93,30 @@ uinput_set_prop(PyObject *self, PyObject *args) on_err: _uinput_close(fd); - PyErr_SetFromErrno(PyExc_IOError); + PyErr_SetFromErrno(PyExc_OSError); return NULL; } +static PyObject * +uinput_get_sysname(PyObject *self, PyObject *args) +{ + int fd; + char sysname[64]; + + int ret = PyArg_ParseTuple(args, "i", &fd); + if (!ret) return NULL; + + #ifdef UI_GET_SYSNAME + if (ioctl(fd, UI_GET_SYSNAME(sizeof(sysname)), &sysname) < 0) + goto on_err; + + return Py_BuildValue("s", &sysname); + #endif + + on_err: + PyErr_SetFromErrno(PyExc_OSError); + return NULL; +} // Different kernel versions have different device setup methods. You can read // more about it here: @@ -108,14 +128,15 @@ static PyObject * uinput_setup(PyObject *self, PyObject *args) { int fd, len, i; uint16_t vendor, product, version, bustype; + uint32_t max_effects; PyObject *absinfo = NULL, *item = NULL; struct uinput_abs_setup abs_setup; const char* name; - int ret = PyArg_ParseTuple(args, "isHHHHO", &fd, &name, &vendor, - &product, &version, &bustype, &absinfo); + int ret = PyArg_ParseTuple(args, "isHHHHOI", &fd, &name, &vendor, + &product, &version, &bustype, &absinfo, &max_effects); if (!ret) return NULL; // Setup absinfo: @@ -147,7 +168,7 @@ uinput_setup(PyObject *self, PyObject *args) { usetup.id.product = product; usetup.id.version = version; usetup.id.bustype = bustype; - usetup.ff_effects_max = FF_MAX_EFFECTS; + usetup.ff_effects_max = max_effects; if(ioctl(fd, UI_DEV_SETUP, &usetup) < 0) goto on_err; @@ -156,7 +177,7 @@ uinput_setup(PyObject *self, PyObject *args) { on_err: _uinput_close(fd); - PyErr_SetFromErrno(PyExc_IOError); + PyErr_SetFromErrno(PyExc_OSError); return NULL; } @@ -166,14 +187,15 @@ static PyObject * uinput_setup(PyObject *self, PyObject *args) { int fd, len, i, abscode; uint16_t vendor, product, version, bustype; + uint32_t max_effects; PyObject *absinfo = NULL, *item = NULL; struct uinput_user_dev uidev; const char* name; - int ret = PyArg_ParseTuple(args, "isHHHHO", &fd, &name, &vendor, - &product, &version, &bustype, &absinfo); + int ret = PyArg_ParseTuple(args, "isHHHHOI", &fd, &name, &vendor, + &product, &version, &bustype, &absinfo, &max_effects); if (!ret) return NULL; memset(&uidev, 0, sizeof(uidev)); @@ -182,7 +204,7 @@ uinput_setup(PyObject *self, PyObject *args) { uidev.id.product = product; uidev.id.version = version; uidev.id.bustype = bustype; - uidev.ff_effects_max = FF_MAX_EFFECTS; + uidev.ff_effects_max = max_effects; len = PyList_Size(absinfo); for (i=0; i= 3 static struct PyModuleDef moduledef = { PyModuleDef_HEAD_INIT, - MODULE_NAME, - MODULE_HELP, + "_uinput", + "Python bindings for parts of linux/uinput.c", -1, /* m_size */ MethodTable, /* m_methods */ NULL, /* m_reload */ @@ -393,21 +415,3 @@ PyInit__uinput(void) { return moduleinit(); } - -#else -static PyObject * -moduleinit(void) -{ - PyObject* m = Py_InitModule3(MODULE_NAME, MethodTable, MODULE_HELP); - if (m == NULL) return NULL; - - PyModule_AddIntConstant(m, "maxnamelen", UINPUT_MAX_NAME_SIZE); - return m; -} - -PyMODINIT_FUNC -init_uinput(void) -{ - moduleinit(); -} -#endif diff --git a/src/evdev/uinput.py b/src/evdev/uinput.py new file mode 100644 index 0000000..2c69c2b --- /dev/null +++ b/src/evdev/uinput.py @@ -0,0 +1,375 @@ +import ctypes +import os +import platform +import re +import stat +import time +from collections import defaultdict +from typing import Union, Tuple, Dict, Sequence, Optional + +from . import _uinput, ecodes, ff, util +from .device import InputDevice, AbsInfo +from .events import InputEvent + +try: + from evdev.eventio_async import EventIO +except ImportError: + from evdev.eventio import EventIO + + +class UInputError(Exception): + pass + + +class UInput(EventIO): + """ + A userland input device and that can inject input events into the + linux input subsystem. + """ + + __slots__ = ( + "name", + "vendor", + "product", + "version", + "bustype", + "events", + "devnode", + "fd", + "device", + ) + + @classmethod + def from_device( + cls, + *devices: Union[InputDevice, Union[str, bytes, os.PathLike]], + filtered_types: Tuple[int] = (ecodes.EV_SYN, ecodes.EV_FF), + **kwargs, + ): + """ + Create an UInput device with the capabilities of one or more input + devices. + + Arguments + --------- + devices : InputDevice|str + Varargs of InputDevice instances or paths to input devices. + + filtered_types : Tuple[event type codes] + Event types to exclude from the capabilities of the uinput device. + + **kwargs + Keyword arguments to UInput constructor (i.e. name, vendor etc.). + """ + + device_instances = [] + for dev in devices: + if not isinstance(dev, InputDevice): + dev = InputDevice(str(dev)) + device_instances.append(dev) + + all_capabilities = defaultdict(set) + + if "max_effects" not in kwargs: + kwargs["max_effects"] = min([dev.ff_effects_count for dev in device_instances]) + + # Merge the capabilities of all devices into one dictionary. + for dev in device_instances: + for ev_type, ev_codes in dev.capabilities().items(): + all_capabilities[ev_type].update(ev_codes) + + for evtype in filtered_types: + if evtype in all_capabilities: + del all_capabilities[evtype] + + return cls(events=all_capabilities, **kwargs) + + def __init__( + self, + events: Optional[Dict[int, Sequence[int]]] = None, + name: str = "py-evdev-uinput", + vendor: int = 0x1, + product: int = 0x1, + version: int = 0x1, + bustype: int = 0x3, + devnode: str = "/dev/uinput", + phys: str = "py-evdev-uinput", + input_props=None, + # CentOS 7 has sufficiently old headers that FF_MAX_EFFECTS is not defined there, + # which causes the whole module to fail loading. Fallback on a hardcoded value of + # FF_MAX_EFFECTS if it is not defined in the ecodes. + max_effects=ecodes.ecodes.get("FF_MAX_EFFECTS", 96), + ): + """ + Arguments + --------- + events : dict + Dictionary of event types mapping to lists of event codes. The + event types and codes that the uinput device will be able to + inject - defaults to all key codes. + + name + The name of the input device. + + vendor + Vendor identifier. + + product + Product identifier. + + version + Version identifier. + + bustype + Bustype identifier. + + phys + Physical path. + + input_props + Input properties and quirks. + + max_effects + Maximum simultaneous force-feedback effects. + + Note + ---- + If you do not specify any events, the uinput device will be able + to inject only ``KEY_*`` and ``BTN_*`` event codes. + """ + + self.name: str = name #: Uinput device name. + self.vendor: int = vendor #: Device vendor identifier. + self.product: int = product #: Device product identifier. + self.version: int = version #: Device version identifier. + self.bustype: int = bustype #: Device bustype - e.g. ``BUS_USB``. + self.phys: str = phys #: Uinput device physical path. + self.devnode: str = devnode #: Uinput device node - e.g. ``/dev/uinput/``. + + if not events: + events = {ecodes.EV_KEY: ecodes.keys.keys()} + + self._verify() + + #: Write-only, non-blocking file descriptor to the uinput device node. + self.fd = _uinput.open(devnode) + + # Prepare the list of events for passing to _uinput.enable and _uinput.setup. + absinfo, prepared_events = self._prepare_events(events) + + # Set phys name + _uinput.set_phys(self.fd, phys) + + # Set properties + input_props = input_props or [] + for prop in input_props: + _uinput.set_prop(self.fd, prop) + + for etype, code in prepared_events: + _uinput.enable(self.fd, etype, code) + + _uinput.setup(self.fd, name, vendor, product, version, bustype, absinfo, max_effects) + + # Create the uinput device. + _uinput.create(self.fd) + + self.dll = ctypes.CDLL(_uinput.__file__) + self.dll._uinput_begin_upload.restype = ctypes.c_int + self.dll._uinput_end_upload.restype = ctypes.c_int + + #: An :class:`InputDevice ` instance + #: for the fake input device. ``None`` if the device cannot be + #: opened for reading and writing. + self.device: InputDevice = self._find_device(self.fd) + + def _prepare_events(self, events): + """Prepare events for passing to _uinput.enable and _uinput.setup""" + absinfo, prepared_events = [], [] + for etype, codes in events.items(): + for code in codes: + # Handle max, min, fuzz, flat. + if isinstance(code, (tuple, list, AbsInfo)): + # Flatten (ABS_Y, (0, 255, 0, 0, 0, 0)) to (ABS_Y, 0, 255, 0, 0, 0, 0). + f = [code[0]] + f.extend(code[1]) + # Ensure the tuple is always 6 ints long, since uinput.c:uinput_create + # does little in the way of checking the length. + f.extend([0] * (6 - len(code[1]))) + absinfo.append(f) + code = code[0] + prepared_events.append((etype, code)) + return absinfo, prepared_events + + def __enter__(self): + return self + + def __exit__(self, type, value, tb): + if hasattr(self, "fd"): + self.close() + + def __repr__(self): + # TODO: + v = (repr(getattr(self, i)) for i in ("name", "bustype", "vendor", "product", "version", "phys")) + return "{}({})".format(self.__class__.__name__, ", ".join(v)) + + def __str__(self): + msg = 'name "{}", bus "{}", vendor "{:04x}", product "{:04x}", version "{:04x}", phys "{}"\nevent types: {}' + + evtypes = [i[0] for i in self.capabilities(True).keys()] + msg = msg.format( + self.name, ecodes.BUS[self.bustype], self.vendor, self.product, self.version, self.phys, " ".join(evtypes) + ) + + return msg + + def close(self): + # Close the associated InputDevice, if it was previously opened. + if self.device is not None: + self.device.close() + + # Destroy the uinput device. + if self.fd > -1: + _uinput.close(self.fd) + self.fd = -1 + + def capabilities(self, verbose: bool = False, absinfo: bool = True): + """See :func:`capabilities `.""" + if self.device is None: + raise UInputError("input device not opened - cannot read capabilities") + + return self.device.capabilities(verbose, absinfo) + + def begin_upload(self, effect_id): + upload = ff.UInputUpload() + upload.effect_id = effect_id + + ret = self.dll._uinput_begin_upload(self.fd, ctypes.byref(upload)) + if ret: + raise UInputError("Failed to begin uinput upload: " + os.strerror(ret)) + + return upload + + def end_upload(self, upload): + ret = self.dll._uinput_end_upload(self.fd, ctypes.byref(upload)) + if ret: + raise UInputError("Failed to end uinput upload: " + os.strerror(ret)) + + def begin_erase(self, effect_id): + erase = ff.UInputErase() + erase.effect_id = effect_id + + ret = self.dll._uinput_begin_erase(self.fd, ctypes.byref(erase)) + if ret: + raise UInputError("Failed to begin uinput erase: " + os.strerror(ret)) + return erase + + def end_erase(self, erase): + ret = self.dll._uinput_end_erase(self.fd, ctypes.byref(erase)) + if ret: + raise UInputError("Failed to end uinput erase: " + os.strerror(ret)) + + def _verify(self): + """ + Verify that an uinput device exists and is readable and writable + by the current process. + """ + try: + m = os.stat(self.devnode)[stat.ST_MODE] + assert stat.S_ISCHR(m) + except (IndexError, OSError, AssertionError): + msg = '"{}" does not exist or is not a character device file - verify that the uinput module is loaded' + raise UInputError(msg.format(self.devnode)) + + if not os.access(self.devnode, os.W_OK): + msg = '"{}" cannot be opened for writing' + raise UInputError(msg.format(self.devnode)) + + if len(self.name) > _uinput.maxnamelen: + msg = "uinput device name must not be longer than {} characters" + raise UInputError(msg.format(_uinput.maxnamelen)) + + def _find_device(self, fd: int) -> InputDevice: + """ + Tries to find the device node. Will delegate this task to one of + several platform-specific functions. + """ + if platform.system() == "Linux": + try: + sysname = _uinput.get_sysname(fd) + return self._find_device_linux(sysname) + except OSError: + # UI_GET_SYSNAME returned an error code. We're likely dealing with + # an old kernel. Guess the device based on the filesystem. + pass + + # If we're not running or Linux or the above method fails for any reason, + # use the generic fallback method. + return self._find_device_fallback() + + def _find_device_linux(self, sysname: str) -> InputDevice: + """ + Tries to find the device node when running on Linux. + """ + + syspath = f"/sys/devices/virtual/input/{sysname}" + + # The sysfs entry for event devices should contain exactly one folder + # whose name matches the format "event[0-9]+". It is then assumed that + # the device node in /dev/input uses the same name. + regex = re.compile("event[0-9]+") + for entry in os.listdir(syspath): + if regex.fullmatch(entry): + device_path = f"/dev/input/{entry}" + break + else: # no break + raise FileNotFoundError() + + # It is possible that there is some delay before /dev/input/event* shows + # up on old systems that do not use devtmpfs, so if the device cannot be + # found, wait for a short amount and then try again once. + # + # Furthermore, even if devtmpfs is in use, it is possible that the device + # does show up immediately, but without the correct permissions that + # still need to be set by udev. Wait for up to two seconds for either the + # device to show up or the permissions to be set. + for attempt in range(19): + try: + return InputDevice(device_path) + except (FileNotFoundError, PermissionError): + time.sleep(0.1) + + # Last attempt. If this fails, whatever exception the last attempt raises + # shall be the exception that this function raises. + return InputDevice(device_path) + + def _find_device_fallback(self) -> Union[InputDevice, None]: + """ + Tries to find the device node when UI_GET_SYSNAME is not available or + we're running on a system sufficiently exotic that we do not know how + to interpret its return value. + """ + #:bug: the device node might not be immediately available + time.sleep(0.1) + + # There could also be another device with the same name already present, + # make sure to select the newest one. + # Strictly speaking, we cannot be certain that everything returned by list_devices() + # ends at event[0-9]+: it might return something like "/dev/input/events_all". Find + # the devices that have the expected structure and extract their device number. + path_number_pairs = [] + regex = re.compile("/dev/input/event([0-9]+)") + for path in util.list_devices("/dev/input/"): + regex_match = regex.fullmatch(path) + if not regex_match: + continue + device_number = int(regex_match[1]) + path_number_pairs.append((path, device_number)) + + # The modification date of the devnode is not reliable unfortunately, so we + # are sorting by the number in the name + path_number_pairs.sort(key=lambda pair: pair[1], reverse=True) + + for path, _ in path_number_pairs: + d = InputDevice(path) + if d.name == self.name: + return d diff --git a/evdev/util.py b/src/evdev/util.py similarity index 77% rename from evdev/util.py rename to src/evdev/util.py index e8009f7..db89a22 100644 --- a/evdev/util.py +++ b/src/evdev/util.py @@ -1,26 +1,23 @@ -# encoding: utf-8 - -import re +import collections +import glob import os +import re import stat -import glob -import collections - -from evdev import ecodes -from evdev.events import event_factory +from typing import Union, List +from . import ecodes +from .events import InputEvent, event_factory, KeyEvent, RelEvent, AbsEvent, SynEvent -def list_devices(input_device_dir='/dev/input'): - '''List readable character devices in ``input_device_dir``.''' - fns = glob.glob('{}/event*'.format(input_device_dir)) - fns = list(filter(is_device, fns)) +def list_devices(input_device_dir: Union[str, bytes, os.PathLike] = "/dev/input") -> List[str]: + """List readable character devices in ``input_device_dir``.""" - return fns + fns = glob.glob("{}/event*".format(input_device_dir)) + return list(filter(is_device, fns)) -def is_device(fn): - '''Check if ``fn`` is a readable and writable character device.''' +def is_device(fn: Union[str, bytes, os.PathLike]) -> bool: + """Check if ``fn`` is a readable and writable character device.""" if not os.path.exists(fn): return False @@ -35,14 +32,14 @@ def is_device(fn): return True -def categorize(event): - ''' +def categorize(event: InputEvent) -> Union[InputEvent, KeyEvent, RelEvent, AbsEvent, SynEvent]: + """ Categorize an event according to its type. The :data:`event_factory ` dictionary maps event types to sub-classes of :class:`InputEvent `. If the event cannot be categorized, it - is returned unmodified.''' + is returned unmodified.""" if event.type in event_factory: return event_factory[event.type](event) @@ -50,8 +47,8 @@ def categorize(event): return event -def resolve_ecodes_dict(typecodemap, unknown='?'): - ''' +def resolve_ecodes_dict(typecodemap, unknown="?"): + """ Resolve event codes and types to their verbose names. :param typecodemap: mapping of event types to lists of event codes. @@ -70,7 +67,7 @@ def resolve_ecodes_dict(typecodemap, unknown='?'): >>> resolve_ecodes_dict({ 3: [(0, AbsInfo(...))] }) { ('EV_ABS', 3L): [(('ABS_X', 0L), AbsInfo(...))] } - ''' + """ for etype, codes in typecodemap.items(): type_name = ecodes.EV[etype] @@ -79,21 +76,21 @@ def resolve_ecodes_dict(typecodemap, unknown='?'): if etype == ecodes.EV_KEY: ecode_dict = ecodes.keys else: - ecode_dict = getattr(ecodes, type_name.split('_')[-1]) + ecode_dict = getattr(ecodes, type_name.split("_")[-1]) resolved = resolve_ecodes(ecode_dict, codes, unknown) yield (type_name, etype), resolved -def resolve_ecodes(ecode_dict, ecode_list, unknown='?'): - ''' +def resolve_ecodes(ecode_dict, ecode_list, unknown="?"): + """ Resolve event codes and types to their verbose names. Example ------- >>> resolve_ecodes(ecodes.BTN, [272, 273, 274]) [(['BTN_LEFT', 'BTN_MOUSE'], 272), ('BTN_RIGHT', 273), ('BTN_MIDDLE', 274)] - ''' + """ res = [] for ecode in ecode_list: # elements with AbsInfo(), eg { 3 : [(0, AbsInfo(...)), (1, AbsInfo(...))] } @@ -115,7 +112,7 @@ def resolve_ecodes(ecode_dict, ecode_list, unknown='?'): def find_ecodes_by_regex(regex): - ''' + """ Find ecodes matching a regex and return a mapping of event type to event codes. regex can be a pattern string or a compiled regular expression object. @@ -130,7 +127,7 @@ def find_ecodes_by_regex(regex): ('EV_KEY', 1): [('KEY_BREAK', 411)], ('EV_ABS', 3): [('ABS_BRAKE', 10)] } - ''' + """ regex = re.compile(regex) # re.compile is idempotent result = collections.defaultdict(list) @@ -146,4 +143,4 @@ def find_ecodes_by_regex(regex): return dict(result) -__all__ = ('list_devices', 'is_device', 'categorize', 'resolve_ecodes', 'resolve_ecodes_dict', 'find_ecodes_by_regex') +__all__ = ("list_devices", "is_device", "categorize", "resolve_ecodes", "resolve_ecodes_dict", "find_ecodes_by_regex") diff --git a/tests/test_ecodes.py b/tests/test_ecodes.py index b2f10c4..5c3e38d 100644 --- a/tests/test_ecodes.py +++ b/tests/test_ecodes.py @@ -1,13 +1,14 @@ -# encoding: utf-8 - from evdev import ecodes +from evdev import ecodes_runtime + +prefixes = "KEY ABS REL SW MSC LED BTN REP SND ID EV BUS SYN FF_STATUS FF UI_FF" -prefixes = 'KEY ABS REL SW MSC LED BTN REP SND ID EV BUS SYN FF_STATUS FF' -def to_tuples(l): +def to_tuples(val): t = lambda x: tuple(x) if isinstance(x, list) else x - return map(t, l) + return map(t, val) + def test_equality(): keys = [] @@ -16,12 +17,25 @@ def test_equality(): assert set(keys) == set(ecodes.ecodes.values()) + def test_access(): - assert ecodes.KEY_A == ecodes.ecodes['KEY_A'] == ecodes.KEY_A - assert ecodes.KEY[ecodes.ecodes['KEY_A']] == 'KEY_A' - assert ecodes.REL[0] == 'REL_X' + assert ecodes.KEY_A == ecodes.ecodes["KEY_A"] == ecodes.KEY_A + assert ecodes.KEY[ecodes.ecodes["KEY_A"]] == "KEY_A" + assert ecodes.REL[0] == "REL_X" + def test_overlap(): vals_ff = set(to_tuples(ecodes.FF.values())) vals_ff_status = set(to_tuples(ecodes.FF_STATUS.values())) - assert bool(vals_ff & vals_ff_status) == False + assert bool(vals_ff & vals_ff_status) is False + + +def test_generated(): + e_run = vars(ecodes_runtime) + e_gen = vars(ecodes) + + def keys(v): + res = {k for k in v.keys() if not k.startswith("_") and not k[1].islower()} + return res + + assert keys(e_run) == keys(e_gen) \ No newline at end of file diff --git a/tests/test_events.py b/tests/test_events.py index d0717f2..f0f456c 100644 --- a/tests/test_events.py +++ b/tests/test_events.py @@ -16,6 +16,7 @@ def test_categorize(): e = events.InputEvent(1036996631, 984417, ecodes.EV_MSC, 0, 0) assert e == util.categorize(e) + def test_keyevent(): e = events.InputEvent(1036996631, 984417, ecodes.EV_KEY, ecodes.KEY_A, 2) k = events.KeyEvent(e) @@ -23,5 +24,4 @@ def test_keyevent(): assert k.keystate == events.KeyEvent.key_hold assert k.event == e assert k.scancode == ecodes.KEY_A - assert k.keycode == 'KEY_A' # :todo: - + assert k.keycode == "KEY_A" # :todo: diff --git a/tests/test_uinput.py b/tests/test_uinput.py index 21d7b4e..666361f 100644 --- a/tests/test_uinput.py +++ b/tests/test_uinput.py @@ -1,71 +1,80 @@ # encoding: utf-8 - +import os +import stat from select import select -from pytest import raises, fixture +from unittest.mock import patch -from evdev import uinput, ecodes, events, device, util +import pytest +from pytest import raises, fixture +from evdev import uinput, ecodes, device, UInputError -#----------------------------------------------------------------------------- +# ----------------------------------------------------------------------------- uinput_options = { - 'name' : 'test-py-evdev-uinput', - 'bustype' : ecodes.BUS_USB, - 'vendor' : 0x1100, - 'product' : 0x2200, - 'version' : 0x3300, + "name": "test-py-evdev-uinput", + "bustype": ecodes.BUS_USB, + "vendor": 0x1100, + "product": 0x2200, + "version": 0x3300, } + @fixture def c(): return uinput_options.copy() + def device_exists(bustype, vendor, product, version): - match = 'I: Bus=%04hx Vendor=%04hx Product=%04hx Version=%04hx' + match = "I: Bus=%04hx Vendor=%04hx Product=%04hx Version=%04hx" match = match % (bustype, vendor, product, version) - for line in open('/proc/bus/input/devices'): + for line in open("/proc/bus/input/devices"): if line.strip() == match: return True return False -#----------------------------------------------------------------------------- + +# ----------------------------------------------------------------------------- def test_open(c): ui = uinput.UInput(**c) - args = (c['bustype'], c['vendor'], c['product'], c['version']) + args = (c["bustype"], c["vendor"], c["product"], c["version"]) assert device_exists(*args) ui.close() assert not device_exists(*args) + def test_open_context(c): - args = (c['bustype'], c['vendor'], c['product'], c['version']) + args = (c["bustype"], c["vendor"], c["product"], c["version"]) with uinput.UInput(**c): assert device_exists(*args) assert not device_exists(*args) + def test_maxnamelen(c): with raises(uinput.UInputError): - c['name'] = 'a' * 150 + c["name"] = "a" * 150 uinput.UInput(**c) + def test_enable_events(c): e = ecodes - c['events'] = {e.EV_KEY : [e.KEY_A, e.KEY_B, e.KEY_C]} + c["events"] = {e.EV_KEY: [e.KEY_A, e.KEY_B, e.KEY_C]} with uinput.UInput(**c) as ui: cap = ui.capabilities() assert e.EV_KEY in cap - assert sorted(cap[e.EV_KEY]) == sorted(c['events'][e.EV_KEY]) + assert sorted(cap[e.EV_KEY]) == sorted(c["events"][e.EV_KEY]) + def test_abs_values(c): e = ecodes - c['events'] = { + c = { e.EV_KEY: [e.KEY_A, e.KEY_B], - e.EV_ABS: [(e.ABS_X, (0, 255, 0, 0)), - (e.ABS_Y, device.AbsInfo(0, 255, 5, 10, 0, 0))], + e.EV_ABS: [(e.ABS_X, (0, 0, 255, 0, 0)), (e.ABS_Y, device.AbsInfo(0, 0, 255, 5, 10, 0))], } - with uinput.UInput(**c) as ui: + with uinput.UInput(events=c) as ui: c = ui.capabilities() abs = device.AbsInfo(value=0, min=0, max=255, fuzz=0, flat=0, resolution=0) assert c[e.EV_ABS][0] == (0, abs) @@ -75,11 +84,12 @@ def test_abs_values(c): c = ui.capabilities(verbose=True) abs = device.AbsInfo(value=0, min=0, max=255, fuzz=0, flat=0, resolution=0) - assert c[('EV_ABS', 3)][0] == (('ABS_X', 0), abs) + assert c[("EV_ABS", 3)][0] == (("ABS_X", 0), abs) c = ui.capabilities(verbose=False, absinfo=False) assert c[e.EV_ABS] == list((0, 1)) + def test_write(c): with uinput.UInput(**c) as ui: d = ui.device @@ -89,12 +99,12 @@ def test_write(c): r, w, x = select([d], [d], []) if w and not wrote: - ui.write(ecodes.EV_KEY, ecodes.KEY_P, 1) # KEY_P down - ui.write(ecodes.EV_KEY, ecodes.KEY_P, 1) # KEY_P down - ui.write(ecodes.EV_KEY, ecodes.KEY_P, 0) # KEY_P up - ui.write(ecodes.EV_KEY, ecodes.KEY_A, 1) # KEY_A down - ui.write(ecodes.EV_KEY, ecodes.KEY_A, 2) # KEY_A hold - ui.write(ecodes.EV_KEY, ecodes.KEY_A, 0) # KEY_P up + ui.write(ecodes.EV_KEY, ecodes.KEY_P, 1) # KEY_P down + ui.write(ecodes.EV_KEY, ecodes.KEY_P, 1) # KEY_P down + ui.write(ecodes.EV_KEY, ecodes.KEY_P, 0) # KEY_P up + ui.write(ecodes.EV_KEY, ecodes.KEY_A, 1) # KEY_A down + ui.write(ecodes.EV_KEY, ecodes.KEY_A, 2) # KEY_A hold + ui.write(ecodes.EV_KEY, ecodes.KEY_A, 0) # KEY_P up ui.syn() wrote = True @@ -107,3 +117,21 @@ def test_write(c): assert evs[3].code == ecodes.KEY_A and evs[3].value == 2 assert evs[4].code == ecodes.KEY_A and evs[4].value == 0 break + + +@patch.object(stat, 'S_ISCHR', return_value=False) +def test_not_a_character_device(ischr_mock, c): + with pytest.raises(UInputError, match='not a character device file'): + uinput.UInput(**c) + +@patch.object(stat, 'S_ISCHR', return_value=True) +@patch.object(os, 'stat', side_effect=OSError()) +def test_not_a_character_device_2(stat_mock, ischr_mock, c): + with pytest.raises(UInputError, match='not a character device file'): + uinput.UInput(**c) + +@patch.object(stat, 'S_ISCHR', return_value=True) +@patch.object(os, 'stat', return_value=[]) +def test_not_a_character_device_3(stat_mock, ischr_mock, c): + with pytest.raises(UInputError, match='not a character device file'): + uinput.UInput(**c) diff --git a/tests/test_util.py b/tests/test_util.py index 11e338b..7112927 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -2,20 +2,20 @@ def test_match_ecodes_a(): - res = util.find_ecodes_by_regex('KEY_ZOOM.*') + res = util.find_ecodes_by_regex("KEY_ZOOM.*") assert res == {1: [372, 418, 419, 420]} assert dict(util.resolve_ecodes_dict(res)) == { - ('EV_KEY', 1): [ - (['KEY_FULL_SCREEN', 'KEY_ZOOM'], 372), - ('KEY_ZOOMIN', 418), - ('KEY_ZOOMOUT', 419), - ('KEY_ZOOMRESET', 420) + ("EV_KEY", 1): [ + (("KEY_FULL_SCREEN", "KEY_ZOOM"), 372), + ("KEY_ZOOMIN", 418), + ("KEY_ZOOMOUT", 419), + ("KEY_ZOOMRESET", 420), ] } - res = util.find_ecodes_by_regex(r'(ABS|KEY)_BR(AKE|EAK)') + res = util.find_ecodes_by_regex(r"(ABS|KEY)_BR(AKE|EAK)") assert res == {1: [411], 3: [10]} assert dict(util.resolve_ecodes_dict(res)) == { - ('EV_KEY', 1): [('KEY_BREAK', 411)], - ('EV_ABS', 3): [('ABS_BRAKE', 10)] + ("EV_KEY", 1): [("KEY_BREAK", 411)], + ("EV_ABS", 3): [("ABS_BRAKE", 10)], }