filter_file uses "surrogateescape" error handling (#12765)

From Python docs:
--
'surrogateescape' will represent any incorrect bytes as code points in
the Unicode Private Use Area ranging from U+DC80 to U+DCFF. These
private code points will then be turned back into the same bytes when
the surrogateescape error handler is used when writing data. This is
useful for processing files in an unknown encoding.
--

This will allow us to process files with unknown encodings.

To accommodate the case of self-extracting bash scripts, filter_file
can now stop filtering text input if a certain marker is found. The
marker must be passed at call time via the "stop_at" function argument.
From that point on, the remainder of the file is read in binary mode and
copied verbatim.

* use "surrogateescape" error handling so that undecodable bytes are preserved and written back unchanged
* allow filtering to stop when a marker line is found
* add unit tests for non-ASCII and mixed text/binary files
This commit is contained in:
Massimiliano Culpo 2019-10-15 05:35:14 +02:00 committed by Todd Gamblin
parent 3f46f03c83
commit 5cd28847e8
5 changed files with 1603 additions and 7 deletions

View file

@ -6,7 +6,6 @@
import collections
import errno
import hashlib
import fileinput
import glob
import grp
import itertools
@ -123,10 +122,15 @@ def filter_file(regex, repl, *filenames, **kwargs):
backup (bool): Make backup file(s) suffixed with ``~``. Default is True
ignore_absent (bool): Ignore any files that don't exist.
Default is False
stop_at (str): Marker used to stop scanning the file further. If a text
line matches this marker filtering is stopped and the rest of the
file is copied verbatim. Default is to filter until the end of the
file.
"""
string = kwargs.get('string', False)
backup = kwargs.get('backup', True)
ignore_absent = kwargs.get('ignore_absent', False)
stop_at = kwargs.get('stop_at', None)
# Allow strings to use \1, \2, etc. for replacement, like sed
if not callable(repl):
@ -159,8 +163,36 @@ def groupid_to_group(x):
shutil.copy(filename, backup_filename)
try:
for line in fileinput.input(filename, inplace=True):
print(re.sub(regex, repl, line.rstrip('\n')))
extra_kwargs = {}
if sys.version_info > (3, 0):
extra_kwargs = {'errors': 'surrogateescape'}
# Open as a text file and filter until the end of the file is
# reached or we found a marker in the line if it was specified
with open(backup_filename, mode='r', **extra_kwargs) as input_file:
with open(filename, mode='w', **extra_kwargs) as output_file:
# Using iter and readline is a workaround needed not to
# disable input_file.tell(), which will happen if we call
# input_file.next() implicitly via the for loop
for line in iter(input_file.readline, ''):
if stop_at is not None:
current_position = input_file.tell()
if stop_at == line.strip():
output_file.write(line)
break
filtered_line = re.sub(regex, repl, line)
output_file.write(filtered_line)
else:
current_position = None
# If we stopped filtering at some point, reopen the file in
# binary mode and copy verbatim the remaining part
if current_position and stop_at:
with open(backup_filename, mode='rb') as input_file:
input_file.seek(current_position)
with open(filename, mode='ab') as output_file:
output_file.writelines(input_file.readlines())
except BaseException:
# clean up the original file on failure.
shutil.move(backup_filename, filename)

Binary file not shown.

File diff suppressed because it is too large Load diff

View file

@ -5,10 +5,14 @@
"""Tests for ``llnl/util/filesystem.py``"""
import llnl.util.filesystem as fs
import os
import stat
import pytest
import os
import shutil
import stat
import sys
import llnl.util.filesystem as fs
import spack.paths
@pytest.fixture()
@ -306,3 +310,31 @@ def test_headers_directory_setter():
# Setting directories to None also returns an empty list
hl.directories = None
assert hl.directories == []
@pytest.mark.regression('7358')
@pytest.mark.parametrize('regex,replacement,filename,keyword_args', [
    (r"\<malloc\.h\>", "<stdlib.h>", 'x86_cpuid_info.c', {}),
    (r"CDIR", "CURRENT_DIRECTORY", 'selfextract.bsx',
     {'stop_at': '__ARCHIVE_BELOW__'})
])
def test_filter_files_with_different_encodings(
        regex, replacement, filename, tmpdir, keyword_args
):
    """Run ``filter_file`` on a copy of a data file and check the result.

    Each parametrized case names a file under the ``data/filter_file``
    directory of the test tree, a regex to search for, the replacement
    text, and any extra keyword arguments forwarded to ``filter_file``.

    Pre-requisites for every input file: the replacement string must not
    already be present in it, and the regex must match at least once.
    Otherwise the final assertion proves nothing.
    """
    # Work on a scratch copy so the checked-in data file is never modified
    source_path = os.path.join(
        spack.paths.test_path, 'data', 'filter_file', filename
    )
    scratch_path = os.path.join(str(tmpdir), filename)
    shutil.copy(source_path, scratch_path)

    # The call itself is part of the check: it must not raise
    fs.filter_file(regex, replacement, scratch_path, **keyword_args)

    # On Python 3 read the result back with the 'surrogateescape' error
    # handler, so bytes that cannot be decoded do not abort the read
    open_kwargs = (
        {'errors': 'surrogateescape'} if sys.version_info > (3, 0) else {}
    )
    with open(scratch_path, mode='r', **open_kwargs) as filtered:
        contents = filtered.read()

    # The replacement text must now appear in the filtered file
    assert replacement in contents

View file

@ -26,7 +26,8 @@ def install(self, spec, prefix):
filter_file('INSTALL_DIR=~/.aspera',
'INSTALL_DIR=%s' % prefix,
runfile,
string=True)
string=True,
stop_at='__ARCHIVE_FOLLOWS__')
# Install
chmod = which('chmod')
chmod('+x', runfile)