import functools import sys from typing import Callable, Generator, Iterable, Iterator, Optional, Tuple from pip._vendor.rich.progress import ( BarColumn, DownloadColumn, FileSizeColumn, Progress, ProgressColumn, SpinnerColumn, TextColumn, TimeElapsedColumn, TimeRemainingColumn, TransferSpeedColumn, ) from pip._internal.cli.spinners import RateLimiter from pip._internal.utils.logging import get_indentation DownloadProgressRenderer = Callable[[Iterable[bytes]], Iterator[bytes]] def _rich_progress_bar( iterable: Iterable[bytes], *, bar_type: str, size: Optional[int], ) -> Generator[bytes, None, None]: assert bar_type == "on", "This should only be used in the default mode." if not size: total = float("inf") columns: Tuple[ProgressColumn, ...] = ( TextColumn("[progress.description]{task.description}"), SpinnerColumn("line", speed=1.5), FileSizeColumn(), TransferSpeedColumn(), TimeElapsedColumn(), ) else: total = size columns = ( TextColumn("[progress.description]{task.description}"), BarColumn(), DownloadColumn(), TransferSpeedColumn(), TextColumn("eta"), TimeRemainingColumn(), ) progress = Progress(*columns, refresh_per_second=5) task_id = progress.add_task(" " * (get_indentation() + 2), total=total) with progress: for chunk in iterable: yield chunk progress.update(task_id, advance=len(chunk)) def _raw_progress_bar( iterable: Iterable[bytes], *, size: Optional[int], ) -> Generator[bytes, None, None]: def write_progress(current: int, total: int) -> None: sys.stdout.write("Progress %d of %d\n" % (current, total)) sys.stdout.flush() current = 0 total = size or 0 rate_limiter = RateLimiter(0.25) write_progress(current, total) for chunk in iterable: current += len(chunk) if rate_limiter.ready() or current == total: write_progress(current, total) rate_limiter.reset() yield chunk def get_download_progress_renderer( *, bar_type: str, size: Optional[int] = None ) -> DownloadProgressRenderer: """Get an object that can be used to render the download progress. Returns a callable, that takes an iterable to "wrap". """ if bar_type == "on": return functools.partial(_rich_progress_bar, bar_type=bar_type, size=size) elif bar_type == "raw": return functools.partial(_raw_progress_bar, size=size) else: return iter # no-op, when passed an iterator