Source code for coilpy.sortedDict

# file processed by 2to3
from __future__ import print_function, absolute_import
from builtins import map, filter, range

import numpy
import pickle as pypickle
import sys
if sys.version_info < (3,0):
    import cPickle as pickle
else:
    import pickle
import functools
import ast
import filecmp
import types
import shutil
import copy
import difflib
import re
import os
import pprint

__all__ = [
    'hide_ptrn', 'private_ptrn', 'comment_ptrn', 'comment_ptrn_in_brackets', 'number_ptrn',
    'sortHuman', 'get_bases',
    'parseBuildLocation', 'parseLocation', 'traverseLocation', 'buildLocation', 'setLocation',
    'traverse', 'treeLocation',
    'recursiveUpdate',
    'pretty_diff', 'prune_mask',
    'dynaLoad', 'dynaLoadKey', 'dynaSave',
    'SortedDict', 'pickle', 'size_tree_objects'
]

# Useful patterns
hide_ptrn=re.compile(r'^__.*__$')
private_ptrn=re.compile(r'^_.*[^_]+$')
comment_ptrn=re.compile(r'^__comment.*__$')
comment_ptrn_in_brackets=re.compile(r'''.*\[['"]__comment.*__['"]\].*''')
number_ptrn=re.compile(r"[-+]?\d*\.?\d+[eEdD][-+\d]+|[-+\d]+\.\d+|\d+")

_special1=[]




def isinstance_str(inv, cls):
    '''
    checks if an object is of a certain type by looking at the class name (not the class object)
    This is useful to circumvent the need to load import Python modules.

    :param inv: object of which to check the class

    :param cls: string or list of string with the name of the class(es) to be checked

    :return: True/False
    '''
    if isinstance(cls, str):
        cls = [cls]
    if hasattr(inv, '__class__') and hasattr(inv.__class__, '__name__') and inv.__class__.__name__ in cls:
        return True
    return False


#---------------------
# Delete leading zeros
#---------------------
def delete_leading_zeros(number_str):
    '''
    Delete leading zeros from array indices

    :param number_str: A string containing comma separated dimension indices and colon separated ranges and strides, such as '008:010,0001:0002'

    :return: String with leading zeros stripped, such as '8:10,1:2'
    '''
    return ','.join([':'.join([colon_split.lstrip('0') if colon_split.lstrip('0') else '0'
                                  for colon_split in comma_split.split(':')])
                                        for comma_split in number_str.split(',')])

[docs]def sortHuman(inStr): """Sort the given list the way that humans expect""" outStr=str(inStr).lower() tmp=re.findall(number_ptrn,outStr) outStr=re.sub(number_ptrn,'\1',outStr) for kn in tmp: kn=re.sub(r'[dD]','e',kn) try: outStr=re.subn('\1',format(float(kn),"+040.16f"),outStr,1)[0] except ValueError: pass outStr=re.sub('-','m',outStr) outStr=re.sub('\+','p',outStr) return outStr
def _insort(a, x, caseInsensitive=True): lo=0 hi=len(a) while lo < hi: mid = (lo+hi)//2 if caseInsensitive and str(x).lower() < str(a[mid]).lower(): hi = mid elif not caseInsensitive and str(x) < str(a[mid]): hi = mid else: lo = mid+1 a.insert(lo, x)
[docs]def get_bases(clss,tp=[]): "Returns a list of strings describing the dependencies of a class" if tp==[]: tp=[clss.__name__] bases = getattr(clss, '__bases__') if not len(bases): return tp else: for item in bases: tp.append(item.__name__) get_bases(item,tp) return tp
def different(a, b, precision=0.0): ''' Evaluates if two objects are different :param a: first object to compare :param b: second object to compare :param precision: relative precision to which objects are compared :return: integer to indicate equal (0) or different (1) ''' if isinstance_str(a, ['OMFITexpression', 'OMFITiterableExpression']) and isinstance_str(b, ['OMFITexpression', 'OMFITiterableExpression']): if a.expression!=b.expression: return 1 elif a.__class__!=b.__class__: return 1 elif hasattr(a,'filename') and hasattr(b,'filename'): if not os.path.exists(a.filename) or not os.path.exists(b.filename): return 1 elif not filecmp.cmp(a.filename,b.filename): return 1 elif not filecmp.cmp(a.filename,b.filename,shallow=False): return 1 elif os.path.split(a.filename)[1]!=os.path.split(b.filename)[1]: return 1 return 0 else: try: numpy.testing.assert_equal(a,b) return 0 except Exception: if precision==0.0: return 1 try: numpy.testing.assert_allclose(a, b, rtol=precision) return 0 except Exception: return 1 return 0
[docs]def parseLocation(inv): """ Parse string representation of the dictionary path and return list including root name This function can parse things like: OMFIT['asd'][u'aiy' ]["[ 'bla']['asa']"][3][1:5] :param inv: string representation of the dictionary path :return: list of dictionary keys including rootname """ # look for matching dictionary blocks with matching quotes inv = inv.strip() quote = False starts_at = 0 splits = [] k=0 char=None while k<len(inv): if inv[k] not in ' \t': new_char=inv[k] if new_char == '[' and not quote: if inv[k + 1] in ["'", '"']: quote = inv[k + 1] elif inv[k + 1] in 'bru' and inv[k + 2] in ["'", '"']: quote = inv[k + 2] else: quote = True starts_at = k + 1 if inv[k] == ']' and (quote is True or last_char == quote): quote = False splits.append(inv[starts_at:k]) if inv[k] not in ' \t': last_char=inv[k] k=k+1 # if quote was not closed then the parentheses do not match if quote: raise (SyntaxError('Unbalanced parentheses in ' + inv)) # eval splits for k, item in enumerate(splits): try: splits[k] = ast.literal_eval(item) except SyntaxError: if ':' in item: splits[k] = item else: raise return [inv.split('[')[0]] + splits
[docs]def buildLocation(inv): """ Assemble list of keys into dictionary path string :param inv: list of dictionary keys including rootname :return: string representation of the dictionary path """ tmp=inv[0] for item in inv[1:]: if isinstance(item,str): if ':' in item and not re.findall('[a-zA-Z]',item): tmp+='['+item+']' else: tmp+='['+repr(item)+']' else: tmp+='['+repr(item)+']' return tmp
[docs]def setLocation(location, value, globals=None, locals=None): location = parseLocation(location) eval(buildLocation(location[:-1]),globals,locals)[location[-1]] = value return value
[docs]def parseBuildLocation(inv): """ DEPRECATED: use `parseLocation` and `buildLocation` functions instead Function to handle locations in the OMFIT tree (i.e. python dictionaries) :param inv: input location :return: * if `inv` is a string, then the dictionary path is split and a list is returned (Note that this function strips the root name) * if it's a list, then the dictionary path is assembled and a string is returned (Note that this function assumes that the root name is missing) """ if isinstance(inv,str): return parseLocation(inv)[1:] elif isinstance(inv,list): return buildLocation(['']+inv) else: raise(ValueError('parseBuildLocation accepts either a string or a list'))
[docs]def traverseLocation(inv): ''' returns list of locations to reach input location :param inv: string representation of the dictionary path :return: list of locations including rootname to reach input location ''' tmp=parseLocation(inv) return [buildLocation(tmp[:k]) for k in range(1,len(tmp)+1)]
[docs]def traverse(self, string='', level=100, split=True, onlyDict=False, onlyLeaf=False, skipDynaLoad=False, noSubmodules=False, traverse_classes=(dict,)): """ Returns a string or list of strings describing the path of every entry/subentries in the dictionary :param string: string to be appended in front of all entries :param level: maximum depth :param split: split the output string into a list of strings :param onlyDict: return only dictionary entries (can be a tuple of classes) :param onlyLeaf: return only non-dictionary entries (can be a tuple of classes) :param skipDynaLoad: skip entries that have .dynaLoad==True :param noSubmodules: controls whether to traverse submodules or not :param traverse_classes: tuple of classes to traverse :return: string or list of string """ string_in=string string_out='' if isinstance(self,dict): keys=list(self.keys()) elif isinstance(self,(list,tuple)): keys=list(range(len(self))) else: keys=[] for kid in keys: kidName="["+repr(kid)+"]" string=string_in+kidName # skip also expressions when skipDynaLoad if skipDynaLoad and isinstance_str(self[kid], ['OMFITexpression', 'OMFITiterableExpression']): continue # mention this entry according to `onlyDict` and `onlyLeaf` filters if ((not onlyDict and not onlyLeaf) or (onlyDict is True and isinstance(self[kid],traverse_classes)) or (isinstance(onlyDict,tuple) and isinstance(self[kid],onlyDict)) or (onlyLeaf is True and not isinstance(self[kid],traverse_classes)) or (isinstance(onlyLeaf,tuple) and isinstance(self[kid],onlyLeaf)) ): string_out+=string+'\n' # do not go deeper if skipDynaLoad and the file has not been loaded if skipDynaLoad and hasattr(self[kid],'dynaLoad') and self[kid].dynaLoad: continue # go deeper if noSubmodules: from classes.omfit_base import OMFITmodule if isinstance(self[kid],traverse_classes) and (not isinstance(onlyDict,tuple) or isinstance(self[kid],onlyDict)) and level>0 and len(self[kid]) and not (noSubmodules and isinstance(self[kid],OMFITmodule)): level-=1 string_out+=traverse(self[kid],string,level,False,onlyDict,onlyLeaf,skipDynaLoad,noSubmodules,traverse_classes) level+=1 if split: return string_out.strip('\n').split('\n') else: return string_out
[docs]def treeLocation(location, memo=None): if hasattr(location, '_OMFITcopyOf') and location._OMFITcopyOf is not None: location = location._OMFITcopyOf() _nil=[] if memo is None: memo = {} y = memo.get(id(location), _nil) if y is not _nil: return y if not hasattr(location,'_OMFITparent'): try: location._OMFITkeyName='' location._OMFITparent=None except AttributeError: #this is for objects which do not accept ._OMFITparent (e.g. int, None, float,...) #these are only leafs in the tree and do not need their location in the tree #to function anyways. return None if location._OMFITparent is None: #this is when to treat the head node tmp=[location._OMFITkeyName] else: #this is for all of the middle nodes tmp=treeLocation(location._OMFITparent) if tmp is not None: tmp.append(tmp[-1]+location._OMFITkeyName) memo[id(location)] = tmp _keep_alive(location, memo) return tmp
def _keep_alive(x, memo): """Keeps a reference to the object x in the memo. Because we remember objects by their id, we have to assure that possibly temporary objects are kept alive by referencing them. We store a reference at the id of the memo, which should normally not be used unless someone tries to deepcopy the memo itself... """ try: memo[id(memo)].append(x) except KeyError: # aha, this is the first one :-) memo[id(memo)]=[x]
[docs]def recursiveUpdate(A, B, overWrite=True): ''' Recursive update of dictionary A based on data from dictionary B :param A: dictionary A :param B: dictionary B :param overWrite: force overwrite of duplicates :return: ''' def f_traverse(A,B): for kid in list(B.keys()): if isinstance(B[kid],dict): if kid not in A: A[kid]=B[kid].__class__() f_traverse(A[kid],B[kid]) else: if overWrite or kid not in A: A[kid]=copy.deepcopy(B[kid]) f_traverse(A,B)
[docs]def pretty_diff(d,ptrn={}): """ generate "human readable" dictionary output from SortedDict.diff() """ for k in list(d[0].keys()): if isinstance(d[0][k][0],dict): ptrn[k]=SortedDict() pretty_diff(d[0][k],ptrn=ptrn[k]) else: ptrn[k]=d[0][k][0] return ptrn
[docs]def prune_mask(what,ptrn): """ prune dictionary structure based on mask The mask can be in the form of of a `pretty_diff` dictionary or a list of `traverse` strings """ if isinstance(ptrn,dict): for k in list(what.keys()): if k not in list(ptrn.keys()): del what[k] elif isinstance(what[k],dict) and isinstance(ptrn[k],dict): prune_mask(what[k],ptrn[k]) return what elif isinstance(ptrn,(tuple,list)): ptrn=list(ptrn) #disregard non-existent paths for k,item in list(enumerate(ptrn))[::-1]: try: eval('what'+item) except Exception: ptrn.pop(k) #expand subtrees for item in list(ptrn): if isinstance(eval('what'+item),SortedDict): ptrn.extend([item+x for x in eval('what'+item).traverse()]) #expand add parents ptrn=set(ptrn) for item in list(ptrn): rootName=[] for level in parseLocation(item)[:-1]: rootName=rootName+[level] ptrn.add(buildLocation(rootName)) #do the pruning for item in what.traverse(): if item not in ptrn: try: exec('del what'+item, globals(), locals()) except Exception: pass return what else: raise(Exception('prune_mask: only list/tuple/dict supported'))
[docs]def size_tree_objects(location): """ Returns file sizes of objects in the dictionary based on the size of their filename attribute :param location: string of the tree location to be analyzed :return: dictionary with locations sorted by size """ tmp=traverse(eval(location),onlyDict=False,skipDynaLoad=True) sizes={} for item in tmp: if hasattr(eval(location+item),'filename'): try: obj=eval(location+item) if not obj.filename: continue size=os.stat(obj.filename).st_size if size not in sizes: sizes[size]=[] sizes[size].append(location+item) except Exception as _excp: print('Error sizing object %s : %s'%(location+item,repr(_excp))) return sizes
[docs]def dynaLoad(f): ''' Decorator which calls `_dynaLoad` method :param f: function to decorate :return: decorated function ''' @functools.wraps(f) def dynamicLoading(self, *args, **kw): self._dynaLoad(f) return f(self, *args, **kw) return dynamicLoading
[docs]def dynaLoadKey(f): ''' Decorator which calls `_dynaLoad` method only if key is not found :param f: function to decorate :return: decorated function ''' @functools.wraps(f) def dynamicLoading(self, *args, **kw): if args[0] not in self.keyOrder: self._dynaLoad(f) return f(self, *args, **kw) return dynamicLoading
[docs]def dynaSave(f): ''' Decorator which calls `_dynaSave` method :param f: function to decorate :return: decorated function ''' def doNothing(): pass @functools.wraps(f) def dynamicSaving(self,*args,**kw): if hasattr(self,'readOnly') and self.readOnly: if hasattr(self,'modifyOriginal'): if self.modifyOriginal and os.path.exists(self.filename) and os.path.samefile(self.link,self.filename): return doNothing elif hasattr(self,'_save_by_copy'): return self._save_by_copy(**kw) if self._dynaSave(): return doNothing return f(self, *args, **kw) return dynamicSaving
def _docFromDict(f): ''' Use the same docstring as for dict :param f: function to decorate :return: decorated function ''' try: f.__doc__=getattr(dict,f.__name__).__doc__ except AttributeError: pass return f
[docs]class SortedDict(dict): # originally inspired from django/trunk/django/utils/datastructures.py @ 17464 ''' A dictionary that keeps its keys in the order in which they're inserted :param data: A dict object or list of (key,value) tuples from which to initialize the new SortedDict object :param \**kw: Optional keyword arguments given below kw: :param caseInsensitive: (bool) If True, allows self['UPPER'] to yield self['upper']. :param sorted: (bool) If True, keep keys sorted alphabetically, instead of by insertion order. :param limit: (int) keep only the latest `limit` number of entries (useful for data cashes) :param dynaLoad: (bool) Not sure what this does ''' def __new__(cls, *args, **kwargs): instance = super(SortedDict, cls).__new__(cls, *args, **kwargs) instance.keyOrder = [] instance._OMFITkeyName='' instance._OMFITparent=None instance.caseInsensitive=kwargs.pop('caseInsensitive',False) instance.sorted=kwargs.pop('sorted',False) instance.limit=kwargs.pop('limit',0) instance.dynaLoad=False return instance def __init__(self, data=None, *args, **kwargs): self.keyOrder = [] self._OMFITkeyName='' self._OMFITparent=None self.caseInsensitive=kwargs.pop('caseInsensitive',False) self.sorted=kwargs.pop('sorted',False) self.limit=kwargs.pop('limit',0) self.dynaLoad=False if data is None: data = {} elif isinstance(data, types.GeneratorType): # Unfortunately we need to be able to read a generator twice. Once # to get the data into self with our super().__init__ call and a # second time to setup keyOrder correctly data = list(data) if isinstance(data, dict): for key in list(data.keys()): self[key]=data[key] else: for key, value in data: self[key]=value if self.sorted: self.sort() self.dynaLoad=kwargs.pop('dynaLoad',False) def _dynaLoad(self,f=None): ''' call `load` function if object has `dynaLoad` attribute set to True after calling `load` function the `dynaLoad` attribute is set to False :return: ''' if self.dynaLoad: self.dynaLoad=False if f is None or f.__name__!='load': try: # import sys,traceback # filename=str(id(self)) # if hasattr(self,'filename'): # filename=self.filename # print('',file=sys.__stderr__) # print('--------------------',file=sys.__stderr__) # print('Dynamic load '+os.path.split(filename)[1]+'\t\t(%s)'%str(f),file=sys.__stderr__) # print('--------------------',file=sys.__stderr__) # traceback.print_stack(file=sys.__stderr__) return self.load() except Exception as _excp_load: # If an error occurs during loading # Clear and reset the dynamic load switch to allow re-tries # note: errors could occur because user stops the process try: self.clear() except Exception as _excp_clear: # if clear() fails, then its exception should be printd but not raised. # the user is interested in the original exception raised by load(). print('The clear() method raised an exception: '+repr(_excp_clear)) self.dynaLoad=True raise _excp_load return None def _dynaSave(self): """ This function is meant to be called in the .save() function of objects of the class `OMFITobject` that support dynamic loading. The idea is that if an object has not been loaded, then its file representation has not changed and the original file can be resued. This function returns True/False to say if it was successful at saving. If True, then the original .save() function can return, otherwise it should go through saving the data from memory to file. """ if self.dynaLoad and hasattr(self,'link') and hasattr(self,'filename'): try: print('Dynamic save: '+self.filename,level=2,topic='save') if os.path.abspath(self.link) != os.path.abspath(self.filename): if os.path.exists(self.filename): if filecmp.cmp(self.link,self.filename,shallow=False): self.link=self.filename return True else: os.remove(self.filename) try: if OMFITaux.setdefault('hardLinks',False): os.link(self.link,self.filename) print('Hard link: %s --> %s'%(self.link,self.filename),level=2,topic='save') else: raise(Exception('skip')) except Exception: if os.path.isdir(self.link): shutil.copytree(self.link,self.filename) else: shutil.copy2(self.link,self.filename) self.link=self.filename return True except Exception as _excp: print('Error dynamic save: '+self.filename+'\n'+repr(_excp)) return False return False def __getattr__(self, attr): if attr.startswith('_OMFIT') or attr.startswith('OMFIT'): raise (AttributeError('bad attribute `%s`' % attr)) if self.dynaLoad and attr not in ['__save_kw__', '__tree_repr__', 'modifyOriginal', 'readOnly'] and 'getattr_infiniteloop_block' not in self.__dict__: try: if os.environ['USER'] == 'meneghini': print('%s dynaloading because %s attribute was requested' % (self.__class__.__name__, attr)) print_stack() self.__dict__['getattr_infiniteloop_block'] = True self._dynaLoad() return getattr(self, attr) finally: del self.__dict__['getattr_infiniteloop_block'] raise (AttributeError('bad attribute `%s`' % attr)) def _setLocation(self, key, value): if not hasattr(self, '_OMFITparent') or value is not self._OMFITparent: # check if the parent is the OMFIT tree inOMFITtree = False tmp = self while tmp != None and hasattr(tmp, '_OMFITparent'): if tmp._OMFITparent is None and tmp._OMFITkeyName == 'OMFIT': inOMFITtree = True break tmp = tmp._OMFITparent if not (key in self and id(self[key]) == id(value)) and inOMFITtree: if isinstance_str(value, ['OMFITexpression', 'OMFITiterableExpression']): value = copy.deepcopy(value) try: value._OMFITkeyName = "[" + repr(key) + "]" value._OMFITparent = self value._OMFITcopyOf = None # this copy goes directly into the tree, so we can set it to None except (AttributeError, TypeError): pass return value @_docFromDict @dynaLoad def __len__(self): return super(SortedDict, self).__len__() @_docFromDict @dynaLoad def __hash__(self): return ''.join(list(self.keys())).__hash__() @_docFromDict @dynaLoad def __setitem__(self, key, value): key,value=self._checkSetitem(key,value) if isinstance_str(key, ['OMFITexpression', 'OMFITiterableExpression']): raise(ValueError('OMFITexpressions are not valid keys for '+self.__class__.__name__)) keyL=self._keyCaseInsensitive(key) if keyL!=key: tmp=self.index(keyL) del self[keyL] self.keyOrder.insert(tmp,key) if key not in self.keyOrder: if hasattr(self,'sorted') and self.sorted: _insort(self.keyOrder, key) else: self.keyOrder.append(key) elif self.caseInsensitive: self.keyOrder[self.index(key)]=key super(SortedDict, self).__setitem__(key, self._setLocation(key,value) ) #make whatever SortedDict is under a caseInsensitive SortedDict, caseInsensitive itself if isinstance(self[key],SortedDict) and self.caseInsensitive: self[key].caseInsensitive=self.caseInsensitive #if limit>0 limit the number of entries while self.limit>0 and len(self.keyOrder)>self.limit: delkey=self.keyOrder[0] super(SortedDict, self).__delitem__(delkey) self.keyOrder.remove(delkey) def _checkSetitem(self, key, value): ''' This method is provided so that subclasses can use it to either: 1) change the key/value tuple as passed to the __setitem__ method 2) raise an error because the key-value pair is not acceptable :param key: key as passed by the user to the __setitem__ method :param value: value as passed by the user to the __setitem__ method :return: updated (key, value) tuple ''' return key,value @_docFromDict @dynaLoad def __delitem__(self, key): key=self._keyCaseInsensitive(key) super(SortedDict, self).__delitem__(key) self.keyOrder.remove(key) #does not need @dynaLoadKey, because functions that call _keyCaseInsensitive already do def _keyCaseInsensitive(self,key): if not hasattr(self,'caseInsensitive'): self.caseInsensitive=False found=key if self.caseInsensitive and key not in self.keyOrder and isinstance(key,str): for keyL in self.keyOrder: if isinstance(keyL,str) and keyL.lower()==key.lower(): found=keyL break return found @_docFromDict @dynaLoadKey def __getitem__(self, key): key=self._keyCaseInsensitive(key) try: return super(SortedDict, self).__getitem__(key) except KeyError: #if this instance has a fetch method, then call it and try it __getitem__ again if hasattr(self,'fetch'): self.fetch() return super(SortedDict, self).__getitem__(key) else: raise @_docFromDict @dynaLoadKey def __contains__(self, key): return super(SortedDict, self).__contains__(self._keyCaseInsensitive(key)) def __getstate__(self): tmp=copy.copy(self.__dict__) for k in list(tmp.keys()): if k[:6]=='_OMFIT': del tmp[k] return tmp,list(self.values()) def __setstate__(self,tmp): if isinstance(tmp,dict): #old way of loading sortedDict self.__dict__=tmp for key in list(self.keys()): self[key]=self._setLocation(key, self[key]) else: #new way of loading sortedDict self.limit=False self.__dict__.update(tmp[0]) for key,value in zip(self.keyOrder,tmp[1]): self[key]=self._setLocation(key,value)
[docs] @dynaLoad def index(self, item): """ returns the index of the item """ return self.keyOrder.index(self._keyCaseInsensitive(item))
@_docFromDict @dynaLoad def __iter__(self): return iter(self.keyOrder)
[docs] @_docFromDict @dynaLoad def pop(self, key, *args): key=self._keyCaseInsensitive(key) result = super(SortedDict, self).pop(key, *args) try: self.keyOrder.remove(key) except ValueError: # Key wasn't in the dictionary in the first place. No problem. pass return result
[docs] @_docFromDict @dynaLoad def popitem(self): result = super(SortedDict, self).popitem() self.keyOrder.remove(result[0]) return result
[docs] @_docFromDict @dynaLoad def items(self): return list(zip(self.keyOrder, list(self.values())))
[docs] @_docFromDict @dynaLoad def iteritems(self): for key in self.keyOrder: yield key, self[key]
[docs] @dynaLoad def keys(self,filter=None,matching=False): ''' returns the sorted list of keys in the dictionary :param filter: regular expression for filtering keys :param matching: boolean to indicate whether to return the keys that match (or not) :return: list of keys ''' if filter is None: return self.keyOrder[:] elif not matching: return [kkk for kkk in self.keyOrder[:] if not re.match(filter,str(kkk))] else: return [kkk for kkk in self.keyOrder[:] if re.match(filter,str(kkk))]
[docs] @_docFromDict @dynaLoad def iterkeys(self): return iter(self.keyOrder)
[docs] @_docFromDict @dynaLoad def values(self): return list(map(self.__getitem__, self.keyOrder))
[docs] @_docFromDict @dynaLoad def itervalues(self): for key in self.keyOrder: yield self[key]
[docs] @_docFromDict @dynaLoad def update(self, dict_): for key, value in list(dict_.items()): self[key] = self._setLocation(key,value)
[docs] @dynaLoad def setdefault(self, key, default): """ The method setdefault() is similar to get(), but will set dict[key]=default if key is not already in dict :param key: key to be accessed :param default: default value if key does not exist :return: value """ if key not in self: if hasattr(self,'sorted') and self.sorted: _insort(self.keyOrder, key) else: self.keyOrder.append(key) self[key]=default return self[key]
[docs] @_docFromDict @dynaLoad def get(self, key, default): if key not in self: return default return self[key]
[docs] @dynaLoad def value_for_index(self, index): """Returns the value of the item at the given zero-based index""" return self[self.keyOrder[index]]
[docs] @dynaLoad def insert(self, index, key, value): """Inserts the key, value pair before the item with the given index""" key=self._keyCaseInsensitive(key) if key in self.keyOrder: n = self.keyOrder.index(key) del self.keyOrder[n] if n < index: index -= 1 self.keyOrder.insert(index, key) self[key]=value
[docs] @dynaLoad def copy(self): """Returns a copy of this object""" obj = self.__class__(self) obj.keyOrder = self.keyOrder[:] return obj
@dynaLoad def __repr__(self): """returns the keys in their sorted order""" return '{%s}' % ', '.join(['%r: %r' % (k, v) for k, v in list(self.items())])
[docs] @_docFromDict def clear(self): super(SortedDict, self).clear() self.keyOrder = []
[docs] @dynaLoad def moveUp(self, index): ''' Shift up in key list the item at a given index :param index: index to be shifted :return: None ''' if index<len(self.keyOrder): self.keyOrder.insert(index+1, self.keyOrder.pop(index))
[docs] @dynaLoad def moveDown(self, index): ''' Shift down in key list the item at a given index :param index: index to be shifted :return: None ''' if index>0: self.keyOrder.insert(index-1, self.keyOrder.pop(index))
def __repr__(self): return self.__class__.__name__+'('+str(list(self.items()))+')'
[docs] @dynaLoad def across(self, what='', sort=False, returnKeys=False): ''' Aggregate objects across the tree :param what: string with the regular expression to be cut across :param sort: sorting of results alphabetically :param returnKeys: return keys of elements in addition to objects :return: list of objects or tuple with with objects and keys >> OMFIT['test']=OMFITtree() >> for k in range(5): >> OMFIT['test']['aaa'+str(k)]=OMFITtree() >> OMFIT['test']['aaa'+str(k)]['var']=k >> OMFIT['test']['bbb'+str(k)]=-1 >> print(OMFIT['test'].across("['aaa*']['var']")) ''' location=parseBuildLocation(what) keys=[] for k in list(self.keys()): if isinstance(k,str): if re.match('%r'%location[0],'%r'%k): keys.append(k) else: if re.match('%r'%location[0],'%r'%repr(k)): keys.append(k) if len(location)>1: what=parseBuildLocation(location[1:]) else: what='' if sort: index=numpy.argsort(list(map(float,keys))) else: index=list(range(len(keys))) tmp=[] for k in index: tmp_=self[keys[k]] tmp.append(eval("tmp_"+what)) tmp_ if returnKeys: return tmp,[k for k in numpy.array(keys)[numpy.array(index,int)]] else: return tmp
[docs] @dynaLoad def sort(self, key=None, **kw): ''' :param key: function that returns a string that is used for sorting or dictionary key whose content is used for sordting >> tmp=SortedDict() >> for k in range(5): >> tmp[k]={} >> tmp[k]['a']=4-k >> # by dictionary key >> tmp.sort(key='a') >> # or equivalently >> tmp.sort(key=lambda x:tmp[x]['a']) :param \**kw: additional keywords passed to the underlying list sort command :return: sorted keys ''' if key is None: self.keyOrder.sort(key=sortHuman,**kw) elif not callable(key): self.sort(key=lambda x:self[x][key]) else: self.keyOrder.sort(key=key,**kw) return self.keyOrder
[docs] def sort_class(self, class_order=[dict]): ''' sort items based on their class :param class_order: list containing order of class :return: sorted keys ''' lst={} for k in self.keyOrder: oo=len(class_order) for o,c in list(enumerate(class_order)): if isinstance(self[k],c): oo=o break if hasattr(self[k],'__class__'): for o,c in list(enumerate(class_order)): if self[k].__class__.__name__==c.__name__: oo=o break lst.setdefault(oo,[]).append(k) self.keyOrder=[] for k in range(len(class_order)+1): if k in lst: self.keyOrder+=lst[k] return self.keyOrder
[docs] @dynaLoad def diff(self, other, ignoreComments=False, ignoreContent=False, skipClasses=(), noloadClasses=(), precision=0.0, quiet=True): ''' :param other: other dictionary to compare to :param ignoreComments: ignore keys that start and end with "__" (e.g. "__comment__") :param ignoreContent: ignore content of the objects :param skipClasses: list of class of objects to ignore :param noloadClasses: list of class of objects to not load :param precision: relative precision to which the comparison is carried out :param quiet: verbosity of the comparison :return: comparison dictionary ''' #todo: should allow taking differences among any type of dictionary, not only sorted dict #use difflib.Differ (which operates on strings) to find out differences selfEntries=[repr(key) for key in list(self.keys())] otherEntries=[repr(key) for key in list(other.keys())] tmp=list(difflib.Differ(linejunk=None).compare(selfEntries,otherEntries)) keys=list(map(ast.literal_eval,[_f for _f in [entry[2:] for entry in tmp if entry[0]!='?'] if _f])) #unique keys, keep the ordering, allow caseInsensitive tmp=SortedDict(caseInsensitive=self.caseInsensitive) for key in keys: tmp[key]='' keys=list(tmp.keys()) if ignoreComments: keys=[key for key in keys if not re.match(comment_ptrn,str(key))] ndiffs=0. switch=SortedDict() for key in keys: if not quiet: printi('Compare: '+str(key)) if key not in self: switch[key] = ['added',False] ndiffs+=1. elif key not in other: switch[key] = ['removed',False] ndiffs+=1. else: if isinstance(self[key],skipClasses) or isinstance(other[key],skipClasses): continue if isinstance(self[key],noloadClasses) or isinstance(other[key],noloadClasses): if isinstance(self[key],SortedDict) and isinstance(other[key],SortedDict) and (self[key].dynaLoad or other[key].dynaLoad): switch[key] = ['noLoad',False] continue if isinstance(self[key],SortedDict) and isinstance(other[key],SortedDict): tmp=self[key].diff(other[key], ignoreComments=ignoreComments, ignoreContent=ignoreContent, skipClasses=skipClasses, noloadClasses=noloadClasses, precision=precision, quiet=quiet) if sum([len(tmp[0][k]) for k in tmp[0]]): switch[key] = tmp ndiffs+=1. elif not ignoreContent and different(self[key],other[key],precision=precision): switch[key] = ['changed',False] ndiffs+=1. return [switch,False,keys]
[docs] @dynaLoad def diffKeys(self, other): ''' :param other: other dictionary to compare to :return: floating point to indicate the ratio of keys that are similar ''' #notice that selfKeys and otherKeys are generated with traverse() and do not include comment_ptrn_in_brackets selfKeys=set([key.lower().split('(')[0] for key in traverse(self) if isinstance(key,str) and not re.match(comment_ptrn_in_brackets,str(key))]) otherKeys=set([key.lower().split('(')[0] for key in traverse(other) if isinstance(key,str) and not re.match(comment_ptrn_in_brackets,str(key))]) if len(selfKeys)<len(otherKeys): keys=selfKeys else: keys=otherKeys if len(keys)<2: return 0. ndiffs=0. for key in keys: if len(selfKeys)<len(otherKeys) and key not in otherKeys: ndiffs+=1. elif len(selfKeys)>=len(otherKeys) and key not in selfKeys: ndiffs+=1. lk=len(keys)*1.0 lE=lk-ndiffs if lk>0: score=lE*1.0/lk else: score=1. return score
[docs] @dynaLoad def changeKeysCase(self, case=None, recursive=False): ''' Change all the keys in the dictionary to be upper/lower case :param case: 'upper' or 'lower' :param recursive: apply this recursively :return: None ''' if case is None: return elif case=='upper' or case=='lower': for kid in list(self.keys()): tmp=self.pop(kid) if case=='upper': self[kid.upper()]=tmp elif case=='lower': self[kid.lower()]=tmp for kid in list(self.keys()): if recursive and isinstance(self[kid],SortedDict): self[kid].changeKeysCase(case,recursive=True)
[docs] @dynaLoad def traverse(self, string='', level=100, onlyDict=False, onlyLeaf=False, skipDynaLoad=False): ''' Equivalent to the `tree` command in UNIX :param string: prepend this string :param level: depth :param onlyDict: only subtrees and not the leafs :return: list of strings containing the dictionary path to each object ''' return traverse(self,string,level, split=True, onlyDict=onlyDict, onlyLeaf=onlyLeaf, skipDynaLoad=skipDynaLoad)
[docs] @dynaLoad def walk(self, function, **kw): """ Walk every member and call a function on the keyword and value :param function: `function(self,kid,**kw)` :param \**kw: kw passed to the function :return: self """ for kid in list(self.keys()): if hasattr(self[kid],'walk'): self[kid].walk(function,**kw) else: self[kid]=function(self,kid,**kw) return self
[docs] @dynaLoad def safe_del(self, key): ''' Delete key entry only if it is present :param key: key to be deleted ''' if key in self: del self[key]
[docs] @dynaLoad def flatten(self): ''' The hierarchical structure of the dictionaries is flattened :return: SortedDict populated with the flattened content of the dictionary ''' tmp=SortedDict(caseInsensitive=self.caseInsensitive) for item in list(self.keys()): if isinstance(self[item],SortedDict): tmp.update(self[item].flatten()) else: tmp[item]=self[item] return tmp
[docs] @dynaLoad def setFlat(self, key, value): ''' recursively searches dictionary for key in order to set a value raises KeyError if key could not be found, so this method cannot be used to set new entries in the dictionary :param key: key to be set :param value: value to set ''' if key in self: self[key]=value return else: for item in list(self.keys()): if isinstance(self[item],SortedDict): try: self[item].setFlat(key,value) return except KeyError: pass raise KeyError('`%s` could not be found throughout the dictionary'%key)
[docs] @dynaLoad def check_location(self, location, value=_special1): ''' check if location exist and equals value (if value is specified) :param location: location as string :param value: value for which to return equal :return: True/False >> root['SETTINGS'].check_location("['EXPERIMENT']['shot']", 133221) ''' try: eval('self'+location) except KeyError: return False if value is not _special1 and evalExpr(eval('self'+location))!=evalExpr(value): return False return True
def __popup_menu__(self): ''' Dummy method to avoid dynamic loading for classes that do not override it :return: empty list ''' return []
#automatic handle removing of _OMFITxxx attributes when pickling _dumps=pickle.dumps if __name__ == "__main__": aa = SortedDict({'foo': 1, 'bar': 2, 'yo': 4}) a = SortedDict({'foo': 1, 'bar': 2, 'yo': 4, 'dd': aa}) bb = SortedDict({'foo': 0, 'foobar': 3, 'yo': 4}) b = SortedDict({'foo': 0, 'foobar': 3, 'yo': 4, 'dd': bb}) diff, switch, keys = a.diff(b) print(diff)