diff options
author | Arthur Zamarin <arthurzam@gentoo.org> | 2022-10-10 15:42:26 +0300 |
---|---|---|
committer | Arthur Zamarin <arthurzam@gentoo.org> | 2022-10-10 18:55:00 +0300 |
commit | 7c1d50adc0f82f6fc19f857930eb70c2ba7a84f2 (patch) | |
tree | 2a5f080d3eb241fe22d1ceb5266962a84402a80a | |
parent | compression/_bzip2: add tests (diff) | |
download | snakeoil-7c1d50adc0f82f6fc19f857930eb70c2ba7a84f2.tar.gz snakeoil-7c1d50adc0f82f6fc19f857930eb70c2ba7a84f2.tar.bz2 snakeoil-7c1d50adc0f82f6fc19f857930eb70c2ba7a84f2.zip |
compression.__init__: add tests
Signed-off-by: Arthur Zamarin <arthurzam@gentoo.org>
-rw-r--r-- | src/snakeoil/compression/__init__.py | 27 | ||||
-rw-r--r-- | src/snakeoil/compression/_util.py | 24 | ||||
-rw-r--r-- | tests/compression/__init__.py | 11 | ||||
-rw-r--r-- | tests/compression/test_bzip2.py | 10 | ||||
-rw-r--r-- | tests/compression/test_init.py | 88 |
5 files changed, 121 insertions, 39 deletions
diff --git a/src/snakeoil/compression/__init__.py b/src/snakeoil/compression/__init__.py index b250b1af..4b4a437c 100644 --- a/src/snakeoil/compression/__init__.py +++ b/src/snakeoil/compression/__init__.py @@ -1,9 +1,9 @@ import shlex +from functools import cached_property from importlib import import_module -from .. import klass +from .. import process from ..cli.exceptions import UserException -from ..process import CommandNotFound, find_binary from ..process.spawn import spawn_get_output @@ -12,14 +12,10 @@ class _transform_source: def __init__(self, name): self.name = name - @klass.jit_attr + @cached_property def module(self): return import_module(f'snakeoil.compression._{self.name}') - @klass.jit_attr - def parallelizable(self): - return bool(getattr(self.module, 'parallelizable', False)) - def compress_data(self, data, level, parallelize=False): parallelize = parallelize and self.module.parallelizable return self.module.compress_data(data, level, parallelize=parallelize) @@ -81,7 +77,7 @@ class ArComp: def __init_subclass__(cls, **kwargs): """Initialize result subclasses and register archive extensions.""" super().__init_subclass__(**kwargs) - if not all((cls.binary, cls.default_unpack_cmd, cls.exts)): + if not all((cls.binary, cls.default_unpack_cmd, cls.exts)): # pragma: no cover raise ValueError(f'class missing required attrs: {cls!r}') for ext in cls.exts: cls.known_exts[ext] = cls @@ -89,13 +85,13 @@ class ArComp: def __init__(self, path, ext=None): self.path = path - @klass.jit_attr + @cached_property def _unpack_cmd(self): for b in self.binary: try: - binary = find_binary(b) + binary = process.find_binary(b) break - except CommandNotFound: + except process.CommandNotFound: continue else: choices = ', '.join(self.binary) @@ -107,9 +103,6 @@ class ArComp: def unpack(self, dest=None, **kwargs): raise NotImplementedError - def create(self, dest): - raise NotImplementedError - class _Archive: """Generic archive format support.""" @@ -155,16 +148,16 @@ class _Tar(_Archive, ArComp): compress_binary = None default_unpack_cmd = '{binary} xf "{path}"' - @klass.jit_attr + @cached_property def _unpack_cmd(self): cmd = super()._unpack_cmd if self.compress_binary is not None: for b in self.compress_binary: try: - find_binary(b) + process.find_binary(b) cmd += f' --use-compress-program={b}' break - except CommandNotFound: + except process.CommandNotFound: pass else: choices = ', '.join(self.compress_binary) diff --git a/src/snakeoil/compression/_util.py b/src/snakeoil/compression/_util.py index 6a3c7258..e1af5aef 100644 --- a/src/snakeoil/compression/_util.py +++ b/src/snakeoil/compression/_util.py @@ -22,7 +22,7 @@ def _drive_process(args, mode, data): def compress_data(binary, data, compresslevel=9, extra_args=()): - args = [binary, '-%ic' % compresslevel] + args = [binary, f'-{compresslevel}c'] args.extend(extra_args) return _drive_process(args, 'compression', data) @@ -36,9 +36,7 @@ def decompress_data(binary, data, extra_args=()): class _process_handle: def __init__(self, handle, args, is_read=False): - self.mode = 'wb' - if is_read: - self.mode = 'rb' + self.mode = 'rb' if is_read else 'wb' self.args = tuple(args) self.is_read = is_read @@ -55,8 +53,7 @@ class _process_handle: elif not isinstance(handle, int): if not hasattr(handle, 'fileno'): raise TypeError( - "handle %r isn't a string, integer, and lacks a fileno " - "method" % (handle,)) + f"handle {handle!r} isn't a string, integer, and lacks a fileno method") handle = handle.fileno() try: @@ -108,8 +105,8 @@ class _process_handle: if fwd_seek < 0: if self._allow_reopen is None: raise TypeError( - "instance %s can't do negative seeks: asked for %i, " - "was at %i" % (self, position, self.position)) + f"instance {self} can't do negative seeks: " + f"asked for {position}, was at {self.position}") self._terminate() self._open_handle(self._allow_reopen) return self.seek(position) @@ -142,16 +139,17 @@ class _process_handle: def _terminate(self): try: self._process.terminate() - except EnvironmentError as e: + except EnvironmentError as exc: # allow no such process only. - if e.errno != errno.ESRCH: + if exc.errno != errno.ESRCH: raise def close(self): + if not hasattr(self, '_process'): + return if self._process.returncode is not None: if self._process.returncode != 0: - raise Exception("%s invocation had non zero exit: %i" % - (self.args, self._process.returncode)) + raise Exception(f"{self.args} invocation had non zero exit: {self._process.returncode}") return self.handle.close() @@ -165,7 +163,7 @@ class _process_handle: def compress_handle(binary_path, handle, compresslevel=9, extra_args=()): - args = [binary_path, '-%ic' % compresslevel] + args = [binary_path, f'-{compresslevel}c'] args.extend(extra_args) return _process_handle(handle, args, False) diff --git a/tests/compression/__init__.py b/tests/compression/__init__.py index e69de29b..178d4b64 100644 --- a/tests/compression/__init__.py +++ b/tests/compression/__init__.py @@ -0,0 +1,11 @@ +from unittest.mock import patch + +from snakeoil.process import CommandNotFound, find_binary + +def hide_binary(*binaries: str): + def mock_find_binary(name): + if name in binaries: + raise CommandNotFound(name) + return find_binary(name) + + return patch('snakeoil.process.find_binary', side_effect=mock_find_binary) diff --git a/tests/compression/test_bzip2.py b/tests/compression/test_bzip2.py index f8f1d1fb..d11d61fa 100644 --- a/tests/compression/test_bzip2.py +++ b/tests/compression/test_bzip2.py @@ -1,21 +1,13 @@ import importlib from bz2 import decompress from pathlib import Path -from unittest import mock import pytest from snakeoil.compression import _bzip2 from snakeoil.process import CommandNotFound, find_binary from snakeoil.test import hide_imports - -def hide_binary(*binaries: str): - def mock_find_binary(name): - if name in binaries: - raise CommandNotFound(name) - return find_binary(name) - - return mock.patch('snakeoil.process.find_binary', side_effect=mock_find_binary) +from . import hide_binary def test_no_native(): diff --git a/tests/compression/test_init.py b/tests/compression/test_init.py new file mode 100644 index 00000000..0726571b --- /dev/null +++ b/tests/compression/test_init.py @@ -0,0 +1,88 @@ +import shutil +import subprocess +import sys + +import pytest +from snakeoil.compression import ArComp, ArCompError, _TarBZ2 +from snakeoil.contexts import chdir + +from . import hide_binary + + +@pytest.mark.skipif(sys.platform == "darwin", reason="darwin fails with bzip2") +class TestArComp: + + @pytest.fixture(scope='class') + def tar_file(self, tmp_path_factory): + data = tmp_path_factory.mktemp("data") + (data / 'file1').write_text('Hello world') + (data / 'file2').write_text('Larry the Cow') + path = data / 'test 1.tar' + subprocess.run(['tar', 'cf', str(path), 'file1', 'file2'], cwd=data, check=True) + (data / 'file1').unlink() + (data / 'file2').unlink() + return str(path) + + @pytest.fixture(scope='class') + def tar_bz2_file(self, tar_file): + subprocess.run(['bzip2', '-z', '-k', tar_file], check=True) + return tar_file + ".bz2" + + @pytest.fixture(scope='class') + def tbz2_file(self, tar_bz2_file): + new_path = tar_bz2_file.replace('.tar.bz2', '.tbz2') + shutil.copyfile(tar_bz2_file, new_path) + return new_path + + @pytest.fixture(scope='class') + def lzma_file(self, tmp_path_factory): + data = (tmp_path_factory.mktemp("data") / 'test 2.lzma') + with data.open('wb') as f: + subprocess.run(['lzma'], check=True, input=b'Hello world', stdout=f) + return str(data) + + def test_unknown_extenstion(self, tmp_path): + file = tmp_path / 'test.file' + with pytest.raises(ArCompError, match='unknown compression file extension'): + ArComp(file, ext='.foo') + + def test_missing_tar(self, tmp_path, tar_file): + with hide_binary('tar'), chdir(tmp_path): + with pytest.raises(ArCompError, match='required binary not found'): + ArComp(tar_file, ext='.tar').unpack(dest=tmp_path) + + def test_tar(self, tmp_path, tar_file): + with chdir(tmp_path): + ArComp(tar_file, ext='.tar').unpack(dest=tmp_path) + assert (tmp_path / 'file1').read_text() == 'Hello world' + assert (tmp_path / 'file2').read_text() == 'Larry the Cow' + + def test_tar_bz2(self, tmp_path, tar_bz2_file): + with chdir(tmp_path): + ArComp(tar_bz2_file, ext='.tar.bz2').unpack(dest=tmp_path) + assert (tmp_path / 'file1').read_text() == 'Hello world' + assert (tmp_path / 'file2').read_text() == 'Larry the Cow' + + def test_tbz2(self, tmp_path, tbz2_file): + with chdir(tmp_path): + ArComp(tbz2_file, ext='.tbz2').unpack(dest=tmp_path) + assert (tmp_path / 'file1').read_text() == 'Hello world' + assert (tmp_path / 'file2').read_text() == 'Larry the Cow' + + def test_fallback_tbz2(self, tmp_path, tbz2_file): + with hide_binary(*_TarBZ2.compress_binary[:-1]): + with chdir(tmp_path): + ArComp(tbz2_file, ext='.tbz2').unpack(dest=tmp_path) + assert (tmp_path / 'file1').read_text() == 'Hello world' + assert (tmp_path / 'file2').read_text() == 'Larry the Cow' + + def test_no_fallback_tbz2(self, tmp_path, tbz2_file): + with hide_binary(*_TarBZ2.compress_binary), chdir(tmp_path): + with pytest.raises(ArCompError, match='no compression binary'): + ArComp(tbz2_file, ext='.tbz2').unpack(dest=tmp_path) + + def test_lzma(self, tmp_path, lzma_file): + dest = tmp_path / 'file' + with chdir(tmp_path): + ArComp(lzma_file, ext='.lzma').unpack(dest=dest) + assert (dest).read_bytes() == b'Hello world' |