"""Utilities related to archives."""

import logging
import os
import shutil
import stat
import tarfile
import zipfile
from typing import Iterable, List, Optional
from zipfile import ZipInfo

from pip._internal.exceptions import InstallationError
from pip._internal.utils.filetypes import (
    BZ2_EXTENSIONS,
    TAR_EXTENSIONS,
    XZ_EXTENSIONS,
    ZIP_EXTENSIONS,
)
from pip._internal.utils.misc import ensure_dir

logger = logging.getLogger(__name__)


# Extensions this module can unpack.  The bz2/xz entries are only added when
# the corresponding compression module can be imported.
SUPPORTED_EXTENSIONS = ZIP_EXTENSIONS + TAR_EXTENSIONS

try:
    import bz2  # noqa

    SUPPORTED_EXTENSIONS += BZ2_EXTENSIONS
except ImportError:
    logger.debug("bz2 module is not available")

try:
    # Only for Python 3.3+
    import lzma  # noqa

    SUPPORTED_EXTENSIONS += XZ_EXTENSIONS
except ImportError:
    logger.debug("lzma module is not available")


def current_umask() -> int:
    """Get the current umask which involves having to set it temporarily."""
    # os.umask() can only report the old mask by installing a new one, so set
    # it to 0 and immediately restore the value that was returned.
    mask = os.umask(0)
    os.umask(mask)
    return mask


def split_leading_dir(path: str) -> List[str]:
    """Split ``path`` into its first component and the remainder.

    Leading "/" and "\\" characters are stripped first.  The split happens at
    whichever of "/" or "\\" occurs first in the path.  When the path contains
    neither separator the result is ``[path, ""]``.
    """
    path = path.lstrip("/").lstrip("\\")
    if "/" in path and (
        ("\\" in path and path.find("/") < path.find("\\")) or "\\" not in path
    ):
        return path.split("/", 1)
    elif "\\" in path:
        return path.split("\\", 1)
    else:
        return [path, ""]


def has_leading_dir(paths: Iterable[str]) -> bool:
    """Returns true if all the paths have the same leading path name
    (i.e., everything is in one subdirectory in an archive)

    An empty iterable yields True, because no path contradicts the claim.
    """
    common_prefix = None
    for path in paths:
        prefix = split_leading_dir(path)[0]
        if not prefix:
            return False
        elif common_prefix is None:
            common_prefix = prefix
        elif prefix != common_prefix:
            return False
    return True


def is_within_directory(directory: str, target: str) -> bool:
    """
    Return true if the absolute path of target is within the directory

    The comparison is made on whole path components, so a sibling whose name
    merely starts with the directory's name (``/t/out-evil`` versus ``/t/out``)
    is correctly reported as being outside.
    """
    abs_directory = os.path.abspath(directory)
    abs_target = os.path.abspath(target)

    try:
        # commonpath() works per component; commonprefix() works per character
        # and would accept "/t/out-evil/x" as being inside "/t/out".
        common = os.path.commonpath([abs_directory, abs_target])
    except ValueError:
        # Raised when the paths cannot share an ancestor (e.g. different
        # drives on Windows), in which case target is certainly outside.
        return False
    return os.path.normcase(common) == os.path.normcase(abs_directory)


def set_extracted_file_to_default_mode_plus_executable(path: str) -> None:
    """
    Make file present at path have execute for user/group/world
    (chmod +x) is no-op on windows per python docs
    """
    # Default mode (0o777 filtered by the umask) with all execute bits forced.
    os.chmod(path, (0o777 & ~current_umask() | 0o111))


def zip_item_is_executable(info: ZipInfo) -> bool:
    """Return True if the zip member is a regular file with any execute bit."""
    # The file mode is stored in the upper 16 bits of external_attr.
    mode = info.external_attr >> 16
    # if mode and regular file and any execute permissions for
    # user/group/world?
    return bool(mode and stat.S_ISREG(mode) and mode & 0o111)


def unzip_file(filename: str, location: str, flatten: bool = True) -> None:
    """
    Unzip the file (with path `filename`) to the destination `location`.  All
    files are written based on system defaults and umask (i.e. permissions are
    not preserved), except that regular file members with any execute
    permissions (user, group, or world) have "chmod +x" applied after being
    written. Note that for windows, any execute changes using os.chmod are
    no-ops per the python docs.

    When `flatten` is true and every member shares one leading directory,
    that directory is stripped from the extracted paths.

    Raises InstallationError if a member would be written outside `location`.
    """
    ensure_dir(location)
    with open(filename, "rb") as zipfp, zipfile.ZipFile(
        zipfp, allowZip64=True
    ) as archive:
        leading = has_leading_dir(archive.namelist()) and flatten
        for info in archive.infolist():
            name = info.filename
            relative = split_leading_dir(name)[1] if leading else name
            fn = os.path.join(location, relative)
            if not is_within_directory(location, fn):
                message = (
                    "The zip file ({}) has a file ({}) trying to install "
                    "outside target directory ({})"
                )
                raise InstallationError(message.format(filename, fn, location))
            if fn.endswith(("/", "\\")):
                # A directory
                ensure_dir(fn)
                continue
            ensure_dir(os.path.dirname(fn))
            # Don't use read() to avoid allocating an arbitrarily large
            # chunk of memory for the file's content
            with archive.open(name) as fp, open(fn, "wb") as destfp:
                shutil.copyfileobj(fp, destfp)
            # Only reached when the copy succeeded, so a failed extraction
            # is never masked by an error from chmod.
            if zip_item_is_executable(info):
                set_extracted_file_to_default_mode_plus_executable(fn)


def untar_file(filename: str, location: str) -> None:
    """
    Untar the file (with path `filename`) to the destination `location`.
    All files are written based on system defaults and umask (i.e. permissions
    are not preserved), except that regular file members with any execute
    permissions (user, group, or world) have "chmod +x" applied after being
    written.  Note that for windows, any execute changes using os.chmod are
    no-ops per the python docs.

    If every member shares one leading directory, that directory is stripped
    from the extracted paths.  Members that cannot be extracted (for example
    bad symlinks) are logged and skipped.

    Raises InstallationError if a member would be written outside `location`.
    """
    ensure_dir(location)
    lowered = filename.lower()
    if lowered.endswith((".gz", ".tgz")):
        mode = "r:gz"
    elif lowered.endswith(BZ2_EXTENSIONS):
        mode = "r:bz2"
    elif lowered.endswith(XZ_EXTENSIONS):
        mode = "r:xz"
    elif lowered.endswith(".tar"):
        mode = "r"
    else:
        logger.warning(
            "Cannot determine compression type for file %s",
            filename,
        )
        mode = "r:*"
    with tarfile.open(filename, mode, encoding="utf-8") as tar:
        members = tar.getmembers()
        leading = has_leading_dir([member.name for member in members])
        for member in members:
            fn = member.name
            if leading:
                fn = split_leading_dir(fn)[1]
            path = os.path.join(location, fn)
            if not is_within_directory(location, path):
                message = (
                    "The tar file ({}) has a file ({}) trying to install "
                    "outside target directory ({})"
                )
                raise InstallationError(message.format(filename, path, location))
            if member.isdir():
                ensure_dir(path)
            elif member.issym():
                # NOTE(review): only the link's own location is validated
                # above; the link *target* (member.linkname) is not checked.
                # Consider whether links pointing outside `location` should
                # be rejected.
                try:
                    tar._extract_member(member, path)
                except Exception as exc:
                    # Some corrupt tar files seem to produce this
                    # (specifically bad symlinks)
                    logger.warning(
                        "In the tar file %s the member %s is invalid: %s",
                        filename,
                        member.name,
                        exc,
                    )
                    continue
            else:
                try:
                    fp = tar.extractfile(member)
                except (KeyError, AttributeError) as exc:
                    # Some corrupt tar files seem to produce this
                    # (specifically bad symlinks)
                    logger.warning(
                        "In the tar file %s the member %s is invalid: %s",
                        filename,
                        member.name,
                        exc,
                    )
                    continue
                ensure_dir(os.path.dirname(path))
                assert fp is not None
                # Close the member stream even if writing the file fails.
                with fp, open(path, "wb") as destfp:
                    shutil.copyfileobj(fp, destfp)
                # Update the timestamp (useful for cython compiled files)
                tar.utime(member, path)
                # member have any execute permissions for user/group/world?
                if member.mode & 0o111:
                    set_extracted_file_to_default_mode_plus_executable(path)


def unpack_file(
    filename: str,
    location: str,
    content_type: Optional[str] = None,
) -> None:
    """Unpack the archive `filename` into the directory `location`.

    The archive is treated as a zip when `content_type` is
    "application/zip", the name ends with a zip extension, or
    zipfile.is_zipfile() recognises it; files ending in ".whl" are unzipped
    without flattening a leading directory.  Otherwise it is treated as a
    tarball when `content_type` is "application/x-gzip",
    tarfile.is_tarfile() recognises it, or the name ends with a tar, bz2 or
    xz extension.

    Raises InstallationError when neither format is detected.
    """
    filename = os.path.realpath(filename)
    if (
        content_type == "application/zip"
        or filename.lower().endswith(ZIP_EXTENSIONS)
        or zipfile.is_zipfile(filename)
    ):
        unzip_file(filename, location, flatten=not filename.endswith(".whl"))
    elif (
        content_type == "application/x-gzip"
        or tarfile.is_tarfile(filename)
        or filename.lower().endswith(TAR_EXTENSIONS + BZ2_EXTENSIONS + XZ_EXTENSIONS)
    ):
        untar_file(filename, location)
    else:
        # FIXME: handle?
        # FIXME: magic signatures?
        # NOTE(review): `location` is the extraction destination, yet it is
        # reported below as the download source and as the archive whose
        # format is unknown -- confirm whether `filename` was intended.
        logger.critical(
            "Cannot unpack file %s (downloaded from %s, content-type: %s); "
            "cannot detect archive format",
            filename,
            location,
            content_type,
        )
        raise InstallationError(f"Cannot determine archive format of {location}")