File: android-deltas-scripts/common.py
""" ======================================================================= Code used by scripts run on both PC and phone. See _README.html for license, attribution, version, and docs. Dev notes: 1) This file is loaded by import, and expects callers to immediately run its startup(), passing in a config_{pc, phone} module which has global settings copied and used here. This is due to its Bash heritage; Py scripts here are more-or-less direct translations for portability. 2) In config_*.py flies, set PauseSteps=n to avoid having to press Enter here, and set CheckFilenames=n to avoid name-fixer interactions here. 3) You can also change ZIP here to point to a newer installed ziptools, but don't generally need to; the version in Mergeall-source/ suffices. ======================================================================= """ import os, time, sys, shutil, subprocess from os.path import join VERSION = 1.1 # Linux also means WSL on Windows, and Android; # Cygwin is Windows, but using its own Python RunningOnWindows = sys.platform.startswith('win') # may be run by Cygwin RunningOnMacOS = sys.platform.startswith('darwin') RunningOnLinux = sys.platform.startswith('linux') # includes WSL, Android RunningOnCygwin = sys.platform.startswith('cygwin') # Cygwin's own python RunningOnAndroid = any(key for key in os.environ if key.startswith('ANDROID_')) # common indent width indent = ' ' * 4 # catch control-c in input() and exit nicely builtin_input = input def input(prompt=''): try: return builtin_input(prompt + ' ') except KeyboardInterrupt: print('\nRun aborted by control-c at prompt.') shutdown(bad=True) import builtins builtins.input = input # default to version here everywhere #---------------------------------------------------------------------- # Setup: globals, logs dir, etc. #---------------------------------------------------------------------- def startup(config, self): """ ------------------------------------------------------------------- Common startup code, called with either the PC or phone config module, and the module containing this function. This copies the config module's names into this module's namespace, and adds two more names here--and all of these names become globals both here and in the client scripts. Hackish, maybe, but globals are denoted by all uppercase (mostly), and this is better than the original Bash translation, which ran this file's all-top-level code in-line in clients' namespaces to emulate a Bash 'source <file>': from config_pc import * mydir = os.path.dirname(__file__) exec(open(join(mydir, 'common.py')).read()) This worked until the post hook script did the same thing and broke outputs. Some globals may be avoided by passing args to utilities, but this module's namespace avoids extra args (~6 in some cases). ------------------------------------------------------------------- """ global ZIP, DATE # set here! print('Android Deltas Scripts %.1f' % VERSION) # copy pc-or-phone config settings to my namespace for name in dir(config): if not name.startswith('__'): setattr(self, name, getattr(config, name)) # check that main common vars are set before tweaking for config in ('LOGS', 'MALL', 'FROM', 'TO', 'STUFF'): try: getattr(self, config) except: print('Config "%s" is not set' % config) print('Exiting; please check the config file and rerun.') shutdown(bad=True) # expand user home (~, ~user) and env vars ($var, ${var}) in paths expander = lambda path: os.path.expandvars(os.path.expanduser(path)) for config in ('LOGS', 'MALL', 'FROM', 'TO'): setattr(self, config, expander(getattr(self, config))) # path checks in all scripts; STUFF is also checked in some scripts for config in ('MALL', 'FROM', 'TO'): setting = getattr(self, config) if not os.path.isdir(setting): print('The %s path does not exist: "%s"' % (config, setting)) print('Exiting; please check the config file and rerun.') shutdown(bad=True) # ensure logfiles folder in all scripts if not os.path.isdir(LOGS): if os.path.exists(LOGS): os.remove(LOGS) os.mkdir(LOGS) # fix Windows non-ASCII prints when 'script > file' (cygwin too?) if RunningOnWindows: os.environ['PYTHONIOENCODING'] = 'utf8' # subprocs inherit # ziptools source-code folder (included in Mergeall) ZIP = join(MALL, 'test', 'ziptools') # logfile date stamp now = time.localtime() DATE = time.strftime('%y-%m-%d', now) # e.g., 21-09-30 for Sep 30, 2021 (sortable) #---------------------------------------------------------------------- # Utilities #---------------------------------------------------------------------- def announce(steplabel): """ ------------------------------------------------------------------- A conditional start-of-step display: pause for enter if enabled. ------------------------------------------------------------------- """ print('\nSTART:', steplabel) if PauseSteps: try: builtin_input('Press enter/return to continue') except KeyboardInterrupt: print('\nRun aborted by control-c before step.') shutdown() def opener(message): """ ------------------------------------------------------------------- Display opening message, save run start time for use in closer(). Package name+version are shown in startup(), before error tests. ------------------------------------------------------------------- """ global RunStartTime RunStartTime = time.perf_counter() # relative seconds print('\n' + message) def closer(message): """ ------------------------------------------------------------------- Normal exit: show total elapsed time (all steps + user time), and pause for an Enter press to allow output to be read if a PC-side script was run on Windows by an icon click (see shutdown() docs). ------------------------------------------------------------------- """ global RunStartTime elapsed = time.perf_counter() - RunStartTime # float seconds h, m, s = (elapsed // 3600), ((elapsed % 3600) // 60), (elapsed % 60) print('\nFinished - total elapsed time (h:m:s) = %d:%d:%.0f' % (h, m, s)) print(message) shutdown() def shutdown(bad=False): """ ------------------------------------------------------------------- All exits: pause for an Enter press iff a PC-side script was run on Windows by an icon click, else the console window is closed before its output can be read. Then, end the script run now with shell status 1 if bad (errors), or 0 if not bad (good/normal). Caveat: as coded, this may require an Enter in some other Windows contexts besides icon clicks, unless streams are redirected. Its PROMPT test suffices to keep click windows up, and does not require an Enter in Command Prompt, Cygwin, or some IDEs (e.g., PyEdit). But it does require an Enter in both PowerShell and the IDLE IDE. The only way to do better seems sniffing parent processes, and this is too complex and brittle. There really should be a better way to detect this very common Windows use case by now... Update: Termux:Widget launches similarly erase the console window on script exit (and open a different Termux session). To support this, test a ForceKeyToClose config-file setting on phone. This is preset to False; Termux:Widget is nice, but likely rare in the wild. This config also works on PCs, but is not needed for Windows clicks. See also _etc/termux-widget-shims/ for wrappers that make this moot. ------------------------------------------------------------------- """ if (ForceKeyToClose or # Termux:Widget? (RunningOnWindows and # Windows + sys.stdout.isatty() and # console output + sys.stdin.isatty() and # console input + 'PROMPT' not in os.environ)): # clicked only? (maybe) builtin_input('Press enter/return to exit') sys.exit(1 if bad else 0) def offerFixnamesRun(loglabel): """ ------------------------------------------------------------------- On PC, ask to run the nonportable-filenames fixer, in both initial copies and syncs. This is nearly required on Linux and macOS for interoperability with Windows, and FAT32, exFAT, and BDR drives. But don't ask on Windows: Windows disallows nonportables globally, and Cygwin's munged nonportables register as the munges instead of the nonportables if it's using Windows Python. Cygwin's own Python does translates munges to nonportables, but also sets sys.platform to 'cygwin' so it's not RunningOnWindows. WSL munges too, but its sys.platform is 'linux'. See _etc/examples/windows*/x* for demos. For convenience, this scrapes the fixer's last two report lines as a summary, and pulls out the number-found at the end of the last. This is both brittle and error prone, and requires forging user input for the script's verify, but reduces output and interaction. ------------------------------------------------------------------- """ if CheckFilenames and not RunningOnWindows: print('\n' 'It is recommended to run fix-nonportable-filenames.py before\n' 'propagating content from Unix to Windows, some Android\'s shared\n' 'storage, and proxy drives using the FAT32 or exFAT filesystems.') fixer = join(MALL, 'fix-nonportable-filenames.py') logto = join(LOGS, '%s--%s-fixnames-log.txt' % (DATE, loglabel)) userreply = input('Run the name-fixer script in report-only mode (y or n)?') if userreply == 'y': print('Running name fixer') lasttwo = runpy(fixer, join(FROM, STUFF), '-', input=b'y\n', logpath=logto, tailing=(2, 'Name-fixer summary')) # end interaction now if no lines to change numfound = int(lasttwo[-1].split(':')[1].strip()) if numfound == 0: print('No names to change') return userreply = input('Display name-fixer\'s full output (y or n)?') if userreply == 'y': print(open(logto, 'r', encoding='utf8').read()) userreply = input('Run the name-fixer script to fix filenames (y or n)?') if userreply != 'y': print('Name-fixer script not run') else: print('Running name fixer') runpy(fixer, join(FROM, STUFF), input=b'y\n', logpath=logto, tailing=(2, 'Name-fixer summary')) def verifyPCtoProxy(loglabelM, loglabelD): """ ------------------------------------------------------------------- On PC, verify that PC and proxy-drive content is the same. This is called for both initial copies and content syncs. It asks to run mergeall and diffall separately: the former is fast, but the latter is slow for larger content collections. Similar code in verify*part2 couldn't be factored into this. ------------------------------------------------------------------- """ if VerifyProxy: userreply = input('\nVerify PC copy to proxy copy with mergeall (y or n)?') if userreply == 'y': print('Running mergeall') logto = join(LOGS, '%s--%s-verify-mergeall-log.txt' % (DATE, loglabelM)) runpy(join(MALL, 'mergeall.py'), join(FROM, STUFF), join(TO, STUFF), '-report', '-skipcruft', '-quiet', logpath=logto, tailing=(9, 'Mergeall summary')) userreply = input('\nVerify PC copy to proxy copy with diffall (y or n)?') if userreply == 'y': print('Running diffall') logto = join(LOGS, '%s--%s-verify-diffall-log.txt' % (DATE, loglabelD)) runpy(join(MALL, 'diffall.py'), join(FROM, STUFF), join(TO, STUFF), '-skipcruft', '-quiet', logpath=logto, tailing=(7, 'Diffall summary')) # 6=none, 7=1 unique (__bkp__) def previewChanges(loglabel): """ ------------------------------------------------------------------- On PC, show PC~proxy deltas before saving or propagating, for sync runs only. Like offerFixnamesRun(), this routes output to a file and scrapes its final report, to reduce output in the console. ------------------------------------------------------------------- """ if PreviewSyncs: userreply = input('\nPreview changes in the source tree (y or n)?') if userreply == 'y': print('Running preview of changes to be synced') logto = join(LOGS, '%s--%s-preview-log.txt' % (DATE, loglabel)) runpy(join(MALL, 'mergeall.py'), join(FROM, STUFF), join(TO, STUFF), '-report', '-skipcruft', '-quiet', logpath=logto, tailing=(9, 'Preview summary')) userreply = input('Display preview\'s full output (y or n)?') if userreply == 'y': print(open(logto, 'r', encoding='utf8').read()) userreply = input('Continue with sync (y or n)?') if userreply != 'y': print('Sync aborted.') shutdown() def removeExistingOrStop(thedst, where): """ ------------------------------------------------------------------- In both initial-copy scripts, get the user's okay and then delete existing content trees on proxy or phone; shutdown if not approved. Also used in the phone-sync script before unzipping, in case a DELTAS folder is present from a prior-sync abort or other source. deltas.py always removes an existing DELTAS folder before starting. ------------------------------------------------------------------- """ if os.path.exists(thedst): print('Removing prior content on', where) userreply = input('Proceed with removal (y or n)?') if userreply != 'y': print('Run stopped for existing content.') shutdown() else: timefunc(lambda: shutil.rmtree(thedst, onerror=rmtreeworkaround)) print('Starting unzip') def moveZipToProxy(thezip, prxzip): """ ------------------------------------------------------------------- Run a copy+delete across filesystems. This is used by PC scripts for both the initial copy's full zip, and the sync's deltas zip. prxzip is already a file in driveRootPath(); run early for messages. ------------------------------------------------------------------- """ try: if os.path.exists(prxzip): os.remove(prxzip) shutil.copy2(thezip, prxzip) os.remove(thezip) except Exception as E: print('Zip move failed - Python exception:', E) print('Shutting down; check permissions and space and try again.') shutdown(bad=True) def tail(filename, lines, message): """ ------------------------------------------------------------------- A portable 'tail -n lines filename': show the last 'lines' lines in file 'filename'. This is inefficient for large files as coded, but seek-and-scan is complex, and memory use is not a concern here. ------------------------------------------------------------------- """ print(message + ':') lastfew = open(filename, 'r', encoding='utf8').readlines()[-lines:] for line in lastfew: print(indent + line.rstrip()) return lastfew # tail lines for scraping def runpy(*cmdargs, logpath=None, input=None, showcmd=False, tailing=None): """ ------------------------------------------------------------------- Run a Python command line, optionally sending stdout to a file named in logpath, and providing stdin text from bytes input. *cmdargs is individual words in the command line to run; the first is the pathname of the Python script to run. This adds the host Python's path to the front of *cmdargs automatically. tailing can be used iff logpath is a filename. It is a tuple (lines, message) whose items are passed to the tail() function along with the logpath name. When used, the return value is the result of tail(); else, the return value is elapsed time. Used on both PC and phone, portable to Win, Mac, Linux, Android. Caveat: subprocess.run() requires Python 3.5+, but this is from 6 years ago (2015), and anything older would be rare to find today. Caveat: can't use showRuntimes=ShowRuntimes arg; not yet defined. Update: now always adds the '-u' switch of Python (not script) to force unbuffered stdout/stderr. Else, logfiles may be incomplete if inspected while a long-running operation is in progress (e.g., a shared-storage delete of a large __bkp__ folder on the phone). diffall has its own '-u' but it's used only for frozen apps/exes. ------------------------------------------------------------------- """ cmdargs = (sys.executable, '-u') + cmdargs if showcmd: print('Command:', cmdargs) start = time.perf_counter() if not logpath: subprocess.run(args=cmdargs, input=input) else: logfile = open(logpath, 'w', encoding='utf8') subprocess.run(args=cmdargs, stdout=logfile, input=input) logfile.close() stop = time.perf_counter() if ShowRuntimes: elapsed = stop - start print(indent + 'Runtime: %dm, %.3fs' % (elapsed // 60, elapsed % 60)) if logpath and tailing: lines, message = tailing lastfew = tail(logpath, lines, message) return lastfew else: return elapsed def timefunc(func): """ ------------------------------------------------------------------- Time any function call; could be a decorator, but also for lambdas. ------------------------------------------------------------------- """ start = time.perf_counter() func() stop = time.perf_counter() if ShowRuntimes: elapsed = stop - start print(indent + 'Runtime: %dm, %.3fs' % (elapsed // 60, elapsed % 60)) def rmtreeworkaround(function, fullpath, exc_info): """ ------------------------------------------------------------------- Catch and try to recover from failures in Py shutil.rmtree() folder removal tool, which calls this function automatically on all system errors. Raising an exception here makes shutil.rmtree() raise too. This is adapted from Mergeall, which documents it in full. In short, deletions are not always atomic on Windows; macOS may remove "._*" Apple-double files auto, and Windows may remove associated folders auto; and other failures may stem from permissions or in-use locks. ------------------------------------------------------------------- """ # Windows only, directory deletes only if RunningOnWindows and function == os.rmdir: # make not-found case explicit if exc_info[0] == FileNotFoundError: msg = '...ignored FileNotFoundError for Windows dir' print(msg, fullpath) return # folder deleted with file: proceed # else wait for pending deletes of contents else: # nit: need to try iff ENOTEMPTY timeout = 0.001 while timeout < 1.0: # 10 tries only, increasing delays print('...retrying rmdir') # set off, but not just for pruning try: os.rmdir(fullpath) # rerun the failed delete (post FWP!) except os.error as exc: if exc.errno == errno.ENOENT: # no such file (not-empty=ENOTEMPTY) return # it's now gone: proceed with rmtree else: time.sleep(timeout) # wait for a fraction of second (.001=1 msec) timeout *= 2 # and try again, with longer delay else: return # it's now gone: proceed with rmtree # ignore file-not-found for Mac AppleDouble files if RunningOnMac and exc_info[0] == FileNotFoundError: itemname = os.path.basename(fullpath) if itemname.startswith('._'): # assume removed by Mac, or other print('...ignored FileNotFoundError for AppleDouble', fullpath) return raise # all other cases, or wait loop end: reraise exception to kill rmtree caller def driveRootPath(path): r""" ------------------------------------------------------------------- Given a path, return its prefix which identifies the root of the drive on which it resides. This is used to ensure that zipfiles are stored on the USB drive root, regardless of user TO settings. This keeps chopping off the path's tail item until a mount point is found, or empty or no more progress. It may suffice to call os.path.splitdrive() on Windows, but that's a no-op on POSIX. ------------------------------------------------------------------- macOS: >>> driveRootPath('/Users/me/MY-STUFF/Code') '/' >>> driveRootPath('/Volumes/SAMSUNG_T5/fold3-vs-deltas') '/Volumes/SAMSUNG_T5' >>> driveRootPath('.') '/' ------------------------------------------------------------------- Windows: >>> driveRootPath(r'C:\Users\me\Desktop\MY-STUFF') 'C:\\' >>> driveRootPath('D:\\test-ads\\z-fold-3.txt') 'D:\\' >>> driveRootPath('.') 'C:\\' ------------------------------------------------------------------- Linux: >>> driveRootPath('/home/me/Download/fold3-vs-deltas') '/' >>> driveRootPath('/media/me/SAMSUNG_T5/fold3-vs-deltas') '/media/me/SAMSUNG_T5' >>> driveRootPath('.') '/' ------------------------------------------------------------------- """ path = os.path.abspath(path) # normalize, use \ on win next = path while next: if os.path.ismount(next): # found the root path return next else: head, tail = os.path.split(next) if head == next: return path # give up: don't loop else: next = head # go to next path prefix return path # give up: no path left