#!/usr/bin/python

# Process lines from stdin with python functions

# Functions are either predefined or user supplied,
# the latter being similar to perl -n -e 'expression'.

# Update June 9 2009, I noticed a similar script called pyline:
#   http://code.activestate.com/recipes/437932/
# Update March 23 2012, Another similar tool called pyp
#   http://code.google.com/p/pyp/
# Update May 24 2012, Another similar tool called osh
#   http://geophile.com/osh/
# Update Sep 2015, Another similar tool called red (regexp)
#   https://bitbucket.org/johannestaas/red

# License: LGPLv2

# Author:
#    http://www.pixelbeat.org/
# Notes:
#    This util uses python's functional attributes to be:
#      1. scalable (don't build lists in mem so arbitrary length input supported)
#      2. efficient (as many operations done in compiled code as possible)
# Changes:
#    V1.0, 01 Nov 2006, Initial release
#    V1.1, 30 Nov 2007, Added 'uniq' (example of many to many)
#    V1.6, 11 Jul 2011, http://github.com/pixelb/scripts/commits/master/scripts/funcpy

# Examples:
#
#   add numbers
#     echo -e "1234\n433" | ./funcpy add
#
# The rest of the examples use user supplied expressions
# (where the input line is represented by x)
#
#   transform numbers
#     echo -e "1234\n433" | ./funcpy "hex(int(x))[2:]"
#
#   operate on single column
#     echo "1,2,3" | ./funcpy "int(x.split(',')[1])**2"
#
#   reiterate over sub items (only reducible functions)
#     echo "1,2,3" | ./funcpy "add(int(i) for i in x.split(','))"
#
#   Schwartzian transform (to sort by line len)
#   (note sort not as efficient or scalable as max)
#     ls | ./funcpy "str(len(x))+'\t'+x" | sort -k1,1n | cut -f2
#
#   filtering input (odd numbers) (note "or None" at end)
#     echo -e "1\n2\n3" | ./funcpy "int(x)%2 and x or None"
#
#   formatting with printf
#     echo "12341234.432142" | ./funcpy '"%g" % float(x)'
#
#   arbitrary modules
#     ls | ./funcpy --import=base64 "base64.encodestring(x)[:-1]"
#
#   similar to perl
#     echo -e "1\n5" | perl -n -e 'print $_**2,"\n"'

import itertools
import operator
import math
import sys

# Several functions below deliberately shadow builtins (min, max, all, ...),
# so the real builtins are reached through the module object.  The
# `__builtins__` name is not usable for this: it is a module only in
# __main__ and a plain dict everywhere else.
try:
    import builtins as _builtins        # python 3
except ImportError:
    import __builtin__ as _builtins     # python 2

used_compiled = True  # Turn off for comparison

#is Schwartzian transform functional?
#can we add reducible decorator to functions?
#TODO: allow option=expression to filter input if true rather than and or hack?
#TODO: put human() in here?


def reduce(func, items, start=None):  # foldl
    """Fold `items` from the left with the two argument function `func`.

    `start`, when not None, seeds the accumulator; otherwise the first
    item does.  Raises TypeError if `items` is not iterable, or if it is
    empty and no `start` was given.
    """
    try:
        items = iter(items)
    except TypeError:
        raise TypeError("reduce() arg 2 must support iteration")
    if start is None:
        try:
            state = next(items)
        except StopIteration:
            raise TypeError("reduce() of empty sequence with no initial value")
    else:
        state = start
    for i in items:
        state = func(state, i)
    return state

if used_compiled:
    try:
        # here since python 2.6, and the only location in python 3
        import functools
        reduce = functools.reduce
    except (ImportError, AttributeError):
        # builtin removed in python 3.0
        reduce = _builtins.reduce


def imap(func, iter):
    """Lazily yield func(i) for each item i of `iter`."""
    for i in iter:
        yield func(i)

if used_compiled:
    if sys.version_info < (3,):
        imap = itertools.imap
    else:
        imap = map


def izip(iter1, iter2):
    """Lazily yield (a, b) pairs until either input is exhausted."""
    iter2 = iter(iter2)
    for i in iter1:
        try:
            j = next(iter2)
        except StopIteration:
            # Must end explicitly: a StopIteration escaping a generator
            # body is turned into RuntimeError by PEP 479.
            return
        yield (i, j)

if used_compiled:
    if sys.version_info < (3,):
        izip = itertools.izip
    else:
        izip = zip


def irange(*args):
    """Lazy range: irange(stop), irange(start, stop[, step]).

    Raises TypeError for a wrong number of arguments and ValueError for
    a zero step.  Negative steps count downwards.
    """
    start = 0
    step = 1
    if len(args) == 0:
        raise TypeError("irange expected at least 1 argument, got 0")
    elif len(args) == 1:
        (stop,) = args
    elif len(args) == 2:
        (start, stop) = args
    elif len(args) == 3:
        (start, stop, step) = args
    else:
        raise TypeError("irange expected at most 3 arguments, got %d" % len(args))
    if step == 0:
        raise ValueError("irange() arg 3 must not be zero")
    if step > 0:
        while start < stop:
            yield start
            start += step
    else:
        while start > stop:
            yield start
            start += step

if used_compiled:
    if sys.version_info < (3,):
        irange = xrange
    else:
        irange = range


def all(iter):  # every?
    """Return True if every item of `iter` is true (True when empty)."""
    return False not in imap(bool, iter)


def any(iter):  # some?
    """Return True if at least one item of `iter` is true."""
    return True in imap(bool, iter)

if used_compiled:
    # any() and all() are available since python 2.5;
    # keep the versions above on anything older.
    all = getattr(_builtins, "all", all)
    any = getattr(_builtins, "any", any)


def factorial(x):
    """Return int(x)!; raises ValueError for negative values."""
    n = int(x)
    if n < 0:
        raise ValueError("factorial() not defined for negative values")
    # The initial value 1 makes factorial(0) == 1 rather than an error.
    return reduce(operator.mul, irange(1, n + 1), 1)


def add(iter):
    """Return the sum of the items of `iter` (0 when empty)."""
    return reduce(operator.add, iter, 0)

if used_compiled:
    add = _builtins.sum  # should be slightly more specific (faster)


def head(iterator, n=10):
    """Return an iterator over at most the first `n` items of `iterator`.

    islice() stops after exactly n items, so nothing past them is
    consumed from `iterator`.
    """
    if n < 0:
        n = 0
    return itertools.islice(iterator, n)

#def tail(iterator, n=10):
#    import collections
#    return collections.deque(iterator, maxlen=n)


def count(iterator):
    """Return the number of items in `iterator`, consuming it."""
    return add(1 for x in iterator)


def uniq(iterator):
    """Yield items of `iterator` with adjacent duplicates collapsed."""
    state = object()  # sentinel that compares unequal to any real item
    for line in iterator:
        if line != state:
            yield line
            state = line
    # Using the groupby equivalent is slightly slower:
    #   return (k for k, g in itertools.groupby(iterator))


def uniq_c(iterator):
    """Like uniq, but prefix each item with its run length (uniq -c)."""
    return ("%7d %s" % (count(g), k) for k, g in itertools.groupby(iterator))


def uniq_d(iterator):
    """Yield one copy of each item that is adjacently repeated (uniq -d)."""
    return (k for k, g in itertools.groupby(iterator) if count(head(g, 2)) > 1)


def min(iter):
    """Return the smallest item of `iter`; ValueError when empty."""
    found = False
    state = None
    for i in iter:
        # The first item always seeds the result; comparing against a
        # None placeholder is wrong on python 2 and an error on python 3.
        if not found or i < state:
            state = i
            found = True
    if not found:
        raise ValueError("min() arg is an empty sequence")
    return state

if used_compiled:
    min = _builtins.min


def max(iter):
    """Return the largest item of `iter`; ValueError when empty."""
    found = False
    state = None
    for i in iter:
        if not found or i > state:
            state = i
            found = True
    if not found:
        raise ValueError("max() arg is an empty sequence")
    return state

if used_compiled:
    max = _builtins.max


def avg(iterator):
    """Return the arithmetic mean of the numbers in `iterator`.

    The input is read in a single pass without being stored.  Raises
    ValueError when `iterator` is empty.
    """
    seen = [0]  # a list so the nested generator can update it (no nonlocal)

    def counted():
        for value in iterator:
            seen[0] += 1
            yield value

    # Note math.fsum (available since 2.6) doesn't lose precision
    # while adding lots of small numbers
    total = getattr(math, "fsum", sum)(counted())
    if not seen[0]:
        raise ValueError("avg() arg is an empty sequence")
    return total / float(seen[0])


def dec2hex(i):
    """Return int(i) as lowercase hex digits, with a '-' sign if negative."""
    return "%x" % int(i)


def hex2dec(i):
    """Return the integer value of the hexadecimal string `i`."""
    return int(i, 16)


def bits(i):
    """Return the number of bits needed to represent `i` (1 for zero).

    Raises ValueError for negative numbers.
    """
    if i < 0:
        raise ValueError("negative number [%d]" % i)
    if i == 0:
        return 1
    n = int(i)
    # Exact integer arithmetic: a floating point log2 is off by one
    # for large values such as 2**64-1.
    try:
        return n.bit_length()
    except AttributeError:  # python < 2.7
        return len(bin(n)) - 2

#=============================================================

import os
import errno

# The following exits cleanly on Ctrl-C or EPIPE
# while treating other exceptions as before.
def std_exceptions(etype, value, tb):
    """Exception hook: exit quietly on Ctrl-C or EPIPE.

    Any other exception is passed to the default hook unchanged.
    """
    # Restore the default hook first, so a failure while handling this
    # exception is reported normally instead of re-entering this function.
    sys.excepthook = sys.__excepthook__
    if issubclass(etype, KeyboardInterrupt):
        return
    if issubclass(etype, IOError) and getattr(value, "errno", None) == errno.EPIPE:
        return
    sys.__excepthook__(etype, value, tb)

sys.excepthook = std_exceptions

functions = {
    #name:    (func, input, type)
    'add':    (add, float, 'mto1'),
    'max':    (max, float, 'mto1'),
    'min':    (min, float, 'mto1'),
    'avg':    (avg, float, 'mto1'),
    'all':    (all, float, 'mto1'),
    'any':    (any, float, 'mto1'),
    'count':  (count, str, 'mto1'),
    'head':   (head, str, 'mtom'),
    'uniq':   (uniq, str, 'mtom'),
    'uniq_c': (uniq_c, str, 'mtom'),
    'uniq_d': (uniq_d, str, 'mtom'),
    #Following really of minimal use
    #'dec2hex': (dec2hex, str, '1to1'),
    #'hex2dec': (hex2dec, str, '1to1'),
    #'factorial': (factorial, int, '1to1'),
    #'log10': (math.log10, float, '1to1'),
    #'bits': (bits, long, '1to1'),
}


def Usage(status=1):
    """Write the usage text to stderr and exit with `status` (default 1)."""
    name = os.path.split(sys.argv[0])[1]
    sys.stderr.write("Usage: " + name + " [OPTIONS] <" +
                     '|'.join(sorted(functions.keys())) + ">\n")
    sys.stderr.write(" " + name + " [OPTIONS] <" +
                     "python expression where input is x>\n")
    sys.stderr.write("Input is taken from stdin, and is processed per line\n")
    sys.stderr.write("Note python expressions must be stateless (no ifs or assigments)\n")
    sys.stderr.write(" --imports=mod1,mod2,... modules to import for use in python expression\n")
    sys.stderr.write(" --help display this help\n")
    # Flush rather than close, so later errors can still be reported.
    sys.stderr.flush()
    sys.exit(status)


def _strip_eol(line):
    """Return `line` without its trailing newline, if it has one."""
    # An explicit test: the `a and b or c` idiom returns the unstripped
    # line whenever the stripped result is the (falsy) empty string.
    if line.endswith('\n'):
        return line[:-1]
    return line


import getopt

try:
    lOpts, lArgs = getopt.getopt(sys.argv[1:], "", ["help", "imports="])
except getopt.error:
    # Diagnostics go to stderr so they never mix with piped results.
    sys.stderr.write("%s\n" % sys.exc_info()[1])
    Usage(2)

# --help is honoured before the argument count is checked,
# and is a successful exit.
if ("--help", "") in lOpts:
    Usage(0)

if len(lArgs) != 1:
    Usage(1)
mode = lArgs[0]

for opt in lOpts:
    if opt[0] == "--imports":
        for mod in opt[1].split(","):
            mod = mod.strip()
            if not mod:
                continue
            # __import__("pkg.sub") returns the top-level package, which
            # is the name an expression such as "pkg.sub.f(x)" looks up.
            globals()[mod.split(".")[0]] = __import__(mod)

#=============================================================

if mode in functions:
    func, conv, ftype = functions[mode]
else:
    #shows the flexibility of interpretation over compilation
    # NOTE: the expression is evaluated as given on the command line;
    # never pass text from an untrusted source as the expression.
    code = compile(mode, "", "eval")
    namespace = dict(globals())

    def func(x):
        # x is stored as a global of the evaluated code, so it is also
        # visible inside comprehensions and generator expressions.
        namespace['x'] = x
        return eval(code, namespace)

    conv = _strip_eol  # strip EOL
    ftype = '1to1'


def convert(iterator):
    """Return `iterator` with the selected input conversion applied lazily."""
    if conv is str:  # shortcut for speed
        return iterator
    return (conv(line) for line in iterator)


try:
    if ftype == 'mto1':  # many to one
        result = func(convert(sys.stdin))
        if isinstance(result, bool):
            # Boolean results are reported through the exit status only.
            sys.exit(int(not result))
        sys.stdout.write("%s\n" % (result,))
    elif ftype == 'mtom':  # many to many
        #for result in func(convert(sys.stdin)):
        #    sys.stdout.write(result)
        #note the following is faster but it buffers output
        #even if stdout is a terminal. writelines really shouldn't do that.
        sys.stdout.writelines(func(convert(sys.stdin)))
    else:  # one to one
        for num in convert(sys.stdin):
            result = func(num)
            if result is not None:
                # The 1-tuple keeps a tuple result from being
                # unpacked as multiple format arguments.
                sys.stdout.write("%s\n" % (result,))
except ValueError:
    value = sys.exc_info()[1]
    sys.stderr.write(str(value) + "\n")
    sys.exit(1)