# File: android-deltas-scripts/common.py

"""
=======================================================================
Code used by scripts run on both PC and phone.

See _README.html for license, attribution, version, and docs.

Dev notes:

1) This file is loaded by import, and expects callers to immediately
run its startup(), passing in a config_{pc, phone} module which has 
global settings copied and used here.  This is due to its Bash heritage;
Py scripts here are more-or-less direct translations for portability.

2) In config_*.py files, set PauseSteps=n to avoid having to press Enter 
here, and set CheckFilenames=n to avoid name-fixer interactions here.

3) You can also change ZIP here to point to a newer installed ziptools,
but don't generally need to; the version in Mergeall-source/ suffices.
=======================================================================
"""

import os, time, sys, shutil, subprocess
from os.path import join

VERSION = 1.1

# Linux also means WSL on Windows, and Android;
# Cygwin is Windows, but using its own Python

RunningOnWindows = sys.platform.startswith('win')       # may be run by Cygwin
RunningOnMacOS   = sys.platform.startswith('darwin')
RunningOnLinux   = sys.platform.startswith('linux')     # includes WSL, Android
RunningOnCygwin  = sys.platform.startswith('cygwin')    # Cygwin's own python
RunningOnAndroid = any(key for key in os.environ if key.startswith('ANDROID_'))

# common indent width
indent = ' ' * 4

# Replace input() everywhere with a version that exits nicely on control-c.
# The real input() is saved first, because the name is rebound just below.
import builtins
builtin_input = builtins.input

def input(prompt=''):
    """
    Prompt with a trailing space and return the user's reply.  A control-c
    at the prompt prints a notice and ends the run through shutdown().
    """
    try:
        reply = builtin_input(prompt + ' ')
    except KeyboardInterrupt:
        print('\nRun aborted by control-c at prompt.')
        shutdown(bad=True)
    else:
        return reply

builtins.input = input    # default to version here everywhere



#----------------------------------------------------------------------
# Setup: globals, logs dir, etc.
#----------------------------------------------------------------------


def startup(config, self):
    """
    -------------------------------------------------------------------
    Common startup code, called with either the PC or phone config 
    module, and the module containing this function (as 'self').

    Steps, in order:
      1) print the package banner;
      2) copy every non-dunder name of config into self;
      3) verify that LOGS, MALL, FROM, TO, and STUFF are all set;
      4) expand ~, ~user, $var, and ${var} in LOGS, MALL, FROM, and TO;
      5) verify that MALL, FROM, and TO are existing folders;
      6) create the LOGS folder if needed (a same-named non-folder
         item is removed first);
      7) on Windows, set PYTHONIOENCODING so subprocesses inherit it;
      8) set this module's ZIP and DATE globals.

    A failed check in steps 3 or 5 prints a message and ends the run
    by calling shutdown(bad=True).

    Background: copying config names into a module namespace replaces
    the original Bash-style translation, which exec'd this file's text
    in each client's namespace to emulate 'source <file>'.  That scheme
    broke outputs once the post hook script did the same thing.  The
    shared namespace avoids passing many extra args to the utilities.
    -------------------------------------------------------------------
    """
    global ZIP, DATE    # set here!
    print('Android Deltas Scripts %.1f' % VERSION)

    # copy pc-or-phone config settings to the target namespace
    for name in dir(config):
        if not name.startswith('__'):
            setattr(self, name, getattr(config, name))

    # check that main common vars are set before tweaking;
    # hasattr() traps only AttributeError, so control-c still propagates
    for name in ('LOGS', 'MALL', 'FROM', 'TO', 'STUFF'):
        if not hasattr(self, name):
            print('Config "%s" is not set' % name)
            print('Exiting; please check the config file and rerun.')
            shutdown(bad=True)

    # expand user home (~, ~user) and env vars ($var, ${var}) in paths
    for name in ('LOGS', 'MALL', 'FROM', 'TO'):
        expanded = os.path.expandvars(os.path.expanduser(getattr(self, name)))
        setattr(self, name, expanded)

    # path checks in all scripts; STUFF is also checked in some scripts
    for name in ('MALL', 'FROM', 'TO'):
        setting = getattr(self, name)
        if not os.path.isdir(setting):
            print('The %s path does not exist: "%s"' % (name, setting))
            print('Exiting; please check the config file and rerun.')
            shutdown(bad=True)

    # ensure logfiles folder in all scripts; read through self, since
    # that is where the settings were just stored
    logsdir = self.LOGS
    if not os.path.isdir(logsdir):
        if os.path.exists(logsdir):
            os.remove(logsdir)
        os.mkdir(logsdir)

    # fix Windows non-ASCII prints when 'script > file' (cygwin too?)
    if RunningOnWindows:
        os.environ['PYTHONIOENCODING'] = 'utf8'    # subprocs inherit

    # ziptools source-code folder (included in Mergeall)
    ZIP = join(self.MALL, 'test', 'ziptools')

    # logfile date stamp, e.g., 21-09-30 for Sep 30, 2021 (sortable)
    DATE = time.strftime('%y-%m-%d', time.localtime())



#----------------------------------------------------------------------
# Utilities 
#----------------------------------------------------------------------



def announce(steplabel):
    """
    -------------------------------------------------------------------
    Print a start-of-step banner for steplabel.  When the PauseSteps
    setting is on, also wait for an Enter press before returning; a
    control-c at that prompt prints a notice and calls shutdown().
    -------------------------------------------------------------------
    """
    print('\nSTART:', steplabel)
    if not PauseSteps:
        return

    # uses the saved builtin so the abort message is step-specific
    try:
        builtin_input('Press enter/return to continue')
    except KeyboardInterrupt:
        print('\nRun aborted by control-c before step.')
        shutdown()



def opener(message):
    """
    -------------------------------------------------------------------
    Record the run's start time in the RunStartTime global (read later
    by closer()), then print message after a blank line.  The package
    name and version are printed by startup(), not here.
    -------------------------------------------------------------------
    """
    global RunStartTime
    started = time.perf_counter()    # relative seconds, not wall-clock
    RunStartTime = started
    print('\n' + message)



def closer(message):
    """
    -------------------------------------------------------------------
    Normal exit: show total elapsed time since opener() was called
    (all steps + user time), print message, and finish via shutdown(),
    which may pause for an Enter press (see shutdown() docs).

    The elapsed time is rounded to whole seconds before it is split
    into hours, minutes, and seconds.  Rounding the seconds field alone
    could display "60" in that field (e.g., 59.7 secs as 0:0:60).
    -------------------------------------------------------------------
    """
    elapsed = time.perf_counter() - RunStartTime    # float seconds

    total = int(round(elapsed))                     # whole seconds
    minutes, seconds = divmod(total, 60)
    hours, minutes = divmod(minutes, 60)
    print('\nFinished - total elapsed time (h:m:s) = %d:%d:%d'
          % (hours, minutes, seconds))

    print(message)
    shutdown()



def shutdown(bad=False):
    """
    -------------------------------------------------------------------
    Common exit point: optionally wait for an Enter press, then end the
    script with shell status 1 if bad (errors) or 0 if not (normal).

    The pause happens in two cases:

    1) The ForceKeyToClose config setting is on.  This supports
       Termux:Widget launches on the phone, which erase the console
       window on script exit.  It is preset to False, also works on
       PCs, and is not needed for Windows clicks.  See also
       _etc/termux-widget-shims/ for wrappers that make this moot.

    2) The script seems to have been started by an icon click on
       Windows: running on Windows, with stdout and stdin both
       connected to a console, and no PROMPT environment variable.
       Without the pause, the console window closes before its output
       can be read.

    Caveat: the PROMPT test is a heuristic.  It keeps click windows up
    and does not require an Enter in Command Prompt, Cygwin, or some
    IDEs (e.g., PyEdit), but it does require one in both PowerShell and
    the IDLE IDE, unless streams are redirected.  Doing better seems to
    require sniffing parent processes, which is too complex and brittle.
    -------------------------------------------------------------------
    """
    def clickedOnWindows():
        # run only when ForceKeyToClose is off, and tested left to right
        return (RunningOnWindows and
                sys.stdout.isatty() and
                sys.stdin.isatty() and
                'PROMPT' not in os.environ)

    if ForceKeyToClose or clickedOnWindows():
        builtin_input('Press enter/return to exit')

    status = 1 if bad else 0
    sys.exit(status)



def offerFixnamesRun(loglabel):
    """
    -------------------------------------------------------------------
    On PC, ask to run the nonportable-filenames fixer, in both initial
    copies and syncs.  This is nearly required on Linux and macOS for 
    interoperability with Windows, and FAT32, exFAT, and BDR drives.

    But don't ask on Windows: Windows disallows nonportables globally,
    and Cygwin's munged nonportables register as the munges instead of
    the nonportables if it's using Windows Python.  Cygwin's own Python 
    does translate munges to nonportables, but also sets sys.platform
    to 'cygwin' so it's not RunningOnWindows.  WSL munges too, but its
    sys.platform is 'linux'.  See _etc/examples/windows*/x* for demos.

    Nothing is asked or run if CheckFilenames is off.  Otherwise the
    user is offered a report-only run first, and then a fixing run.
    Both send output to the same logfile in LOGS, named by DATE and
    loglabel, and both answer the fixer's own verify prompt with 'y'.

    For convenience, this scrapes the fixer's last two report lines as
    a summary, and pulls out the number-found after the first ':' in 
    the last.  If that number is 0, this returns without offering the
    fixing run.  The scrape is brittle: if the number can't be found 
    (e.g., the fixer failed), this says so and carries on with the 
    remaining prompts, instead of ending the run with an exception.
    -------------------------------------------------------------------
    """
    if not CheckFilenames or RunningOnWindows:
        return

    print('\n'
          'It is recommended to run fix-nonportable-filenames.py before\n'
          'propagating content from Unix to Windows, some Android\'s shared\n'
          'storage, and proxy drives using the FAT32 or exFAT filesystems.')

    fixer = join(MALL, 'fix-nonportable-filenames.py')
    logto = join(LOGS, '%s--%s-fixnames-log.txt' % (DATE, loglabel))

    userreply = input('Run the name-fixer script in report-only mode (y or n)?')
    if userreply == 'y':
        print('Running name fixer')
        lasttwo = runpy(fixer, join(FROM, STUFF), '-',
                        input=b'y\n',
                        logpath=logto,
                        tailing=(2, 'Name-fixer summary'))

        # scrape the count at the end of the last summary line
        try:
            numfound = int(lasttwo[-1].split(':')[1].strip())
        except (IndexError, ValueError):
            numfound = None    # empty log or unexpected report format
            print('Could not read the number of names found; see', logto)

        # end interaction now if no lines to change
        if numfound == 0:
            print('No names to change')
            return

        userreply = input('Display name-fixer\'s full output (y or n)?')
        if userreply == 'y':
            with open(logto, 'r', encoding='utf8') as logfile:
                print(logfile.read())

    userreply = input('Run the name-fixer script to fix filenames (y or n)?')
    if userreply != 'y':
        print('Name-fixer script not run')
    else:
        print('Running name fixer')
        runpy(fixer, join(FROM, STUFF),
              input=b'y\n',
              logpath=logto,
              tailing=(2, 'Name-fixer summary'))



def verifyPCtoProxy(loglabelM, loglabelD):
    """
    -------------------------------------------------------------------
    On PC, offer to verify that the PC and proxy-drive content trees 
    match, for both initial copies and content syncs.  Does nothing
    if the VerifyProxy setting is off.

    The user is asked about mergeall first and diffall second, and 
    each is run only on a 'y' reply: the former is fast, but the latter
    is slow for larger content collections.  Each run logs to its own
    file in LOGS (named with loglabelM or loglabelD) and shows the tail 
    of that log as a summary.  Similar code in verify*part2 couldn't be 
    factored into this.
    -------------------------------------------------------------------
    """
    if not VerifyProxy:
        return

    # one row per tool: prompt, banner, script, log name, label, switches, tail
    checks = (
        ('\nVerify PC copy to proxy copy with mergeall (y or n)?',
         'Running mergeall',
         'mergeall.py',
         '%s--%s-verify-mergeall-log.txt',
         loglabelM,
         ('-report', '-skipcruft', '-quiet'),
         (9, 'Mergeall summary')),

        ('\nVerify PC copy to proxy copy with diffall (y or n)?',
         'Running diffall',
         'diffall.py',
         '%s--%s-verify-diffall-log.txt',
         loglabelD,
         ('-skipcruft', '-quiet'),
         (7, 'Diffall summary')),    # 6=none, 7=1 unique (__bkp__)
    )

    for (question, banner, script, logname, loglabel, switches, summary) in checks:
        if input(question) != 'y':
            continue
        print(banner)
        logto = join(LOGS, logname % (DATE, loglabel))
        runpy(join(MALL, script),
              join(FROM, STUFF), join(TO, STUFF), *switches,
              logpath=logto,
              tailing=summary)



def previewChanges(loglabel):
    """
    -------------------------------------------------------------------
    On PC, show PC~proxy deltas before saving or propagating, for sync
    runs only.  Does nothing if the PreviewSyncs setting is off, or if 
    the user declines the preview.

    Otherwise, this runs mergeall in report mode with output routed to
    a logfile in LOGS (named by DATE and loglabel), shows the log's 
    tail as a summary, offers to display the full log, and asks whether 
    to continue.  Any reply but 'y' to the last prompt ends the run via
    shutdown().  Like offerFixnamesRun(), this scrapes the final report 
    to reduce output in the console.

    The logfile is read in a 'with' so that it is closed right away.
    -------------------------------------------------------------------
    """
    if not PreviewSyncs:
        return

    userreply = input('\nPreview changes in the source tree (y or n)?')
    if userreply != 'y':
        return

    print('Running preview of changes to be synced')
    logto = join(LOGS, '%s--%s-preview-log.txt' % (DATE, loglabel))

    runpy(join(MALL, 'mergeall.py'),
          join(FROM, STUFF), join(TO, STUFF), '-report', '-skipcruft', '-quiet',
          logpath=logto,
          tailing=(9, 'Preview summary'))

    userreply = input('Display preview\'s full output (y or n)?')
    if userreply == 'y':
        with open(logto, 'r', encoding='utf8') as logfile:
            print(logfile.read())

    userreply = input('Continue with sync (y or n)?')
    if userreply != 'y':
        print('Sync aborted.')
        shutdown()



def removeExistingOrStop(thedst, where):
    """
    -------------------------------------------------------------------
    If thedst exists, get the user's okay and then delete it as a 
    folder tree (timed, with rmtreeworkaround handling failures); any 
    reply but 'y' ends the run via shutdown().  Does nothing if thedst 
    does not exist.  'where' is used only in the message displayed.

    Used by both initial-copy scripts for existing content trees on 
    proxy or phone, and by the phone-sync script before unzipping, in 
    case a DELTAS folder is present from a prior-sync abort or other 
    source.  deltas.py always removes an existing DELTAS folder before 
    starting.
    -------------------------------------------------------------------
    """
    if not os.path.exists(thedst):
        return

    print('Removing prior content on', where)
    approved = input('Proceed with removal (y or n)?') == 'y'
    if approved:
        def remover():
            shutil.rmtree(thedst, onerror=rmtreeworkaround)
        timefunc(remover)
        print('Starting unzip')
    else:
        print('Run stopped for existing content.')
        shutdown()



def moveZipToProxy(thezip, prxzip):
    """
    -------------------------------------------------------------------
    Move file thezip to pathname prxzip as a copy followed by a delete,
    so that it works across filesystems.  An existing prxzip is removed 
    first.  On any exception, this prints the error and ends the run 
    via shutdown(bad=True).

    Used by PC scripts for both the initial copy's full zip, and the 
    sync's deltas zip.  prxzip is already a file in driveRootPath(); 
    run early for messages.
    -------------------------------------------------------------------
    """
    try:
        stale = os.path.exists(prxzip)
        if stale:
            os.remove(prxzip)               # drop any prior zip on the proxy
        shutil.copy2(thezip, prxzip)        # copy content + metadata
        os.remove(thezip)                   # then delete the original
    except Exception as why:
        print('Zip move failed - Python exception:', why)
        print('Shutting down; check permissions and space and try again.')
        shutdown(bad=True)



def tail(filename, lines, message):
    """
    -------------------------------------------------------------------
    A portable 'tail -n lines filename': print message as a header,
    then the last 'lines' lines of UTF-8 text file 'filename', each 
    indented and right-stripped.  Returns the raw tail lines (with 
    their line ends) as a list, for scraping by callers.

    The file is scanned once through a bounded deque, so only the last 
    'lines' lines are held in memory, and it is closed before printing.
    A 'lines' of zero or less yields no lines; a plain [-lines:] slice 
    would instead return the whole file for zero.
    -------------------------------------------------------------------
    """
    from collections import deque    # local: keeps this block standalone

    print(message + ':')
    keep = max(lines, 0)             # deque rejects a negative maxlen
    with open(filename, 'r', encoding='utf8') as textfile:
        lastfew = list(deque(textfile, maxlen=keep))

    for line in lastfew:
        print(indent + line.rstrip())
    return lastfew   # tail lines for scraping



def runpy(*cmdargs, logpath=None, input=None, showcmd=False, tailing=None):
    """
    -------------------------------------------------------------------
    Run a Python command line, optionally sending stdout to a file
    named in logpath, and providing stdin text from bytes input.
    *cmdargs is individual words in the command line to run; the
    first is the pathname of the Python script to run.  This adds 
    the host Python's path to the front of *cmdargs automatically.
    If showcmd is true, the full command is printed before it is run.

    tailing can be used iff logpath is a filename.  It is a tuple 
    (lines, message) whose items are passed to the tail() function
    along with the logpath name.  When used, the return value is 
    the result of tail(); else, the return value is elapsed time
    in float seconds.

    The elapsed time is always computed, whether or not the 
    ShowRuntimes setting asks for it to be printed.  Computing it only
    when printed would make the elapsed-time return fail with an 
    unbound-variable error when ShowRuntimes is off.  The logfile is 
    opened in a 'with' so it is closed even if the run raises.
 
    Used on both PC and phone, portable to Win, Mac, Linux, Android.
    Caveat: subprocess.run() requires Python 3.5+ (2015); anything 
    older would be rare to find today.
    Caveat: can't use showRuntimes=ShowRuntimes arg; not yet defined.

    This always adds the '-u' switch of Python (not script) to force
    unbuffered stdout/stderr.  Else, logfiles may be incomplete if
    inspected while a long-running operation is in progress (e.g., 
    a shared-storage delete of a large __bkp__ folder on the phone).
    diffall has its own '-u' but it's used only for frozen apps/exes.
    -------------------------------------------------------------------
    """
    cmdargs = (sys.executable, '-u') + cmdargs
    if showcmd:
        print('Command:', cmdargs)

    start = time.perf_counter()
    if not logpath:
        subprocess.run(args=cmdargs, input=input)
    else:
        with open(logpath, 'w', encoding='utf8') as logfile:
            subprocess.run(args=cmdargs, stdout=logfile, input=input)
    elapsed = time.perf_counter() - start    # needed for the return too

    if ShowRuntimes:
        print(indent + 'Runtime: %dm, %.3fs' % (elapsed // 60, elapsed % 60))

    if logpath and tailing:
        lines, message = tailing
        return tail(logpath, lines, message)
    return elapsed



def timefunc(func):
    """
    -------------------------------------------------------------------
    Call func with no arguments and, if the ShowRuntimes setting is on, 
    print how long the call took.  The call's result is discarded.
    This could be a decorator, but it is also used for lambdas.
    -------------------------------------------------------------------
    """
    began = time.perf_counter()
    func()
    ended = time.perf_counter()
    if not ShowRuntimes:
        return
    elapsed = ended - began
    minutes, seconds = elapsed // 60, elapsed % 60
    print(indent + 'Runtime: %dm, %.3fs' % (minutes, seconds))



def rmtreeworkaround(function, fullpath, exc_info):
    """
    -------------------------------------------------------------------
    Catch and try to recover from failures in Py shutil.rmtree() folder
    removal tool, which calls this function automatically on all system
    errors.  Raising an exception here makes shutil.rmtree() raise too.

    Arguments are those of an rmtree onerror handler: the function that
    failed, the path it was called with, and a sys.exc_info()-style
    (type, value, traceback) tuple for the failure.

    Returns None when the failure can be ignored or was recovered from:
    - on Windows, a failed os.rmdir that raised FileNotFoundError;
    - on Windows, any other failed os.rmdir that succeeds (or finds the 
      folder gone) on a retry, with delays doubling from 1 msec;
    - on macOS, FileNotFoundError for an item whose name starts "._".
    Otherwise, the original exception in exc_info is raised again.

    This is adapted from Mergeall, which documents it in full.  In short,
    deletions are not always atomic on Windows; macOS may remove "._*"
    Apple-double files auto, and Windows may remove associated folders 
    auto; and other failures may stem from permissions or in-use locks.

    Coding notes: the retry traps FileNotFoundError rather than testing 
    errno.ENOENT, since the errno module is not imported by this file;
    the macOS test uses this file's RunningOnMacOS name; and the final 
    raise names the exception explicitly, so it does not depend on an 
    exception being active in the caller.
    -------------------------------------------------------------------
    """
    # Windows only, directory deletes only
    if RunningOnWindows and function == os.rmdir:

        # make not-found case explicit
        if exc_info[0] == FileNotFoundError:
            msg = '...ignored FileNotFoundError for Windows dir'
            print(msg, fullpath)
            return                                   # folder deleted with file: proceed

        # else wait for pending deletes of contents
        timeout = 0.001                              # nit: need to try iff ENOTEMPTY
        while timeout < 1.0:                         # 10 tries only, increasing delays
            print('...retrying rmdir')               # set off, but not just for pruning
            try:
                os.rmdir(fullpath)                   # rerun the failed delete
            except FileNotFoundError:                # no such file (ENOENT)
                return                               # it's now gone: proceed with rmtree
            except OSError:                          # e.g., not-empty (ENOTEMPTY)
                time.sleep(timeout)                  # wait for a fraction of second
                timeout *= 2                         # and try again, with longer delay
            else:
                return                               # it's now gone: proceed with rmtree

    # ignore file-not-found for Mac AppleDouble files
    if RunningOnMacOS and exc_info[0] == FileNotFoundError:
        itemname = os.path.basename(fullpath)
        if itemname.startswith('._'):
            # assume removed by Mac, or other
            print('...ignored FileNotFoundError for AppleDouble', fullpath)
            return

    # all other cases, or wait loop end: reraise exception to kill rmtree caller
    raise exc_info[1]



def driveRootPath(path):
    r"""
    -------------------------------------------------------------------
    Return the prefix of path that names the root of the drive it is 
    on.  This is used to ensure that zipfiles are stored on the USB 
    drive root, regardless of user TO settings.

    The path is made absolute, and then its last component is dropped
    repeatedly until os.path.ismount() accepts what is left.  If the
    path runs out or stops shrinking before a mount point is found, the 
    absolute form of the original path is returned instead.  It may 
    suffice to call os.path.splitdrive() on Windows, but that's a no-op 
    on POSIX.
    -------------------------------------------------------------------
    macOS:
    >>> driveRootPath('/Users/me/MY-STUFF/Code')
    '/'
    >>> driveRootPath('/Volumes/SAMSUNG_T5/fold3-vs-deltas')
    '/Volumes/SAMSUNG_T5'
    >>> driveRootPath('.')
    '/'
    -------------------------------------------------------------------
    Windows:
    >>> driveRootPath(r'C:\Users\me\Desktop\MY-STUFF')
    'C:\\'
    >>> driveRootPath('D:\\test-ads\\z-fold-3.txt')
    'D:\\'
    >>> driveRootPath('.')
    'C:\\'
    -------------------------------------------------------------------
    Linux:
    >>> driveRootPath('/home/me/Download/fold3-vs-deltas')
    '/'
    >>> driveRootPath('/media/me/SAMSUNG_T5/fold3-vs-deltas')
    '/media/me/SAMSUNG_T5'
    >>> driveRootPath('.') 
    '/'
    -------------------------------------------------------------------
    """
    fullpath = os.path.abspath(path)          # normalize, use \ on win
    probe = fullpath
    while probe:
        if os.path.ismount(probe):            # found the root path
            return probe
        parent = os.path.dirname(probe)       # chop off the tail item
        if parent == probe:
            break                             # no more progress: don't loop
        probe = parent
    return fullpath                           # give up: no mount point found



# [Home page] Books Code Blog Python Author Training Search ©M.Lutz