Source code for p4.func

"""Various functions."""
import os
import sys
import re
import string
import math
import io
import random
import glob
import time
import pickle
import random
import inspect
import datetime
import subprocess

from p4.var import var
# from p4.sequencelist import Sequence, SequenceList
import p4.sequencelist
from p4.alignment import Alignment
from p4.nexus import Nexus
from p4.tree import Tree
from p4.node import Node
from p4.p4exceptions import P4Error
from p4.constraints import Constraints
import p4.pf as pf
import numpy
from p4.pnumbers import Numbers
import p4.version
from p4.nexustoken import nextTok



[docs]def nexusCheckName(theName): """Check to see if theName conforms to Nexus standards See page 597 of Maddison, Swofford, and Maddison 1997, where they say "Names are single NEXUS words; they cannot consist entirely of digits (e.g., a taxon called 123 is illegal). The all-digit name restriction can be relaxed in p4 by setting var.nexus_allowAllDigitNames. Single digit names are prohibited, regardless. """ if not isinstance(theName, str): print("func.nexusCheckName() %s is not str, is %s" % (theName, type(theName))) return 0 if len(theName) == 1 and theName[0] not in string.ascii_letters: return 0 if not var.nexus_allowAllDigitNames: try: int(theName) return 0 except ValueError: return 1 else: return 1
[docs]def nexusUnquoteAndDeUnderscoreName(theName): """Deal with underscores and quotes. Returns theName If theName is not quoted, convert any underscores to spaces. If theName is quoted, remove the outside quotes, and convert any cases of 2 single quotes in a row to 1 single quote. This does not appear to be used in the rest of p4. """ if theName[0] == "'": assert theName[-1] == "'", \ "func.nexusUnquoteAndDeUnderscoreName(). First char is a single quote, but last char is not." theName = theName[1:-1] return theName.replace("''", "'") if '_' in theName: return theName.replace('_', ' ') else: return theName
[docs]def nexusUnquoteName(theName): """Deal with quotes. Returns theName If theName is not quoted, just return it. If theName is quoted, remove the outside quotes, and convert any cases of 2 single quotes in a row to 1 single quote. """ if theName[0] == "'": if theName[-1] != "'": gm = ['func.nexusUnquoteName()'] gm.append('the name is %s' % theName) gm.append("First char is a single quote, but last char is not.") raise P4Error(gm) theName = theName[1:-1] if theName.count("''"): return theName.replace("''", "'") else: return theName else: return theName
[docs]def nexusFixNameIfQuotesAreNeeded(theName, verbose=0): """Add quotes if needed, for writing to a Nexus file. Returns a possibly modified theName. Usually it will not need quoting, so it just returns theName unchanged. If theName is None, or if it starts with a single quote, just return it. If it has (nexus-defined) punctuation, or spaces, then put quotes around it before returning it. If there are internal single quotes, then double them, nexus-style. Except if there are any already doubled single quotes, don't double them. """ if theName == None: return theName if theName.startswith("'"): return theName quotesAreNeeded = 0 # for c in var.nexus_punctuation: for c in var.punctuation: if c in theName: quotesAreNeeded = 1 break if not quotesAreNeeded: for c in string.whitespace: if c in theName: quotesAreNeeded = 1 break if quotesAreNeeded: if verbose: oldName = theName if theName.count("'"): # If we have doubled quotes, don't re-double them if theName.count("''"): pass else: theName = theName.replace("'", "''") newName = "'" + theName + "'" if verbose: print("Warning. Nexus quoting |%s| to |%s|" % (oldName, newName)) return newName else: return theName
[docs]def isDnaRnaOrProtein(aString): """Attempts to determinine the data type by the composition. Returns 1 for DNA, 2 for RNA, and 0 for protein. Or so it thinks. It only works for lowercase symbol letters. """ nGaps = aString.count('-') nQs = aString.count('?') strLenNoGaps = len(aString) - (nGaps + nQs) acgn = (aString.count('a') + aString.count('c') + aString.count('g') + aString.count('n')) t = aString.count('t') u = aString.count('u') if 0: print("stringLength(no gaps) = %s" % strLenNoGaps) print("acgn = %f" % acgn) print("t = %f" % t) print("u = %f" % u) # If it is 90% or better acgn +t or +u, then assume it is dna or rna threshold = 0.9 * strLenNoGaps if acgn + t >= threshold: return 1 elif acgn + u >= threshold: return 2 # At this point we still don't know. It might be dna or rna with # lots of ambigs. But we will only allow that if the content of # acgn +t or +u is more than threshold1. That should # prevent eg 'rymkswhvd' from being assigned as dna. threshold1 = 0.75 * strLenNoGaps tally = (aString.count('r') + aString.count('y') + aString.count('m') + aString.count('k') + aString.count('s') + aString.count('w') + aString.count('h') + aString.count('b') + aString.count('v') + aString.count('d')) threshold2 = 0.99 * strLenNoGaps # print " string length is %i, strLenNoGaps is %i" % (len(aString), strLenNoGaps) # print " threshold2 is %3.1f, acgn + t + ambigs = %i" % (threshold, (acgn + t + tally)) # print " threshold1 is %3.1f, acgn + t = %i" % (threshold, (acgn + t)) if (acgn + t + tally >= threshold2) and (acgn + t >= threshold1): return 1 elif (acgn + u + tally >= threshold2) and (acgn + u >= threshold1): return 2 else: return 0
[docs]def stringZapWhitespaceAndDigits(inString): out = list(inString) theRange = range(len(out)) for i in reversed(theRange): if out[i] in string.whitespace or out[i] in string.digits: del out[i] return ''.join(out)
[docs]def dump(): """A top-level dump of p4 trees, files, sequenceLists, and alignments.""" print("\np4 dump") if len(var.fileNames) == 0: print(" Haven't read any files") elif len(var.fileNames) == 1: print(" Read one file: %s" % var.fileNames[0]) else: print(" Read files:") for i in var.fileNames: print(" %s" % i) if len(var.alignments) == 0: # print " There are no alignments." pass elif len(var.alignments) == 1: print(" There is 1 alignment.") # if recursive: # var.alignments[0].dump() else: print(" There are %i alignments." % len(var.alignments)) # if recursive: # for i in var.alignments: # i.dump() if len(var.sequenceLists) == 0: # print " There are no sequenceLists." pass elif len(var.sequenceLists) == 1: print(" There is 1 sequenceList.") # if recursive: # var.sequenceLists[0].dump() else: print(" There are %i sequenceLists." % len(var.sequenceLists)) # if recursive: # for i in var.sequenceLists: # i.dump() if len(var.trees) == 0: # print " There are no trees." pass elif len(var.trees) == 1: print(" There is 1 tree.") # if recursive: # var.trees[0].dump() else: print(" There are %i trees." % len(var.trees))
# if recursive: # for i in var.trees: # i.dump()
[docs]def read(stuff): """Read in data, trees, or python code from a file or from a string. For example:: read('myTreeFile.nex') or :: read('*.nex') or :: read('((A, (B, C)), D, (E, F));') This is meant to be the main way to get phylogenetic stuff from files into p4. The argument *stuff* can be a file name, or filenames described with wildcards (a 'glob', eg ``*.nex``), or it can be a string. If you are specifying a literal file name or a glob, you will of course want to put it in quotes. If you want it to read a file, and you mis-specify the file name, it will try to read the bad file name as a string (and generally fail, of course). This method will recognize these data files -- - nexus - fasta - gde (not gde flat files) - clustalw (``*.aln``) - phylip (sequential or interleaved) and these tree files -- - nexus - phylip/newick If there is a suffix, one of nex, nexus, aln, phy, phylip, gde, fasta, fas, fsa, or py, it will use that to make a choice of what kind of file it is. (So don't give a fasta file the suffix 'phy' or it will fail for sure.) Fortunately the various kinds of files (including Python) are almost mutually exclusive. A fasta file must start with either a '>' or a ';' as the first character. A gde file has '{' as the first character. A phylip data file must have the two integers as the first non-whitespace things on the first line. In a nexus file, the first non-whitespace character must be either a '#' or a '['. (Python files might start with a '#' also.) It is easy to fool. For example, start any old nonsense file with two integers, and this function will think that it is a phylip data file! So have a little bit of care what you feed this function. If you have trouble and want to know what it is thinking as it attempts to read your file, set var.verboseRead=True. Does anybody use GDE anymore? This only reads parts of a GDE formatted alignment. It reads the name, type, and sequence. It does not read the Sequence-ID, creation date, direction, strandedness, nor the comments. It correctly handles offset. It fills out the ends of the sequences to make an alignment. In gde files, MASK sequences are changed into Nexus CharSet instances. The charsets made from the masks are of course printed out if you print out the alignment as a Nexus file. """ if var.verboseRead: print("var.verboseRead is turned on") # Is this still a Bug?: single node phylip or raw newick trees must be specified as # ()A; they cannot be specified simply as A. gm = ['p4.read()'] if isinstance(stuff, str): pass else: gm.append("I was expecting a string argument.") raise P4Error(gm) #nAlignments = len(var.alignments) if os.path.exists(stuff): readFile(stuff) else: # Is it a glob? myFlist = glob.glob(stuff) # print "read(). stuff=%s, glob result: %s" % (stuff, myFlist) if myFlist: # It appears to be a glob for fName in myFlist: readFile(fName) else: # Nope, not a glob. Is it a string, not a file name? if var.warnReadNoFile: print("\nread()") print(" A file by the name specified by the argument cannot be found.") print(" So I am assuming that it is to be taken as a string.") print(" Maybe it was a mis-specified file name?") print(" (You can turn off this warning by turning var.warnReadNoFile off.)\n") if 0: if sys.version_info < (3,): stuff = unicode(stuff) flob = io.StringIO(stuff) if 1: if sys.version_info < (3,): flob = io.BytesIO(stuff) else: flob = io.StringIO(stuff) _decideFromContent('<input string>', flob)
[docs]def readFile(fName): """If its a data or tree file, read it. If its python code, exec it.""" gm = ['func.readFile(%s)' % fName] # print(gm) # I should check if the file is a text file, an executable, or whatever. try: flob = open(fName) except IOError: gm.append("Can't open %s. Are you sure you have the right name?" % fName) raise P4Error(gm) # print(flob, type(flob), flob.name) # print(dir(flob)) # See if there is an informative suffix on the file name # If there is a suffix, but the file cannot be read, # it is a serious error, and death follows. result = re.search('(.+)\.(.+)', fName) if result: #baseName = result.group(1) # print("got result.group(2) = %s" % result.group(2)) suffix = result.group(2).lower() #print("readFile: got suffix '%s'" % suffix) if suffix == 'py': flob.close() import __main__ # print("__main__.__dict__ is %s" % __main__.__dict__) #execfile(fName, __main__.__dict__, __main__.__dict__) #exec(open(fName).read(), __main__.__dict__, __main__.__dict__) #exec(flob.read(), __main__.__dict__, __main__.__dict__) # The following is better than simple exec(open(fName).read()) # because in the event of a traceback the former (below) knows the # file name, but the simple version does not. As explained on # stackoverflow, "(The compile call isn't strictly needed, but it # associates the filename with the code object making debugging a # little easier.)" with open(fName) as f: myCode = compile(f.read(), fName, 'exec') exec(myCode, __main__.__dict__, __main__.__dict__) if hasattr(__main__, 'pyFileCount'): __main__.pyFileCount += 1 return elif suffix == 'nex' or suffix == 'nexus': _tryToReadNexusFile(fName, flob) return elif suffix in ['fasta', 'fas', 'fsa']: ret = _tryToReadFastaFile(fName, flob) if not ret: gm.append("Failed to read supposed fasta file '%s'" % fName) raise P4Error(gm) return elif suffix == 'gde': ret = _tryToReadGdeFile(fName, flob) if not ret: gm.append("Failed to read supposed gde file '%s'" % fName) raise P4Error(gm) return elif suffix in ['pir', 'nbrf']: ret = _tryToReadPirFile(fName, flob) if not ret: gm.append("Failed to read supposed pir file '%s'" % fName) raise P4Error(gm) return elif suffix == 'phy' or suffix == 'phylip': ret = _tryToReadPhylipFile(fName, flob, None) if not ret: gm.append("Failed to read supposed phylip file '%s'" % fName) raise P4Error(gm) return elif suffix == 'aln': ret = _tryToReadClustalwFile(fName, flob) if not ret: gm.append("Failed to read supposed clustalw file '%s'" % fName) raise P4Error(gm) return elif result.group(2) in ['p4_tPickle']: # preserve uppercase if var.verboseRead: print("Trying to read '%s' as a pickled Tree file..." % fName) # It should be a binary open flob.close() flob = open(fName, "rb") ret = pickle.load(flob) if not ret: gm.append("Failed to read supposed p4_tPickle file '%s'." % fName) raise P4Error(gm) else: if isinstance(ret, Tree): ret.fName = fName var.trees.append(ret) var.fileNames.append(fName) if var.verboseRead: print("Got a tree from file '%s'." % fName) else: gm.append("Failed to get a Tree from '%s'" % fName) raise P4Error(gm) return else: _decideFromContent(fName, flob) else: _decideFromContent(fName, flob)
# if var.verboseRead: # print "(You can turn off these messages by turning var.verboseRead # off.)\n" def _decideFromContent(fName, flob): gm = ["func._decideFromContent()"] firstLine = False while not firstLine: firstLine = flob.readline() if not firstLine: # end of the file break firstLine = firstLine.strip() if firstLine: # got some content break else: # print "blank line" pass # if firstLine: # print "got firstLine = %s" % firstLine # else: # print "Got a blank file." if not firstLine: gm.append("Input '%s' is empty." % fName) raise P4Error(gm) else: # Fasta, phylip, and clustalw files have clues on the first line # If these files fail to be what they are first supposed to be, # then the program does not die, it just complains. # News, May 2001. I will allow blank lines at the beginning. if firstLine[0] == '>' or firstLine[0] == ';': if var.verboseRead: print("Guessing that '%s' is a fasta file..." % fName) ret = _tryToReadFastaFile(fName, flob, firstLine) if not ret: if var.verboseRead: print("Failed to read '%s' as a fasta file." % fName) if ret: return elif firstLine[0] == 'C': if var.verboseRead: print("First letter is 'C'-- guessing that '%s' is a clustalw file..." % fName) ret = _tryToReadClustalwFile(fName, flob, firstLine) if not ret: if var.verboseRead: print("Failed to read '%s' as a clustalw file." % fName) if ret: return else: # Maybe it is a phylip file? if var.verboseRead: print("Guessing that '%s' is a phylip file..." % fName) # either data or trees # Deal with what punctuation is considered to be, for the tokenizer # This week, the default is that punctuation is nexus_punctuation # but it could be set by the user punctuationWasNexusPunctuation = False if var.punctuation == var.nexus_punctuation: punctuationWasNexusPunctuation = True var.punctuation = var.phylip_punctuation ret = _tryToReadPhylipFile(fName, flob, firstLine) # Reset the punctuation if it was default, not if it was set by the user if punctuationWasNexusPunctuation: var.punctuation = var.nexus_punctuation if not ret: if var.verboseRead: print("Failed to read '%s' as a phylip file." % fName) if ret: return # If we are here, then its not a fasta or phylip file # The first non-white char of a Nexus file must be a '[' or a '#' # So first, check for that. # Then confirm that it is a Nexus file, by trying to intantiate a NexusFile object # We have to do this because the possible beginnings of Nexus files are # too varied, and so we need the nextTok() function to sort it all out. # For example, it is perfectly valid to start a Nexus file with # [a comment #nexus] #Ne[this is a comment]Xus flob.seek(0) firstChar = ' ' while firstChar in string.whitespace: firstChar = flob.read(1) # print "Got firstChar candidate '%s'" % firstChar # Redundant: this problem seems to be caught above, in # _tryToReadPhylipFile() if firstChar == '': gm.append("This file seems to be composed only of whitespace.") raise P4Error(gm) # print "got firstChar = %s" % firstChar flob.seek(0) if var.verboseRead: print("Guessing that '%s' is a nexus or gde file..." % fName) if firstChar in ['[', '#']: # it might be a nexus file if var.verboseRead: print("Guessing that '%s' is a nexus file..." % fName) ret = _tryToReadNexusFile(fName, flob) if ret: return elif firstChar == '{': if var.verboseRead: print("Guessing that '%s' is a gde file..." % fName) ret = _tryToReadGdeFile(fName, flob) if ret: return # If we are here then it isn't a nexus or gde file. Previously I then # tried to read it as a python script, even though it does not end in # '.py'. Probably a bad idea. So just give up. if var.verboseRead: print("Giving up on trying to read '%s'." % fName) if fName == '<input string>': flob.seek(0) first100 = flob.read(100) if first100: gm = ["Couldn't make sense out of the input '%s'" % first100] else: gm = ["Couldn't make sense out of the input '%s'" % fName] gm.append("It is not a file name, and I could not make sense out of it otherwise.") else: gm = ["Couldn't make sense of the input '%s'." % fName] flob.close() raise P4Error(gm) def _tryToReadNexusFile(fName, flob): if var.verboseRead: print("Trying to read '%s' as a nexus file..." % fName) nf = Nexus() # nf.readNexusFile() # returns -1 if it does not start with a #nexus token # returns 1 otherwise ret = nf.readNexusFile(flob) # print "Nexus.readNexusFile() returned a %s" % ret if ret == -1: if var.verboseRead: print("Failed to get '%s' as a nexus file." % fName) else: if 0: print("\n******************************************\n") nf.dump() print("\n******************************************\n") if len(nf.alignments): for a in nf.alignments: a.checkLengthsAndTypes() if var.doCheckForAllGapColumns: a.checkForAllGapColumns() if var.doCheckForBlankSequences: a.checkForBlankSequences() if var.doCheckForDuplicateSequences: a.checkForDuplicateSequences() var.alignments.append(a) nf.alignments = [] if len(nf.trees): for t in nf.trees: t.checkDupedTaxonNames() if t.taxNames: t.checkTaxNames() var.trees += nf.trees nf.trees = [] # check for duplicate tree names (turned off) if 0: if len(var.trees): tnList = [] for t in var.trees: tnList.append(t.name.lower()) for t in var.trees: if tnList.count(t.name.lower()) > 1: print("P4: Warning: got duplicated tree name '%s' (names are compared in lowercase)" \ % t.name) # print "Lowercased tree names: %s" % tnList # sys.exit() if hasattr(flob, 'name'): var.fileNames.append(flob.name) if var.verboseRead: print("Got nexus file '%s'" % fName) return 1 def _tryToReadFastaFile(fName, flob, firstLine=None): if not firstLine: firstLine = False while not firstLine: firstLine = flob.readline() if not firstLine: # end of the file break firstLine = firstLine.strip() if firstLine: # got some content break else: # print "blank line" pass if 0: if var.verboseRead: print("got firstLine = %s" % firstLine) else: print("Got a blank file.") if not firstLine: gm = ["_tryToReadFastaFile: the file '%s' is empty!" % fName] raise P4Error(gm) else: if var.verboseRead: print("Trying to read '%s' as a fasta file..." % fName) if len(firstLine) <= 1: if var.verboseRead: print("First line is blank--- not fasta") return if firstLine[0] not in ">;": if var.verboseRead: print("First char is neither '>' nor ';' ---not fasta") return flob.seek(0) sl = p4.sequencelist.SequenceList(flob) # this parses the file contents if hasattr(flob, 'name'): sl.fName = flob.name var.fileNames.append(flob.name) sl.checkNamesForDupes() # If we have equal sequence lengths, then it might be an # alignment hasEqualSequenceLens = True if len(sl.sequences) <= 1: hasEqualSequenceLens = None # ie not applicable else: len0 = len(sl.sequences[0].sequence) for s in sl.sequences[1:]: if len(s.sequence) != len0: hasEqualSequenceLens = False if not hasEqualSequenceLens: if var.verboseRead: print("The sequences appear to be different lengths") var.sequenceLists.append(sl) else: if var.verboseRead: print("The sequences appear to be all the same length") try: # includes a call to checkLengthsAndTypes() a = sl.alignment() # a.checkLengthsAndTypes() except: if var.verboseRead: print("Its not an alignment, even tho the sequences are all the same length.") print(" Maybe p4 (erroneously?) thinks that the sequences are different dataTypes.") var.sequenceLists.append(sl) if var.verboseRead: print("Got fasta file '%s'." % fName) return 1 if var.verboseRead: print("The fasta file appears to be an alignment.") if var.doCheckForAllGapColumns: a.checkForAllGapColumns() if var.doCheckForBlankSequences: a.checkForBlankSequences() if var.doCheckForDuplicateSequences: a.checkForDuplicateSequences() var.alignments.append(a) if var.verboseRead: print("Got fasta file '%s'." % fName) return 1 def _tryToReadPhylipFile(fName, flob, firstLine): # print "tryToReadPhylipFile here" # print "firstLine is '%s'" % firstLine gm = ["func._tryToReadPhylipFile()"] if not firstLine: firstLine = flob.readline() # print "B firstLine is '%s'" % firstLine if not firstLine: gm.append("The file %s is empty." % fName) raise P4Error(gm) splitLine = firstLine.split() # If theres 2 numbers on the first line, it may be a phylip data file if len(splitLine) >= 2: try: firstNum = int(splitLine[0]) secondNum = int(splitLine[1]) if var.verboseRead: print("Trying to read '%s' as a phylip data file..." % fName) a = Alignment() if hasattr(flob, 'name'): a.fName = flob.name a.readOpenPhylipFile(flob, firstNum, secondNum) a.checkNamesForDupes() a.checkLengthsAndTypes() if var.doCheckForAllGapColumns: a.checkForAllGapColumns() if var.doCheckForBlankSequences: a.checkForBlankSequences() if var.doCheckForDuplicateSequences: a.checkForDuplicateSequences() var.alignments.append(a) var.fileNames.append(fName) if var.verboseRead: print("Got '%s' as a phylip-like file." % fName) return 1 except ValueError: if var.verboseRead: print("Does not seem to be a phylip or phylip-like data file.") # Ok, so the file did not start with 2 integers. It might still # be a Phylip tree file. flob.seek(0, 0) # Go to the beginning of the flob firstChar = flob.read(1) while firstChar and firstChar in string.whitespace: firstChar = flob.read(1) if not firstChar: gm.append("No non-whitespace chars found.") raise P4Error(gm) # print "got firstChar '%s'" % firstChar if firstChar not in ['(', '[']: # The '[' for puzzle output. if var.verboseRead: print("First char is not '(' or '['. This does not seem to be a phylip or puzzle tree file.") return if var.verboseRead: print("Trying to read '%s' as a phylip tree file..." % fName) flob.seek(0, 0) theseTrees = [] while 1: savedPosition = flob.tell() # Just to check whether there is a token that can be read ... tok = nextTok(flob) if not tok: break if tok != '(': if var.verboseRead: print("First char was '%s'," % firstChar, end=' ') print("so I thought it was a phylip or puzzle tree file.") print("However, after having read in %i trees," % len(theseTrees)) print(" it confused me by starting a supposed new tree with a '%s'" % tok) return flob.seek(savedPosition, 0) # Throw the token away. t = Tree() t.name = 't%i' % len(theseTrees) t.parseNewick(flob, None) # None is the translationHash t._initFinish() theseTrees.append(t) if len(theseTrees) == 0: return else: for t in theseTrees: t.checkDupedTaxonNames() # if t.taxNames: # Why would it? # t.checkTaxNames() var.trees += theseTrees if hasattr(flob, 'name'): var.fileNames.append(flob.name) if var.verboseRead: print("Got %i trees from phylip tree file '%s'" % (len(theseTrees), fName)) return 1 def _tryToReadClustalwFile(fName, flob, firstLine=None): if not firstLine: firstLine = flob.readline() if not firstLine: gm.append( "func. _tryToReadClustalwFile() The file %s is empty." % fName) raise P4Error(gm) expectedFirstLine = 'CLUSTAL' if firstLine.startswith(expectedFirstLine): if var.verboseRead: print("Trying to read '%s' as a clustalw file..." % fName) a = Alignment() if hasattr(flob, 'name'): a.fName = flob.name var.fileNames.append(flob.name) a._readOpenClustalwFile(flob) a.checkNamesForDupes() a.checkLengthsAndTypes() if var.doCheckForAllGapColumns: a.checkForAllGapColumns() if var.doCheckForBlankSequences: a.checkForBlankSequences() if var.doCheckForDuplicateSequences: a.checkForDuplicateSequences() var.alignments.append(a) if var.verboseRead: print("Got '%s' as a clustalw file." % fName) return 1 def _tryToReadGdeFile(fName, flob): if var.verboseRead: print("Trying to read '%s' as a gde file..." % fName) a = Alignment() if hasattr(flob, 'name'): a.fName = flob.name var.fileNames.append(flob.name) a._readOpenGdeFile(flob) # a.writePhylip() if var.doCheckForAllGapColumns: a.checkForAllGapColumns() if var.doCheckForBlankSequences: a.checkForBlankSequences() if var.doCheckForDuplicateSequences: a.checkForDuplicateSequences() var.alignments.append(a) if var.verboseRead: print("Got '%s' as a gde file." % fName) return 1 def _tryToReadPirFile(fName, flob): if var.verboseRead: print("Trying to read '%s' as a pir file..." % fName) flob.seek(0) sl = p4.sequencelist.SequenceList() ret = sl._readOpenPirFile(flob) if not ret: if var.verboseRead: print("Reading it as a pir file didn't work.") return None else: # print "Got sl" if hasattr(flob, 'name'): sl.fName = flob.name var.fileNames.append(flob.name) sl.checkNamesForDupes() # If we have equal sequence lengths, then it might be an alignment hasEqualSequenceLens = True if len(sl.sequences) <= 1: hasEqualSequenceLens = None # ie not applicable else: len0 = len(sl.sequences[0].sequence) for s in sl.sequences[1:]: if len(s.sequence) != len0: hasEqualSequenceLens = False if not hasEqualSequenceLens: if var.verboseRead: print("The sequences appear to be different lengths") var.sequenceLists.append(sl) else: if var.verboseRead: print("The sequences appear to be all the same length") try: # includes a call to checkLengthsAndTypes() a = sl.alignment() except: if var.verboseRead: print("Its not an alignment, even tho the sequences are all the same length.") print(" Maybe p4 (erroneously?) thinks that the sequences are different dataTypes.") var.sequenceLists.append(sl) if var.verboseRead: print("Got pir file '%s'." % fName) return 1 if var.verboseRead: print("The pir file appears to be an alignment.") if var.doCheckForAllGapColumns: a.checkForAllGapColumns() if var.doCheckForBlankSequences: a.checkForBlankSequences() if var.doCheckForDuplicateSequences: a.checkForDuplicateSequences() var.alignments.append(a) if var.verboseRead: print("Got pir file '%s'." % fName) return 1
[docs]def splash(): """Print a splash screen for p4.""" print('') from p4.version import versionString, dateString print("p4 v %s, %s" % (versionString, dateString)) print(""" usage: p4 or p4 [-i] [-x] [-d] [yourScriptOrDataFile] [anotherScriptOrDataFile ...] or p4 --help p4 is a Python package for phylogenetics. p4 is also the name of a Python script that loads the p4 package.""") print(""" There is documentation at http://p4.nhm.ac.uk """) print(""" Using the p4 script, after reading in the (optional) files on the command line, p4 goes interactive unless one of the files on the command line is a Python script. Use the -i option if you want to go interactive even if you are running a script. Use the -x option to force exit, even if there was no Python script read. If you use the -d option, then p4 draws any trees that are read in on the command line, and then exits. Peter Foster The Natural History Museum, London p.foster@nhm.ac.uk""") if var.examplesDir: print("\nSee the examples in %s" % var.examplesDir) print('') print("(Control-d to quit.)\n")
[docs]def splash2(outFile=None, verbose=True): """Another splash, showing things like version, git hash, and date If verbose is set, it gets printed to sys.stdout. If you set an outFile, it will also be appended to that file. It also returns the info as a list of strings. """ # Collect all the info in a list of strings stuff = [] # Stolen from Cymon. Thanks! #stuff.append("\nSummary from func.splash2()") stuff.append("%16s: %s" % ("P4 version", p4.version.versionString)) lp = os.path.dirname(inspect.getfile(p4)) stuff.append("%16s: %s" % ("Library path", lp)) # Get git version. if os.path.isdir(os.path.join(os.path.dirname(lp), '.git')): try: # I got these from https://stackoverflow.com/questions/14989858/get-the-current-git-hash-in-a-python-script # subprocess.check_output(['git', 'rev-parse', 'HEAD']) # ret = subprocess.check_output(['git', '-C', '%s' % lp, 'rev-parse', '--short', 'HEAD']) ret = subprocess.check_output(['git', '-C', '%s' % lp, 'log', '-1', '--date=short', '--pretty=format:"%h -- %cd -- %cr"']) #ret = ret.strip() # get rid of newline, needed for rev-parse ret = ret[1:-1] # get rid of quotes, needed for log stuff.append("%16s: %s" % ("git hash", ret)) except subprocess.CalledProcessError: #print("%16s: %s" % ("git hash", "Not a git repo?")) pass else: stuff.append("%16s: %s" % ("git hash", "Not a git repo")) stuff.append("%16s: %s" % ("Python version", ".".join([str(i) for i in sys.version_info[:-2]]))) #print("%16s: %s" % ("Date" , datetime.datetime.now().strftime("%d/%m/%Y"))) stuff.append("%16s: %s" % ("Today's date" , datetime.datetime.now().strftime("%Y-%m-%d"))) # iso 8601 see https://xkcd.com/1179/ host = os.uname()[1].split('.')[0] stuff.append("%16s: %s" % ("Host", host)) #stuff.append("\n") if outFile: print("Appending splash2 info to file %s" % outFile) fh = open(outFile, "a") for aLine in stuff: print(aLine, file=fh) fh.close() if verbose: for aLine in stuff: print(aLine) return stuff
[docs]def randomTree(taxNames=None, nTax=None, name='random', seed=None, biRoot=0, randomBrLens=1, constraints=None, randomlyReRoot=True): """Make a simple random Tree. You can supply a list of taxNames, or simply specify nTax. In the latter case the specified number of (boringly-named) leaves will be made. The default is to have 'randomBrLens', where internal nodes get brLens of 0.02 - 0.05, and terminal nodes get brLens of 0.02 - 0.5. Branch lengths are all 0.1 if randomBrLens is turned off. This method starts with a star tree and keeps adding nodes until it is fully resolved. If 'biRoot' is set, it adds one more node, and roots on that node, to make a bifurcating root. Repeated calls will give different random trees, without having to do any seed setting. If for some reason you want to make identical random trees, set the seed to some positive integer, or zero. If you want the tree to have some topological constraints, say so with a Constraints object. Returns a tree.""" complaintHead = '\nrandomTree()' gm = [complaintHead] # we need either taxNames or nTax if not taxNames and not nTax: gm.append("You need to supply either taxNames or nTax.") raise P4Error(gm) if taxNames and nTax: if len(taxNames) != nTax: gm.append("You need not supply both taxNames and nTax,") gm.append("but if you do, at least they should match, ok?") raise P4Error(gm) if taxNames: # implies not [] nTax = len(taxNames) elif nTax: taxNames = [] for i in range(nTax): taxNames.append('t%i' % i) if constraints: assert isinstance(constraints, Constraints) # if we are asking for a biRoot tree, that does not have a random root, then we need a bi-rooted constraint if biRoot and not randomlyReRoot: if not constraints or constraints.tree.root.getNChildren() != 2: gm.append("For a biRoot'ed tree, that has a specified root (ie randomlyReRoot=False),") gm.append("a constraints object is needed, specified with a constraint tree as") gm.append("((all, the, taxon), (names, in, two, forks));") gm.append("although you can have additional internal constraints.") raise P4Error(gm) # Make a random list of indices for the taxNames if seed != None: # it might be 0 random.seed(seed) indcs = list(range(nTax)) random.shuffle(indcs) # Instantiate the tree, and add a root node t = Tree() t._taxNames = taxNames n = Node() n.br = None t.name = name n.nodeNum = 0 t.nodes.append(n) t.root = n # Add the left child of the root n = Node() n.isLeaf = 1 t.root.leftChild = n t.nodes.append(n) n.nodeNum = 1 n.name = taxNames[indcs[0]] n.parent = t.root previousNode = n # Add the rest of the terminal nodes nodeNum = 2 for i in range(nTax)[1:]: n = Node() n.isLeaf = 1 t.nodes.append(n) n.name = taxNames[indcs[nodeNum - 1]] previousNode.sibling = n previousNode = n n.parent = t.root n.nodeNum = nodeNum nodeNum += 1 # t.dump(node=1) # t.draw() # constraints.dump() nNodesAddedForConstraints = 0 if constraints: for aConstraint in constraints.constraints: # print "doing aConstraint %s %i" % (getSplitStringFromKey(aConstraint, nTax), aConstraint) #t.dump(tree=0, node=1) t.setPreAndPostOrder() eTaxNames = [] for i in range(nTax): tester = 1 << i # Does aConstraint contain the tester bit? if tester & aConstraint: eTaxNames.append(taxNames[i]) # print "aConstraint %s" % eTaxNames # check that they all share the same parent firstParent = t.node(eTaxNames[0]).parent for tN in eTaxNames[1:]: if t.node(tN).parent != firstParent: gm.append("constraint %s" % getSplitStringFromKey( aConstraint, constraints.tree.nTax)) gm.append("'%s' parent is not node %i" % (tN, firstParent.nodeNum)) gm.append( 'It appears that there are incompatible constraints.') raise P4Error(gm) n = Node() n.nodeNum = nodeNum nodeNum += 1 chosenName = random.choice(eTaxNames) eTaxNames.remove(chosenName) # print 'adding a new parent for %s' % chosenName chosenNode = t.node(chosenName) chosenNodeOldSib = chosenNode.sibling chosenNodeOldLeftSib = chosenNode.leftSibling() # could be None n.parent = firstParent n.leftChild = chosenNode n.sibling = chosenNodeOldSib chosenNode.parent = n if chosenNodeOldLeftSib: chosenNodeOldLeftSib.sibling = n else: firstParent.leftChild = n chosenNode.sibling = None t.nodes.append(n) nNodesAddedForConstraints += 1 oldChosenNode = chosenNode if 0: t.preOrder = None t.postOrder = None t.preAndPostOrderAreValid = False t.draw() while eTaxNames: chosenName = random.choice(eTaxNames) # print "adding '%s'" % chosenName eTaxNames.remove(chosenName) chosenNode = t.node(chosenName) chosenNodeOldSib = chosenNode.sibling chosenNodeOldLeftSib = chosenNode.leftSibling() if 0: if chosenNodeOldLeftSib: print('chosenNodeOldLeftSib = %s' % chosenNodeOldLeftSib.nodeNum) else: print('chosenNodeOldLeftSib = None') if chosenNodeOldSib: print('chosenNodeOldSib = %s' % chosenNodeOldSib.nodeNum) else: print('chosenNodeOldSib = None') chosenNode.parent = n oldChosenNode.sibling = chosenNode if chosenNodeOldLeftSib: chosenNodeOldLeftSib.sibling = chosenNodeOldSib else: firstParent.leftChild = chosenNodeOldSib chosenNode.sibling = None oldChosenNode = chosenNode if 0: t.preOrder = None t.postOrder = None t.preAndPostOrderAreValid = False t.draw() if 0: t.preOrder = None t.postOrder = None t.preAndPostOrderAreValid = False t.draw() # sys.exit() # Now we have a star tree. Now add internal nodes until it is all # resolved, which needs nTax - 3 nodes # print 'nNodesAddedForConstraints is %i' % nNodesAddedForConstraints for i in range(nTax - 3 - nNodesAddedForConstraints): # Pick a random node, that has a sibling, and a # sibling.sibling. This week I am first making a list of # suitables and then choosing a random one, rather than first # choosing a random node and then asking whether it is # suitable. It could be made more efficient by not re-making # the ssNodes list every time, but rather just keeping it up # to date. But that is not so simple, so re-make the list # each time. ssNodes = [] for n in t.nodes: if n.sibling and n.sibling.sibling: # This next thing can happen if there are constraints. # But it turns out that getNChildren() is very, very slow! To be avoided! # if n.parent == t.root and t.root.getNChildren() == 3: # pass if n.parent == t.root and t.root.leftChild.sibling and \ t.root.leftChild.sibling.sibling and not \ t.root.leftChild.sibling.sibling.sibling: pass else: ssNodes.append(n) lChild = random.choice(ssNodes) # print "lChild = node %i" % lChild.nodeNum # +----------1:oldLeftSib # | # +----------2:lChild # 0 # +----------3:lChildSib # | # +----------4:oldLChildSibSib # +----------1:oldLeftSib # | # | +----------3:lChild # 0----------2(n) # | +----------4:lChildSib # | # +----------5:oldLChildSibSib n = Node() n.nodeNum = nodeNum nodeNum = nodeNum + 1 lChildSib = lChild.sibling # guarranteed to have one oldLChildSibSib = lChildSib.sibling # ditto # oldLeftSib = lChild.parent.leftChild # first guess ... # if oldLeftSib != lChild: # while oldLeftSib.sibling != lChild: # oldLeftSib = oldLeftSib.sibling # else: # oldLeftSib = None oldLeftSib = lChild.leftSibling() # could be none if 0: if oldLeftSib: print("oldLeftSib = %i" % oldLeftSib.nodeNum) else: print("oldLeftSib = None") print("lChildSib = %i" % lChildSib.nodeNum) if oldLChildSibSib: print("oldLChildSibSib = %i" % oldLChildSibSib.nodeNum) else: print("oldLChildSibSib = None") if oldLeftSib: oldLeftSib.sibling = n else: lChild.parent.leftChild = n n.parent = lChild.parent lChild.parent = n n.leftChild = lChild lChildSib.parent = n n.sibling = oldLChildSibSib lChildSib.sibling = None t.nodes.append(n) if 0: t.preOrder = None t.postOrder = None t.preAndPostOrderAreValid = False t.draw() if biRoot: # addNodeBetweenNodes() requires t.preOrder. t.preOrder = numpy.array([var.NO_ORDER] * len(t.nodes), numpy.int32) t.postOrder = numpy.array([var.NO_ORDER] * len(t.nodes), numpy.int32) if len(t.nodes) > 1: t.setPreAndPostOrder() if randomlyReRoot: # pick a random node, with a parent n = t.nodes[random.randrange(1, len(t.nodes))] newNode = t.addNodeBetweenNodes(n, n.parent) t.reRoot(newNode, moveInternalName=False) else: # We have checked that we have a valid constraints and constraints tree, above theKey = constraints.tree.root.leftChild.br.splitKey t.makeSplitKeys() for kNode in t.nodes: if kNode.br and kNode.br.splitKey == theKey: break assert kNode.br.splitKey == theKey newNode = t.addNodeBetweenNodes(kNode, kNode.parent) t.reRoot(newNode, moveInternalName=False) else: if randomlyReRoot: # The way it is now, the root rightmost child is always a # leaf. Not really random, then, right? So choose a random # internal node, and re-root it there. # print "nTax=%i, len(t.nodes)=%i" % (nTax, len(t.nodes)) if nTax > 3: n = t.nodes[random.randrange(nTax + 1, len(t.nodes))] t.reRoot(n, moveInternalName=False) # The default is to have randomBrLens, where internal nodes get # brLens of 0.02 - 0.05, and terminal nodes get brLens of 0.2 - # 0.5. Branch lengths are all 0.1 if randomBrLens is turned # off. if randomBrLens: for n in t.nodes: if n != t.root: if n.isLeaf: n.br.len = 0.05 + (random.random() * 0.45) else: n.br.len = 0.02 + (random.random() * 0.03) # _initFinish() checks that leaf nodes have names, and sets preAndPostOrder t._initFinish() # Added Sept 2020, because NDCH2 wants nodeNums in order. newNodes = [] for i,n in enumerate(t.iterPreOrder()): n.nodeNum = i newNodes.append(n) t.nodes = newNodes t.setPreAndPostOrder() return t
[docs]def newEmptyAlignment(dataType=None, symbols=None, taxNames=None, length=None): """Make de novo and return an Alignment object, made of gaps. It is not placed in var.alignments. """ complaintHead = ['\nnewEmptyAlignment()'] gm = complaintHead # check for silliness if not dataType: gm.append( "No dataType. You need to specify at least the dataType, taxNames, and sequenceLength.") raise P4Error(gm) if not taxNames: gm.append( "No taxNames. You need to specify at least the dataType, taxNames, and sequenceLength.") raise P4Error(gm) if not length: gm.append( "No length. You need to specify at least the dataType, taxNames, and sequenceLength.") raise P4Error(gm) goodDataTypes = ['dna', 'protein', 'standard'] if dataType not in goodDataTypes: gm.append("dataType '%s' is not recognized.") gm.append("I only know about %s" % goodDataTypes) raise P4Error(gm) if dataType == 'standard': if not symbols: gm.append("For standard dataType you need to specify symbols.") raise P4Error(gm) else: if symbols: gm.append( "You should not specify symbols for %s dataType." % dataType) raise P4Error(gm) a = Alignment() a.length = length a.dataType = dataType # dataTypes, symbols, dim if a.dataType == 'dna': a.symbols = 'acgt' a.dim = 4 a.equates = {'n': 'acgt', 'm': 'ac', 'k': 'gt', # 'x': 'acgt', 'h': 'act', 'y': 'ct', 'v': 'acg', 'w': 'at', 'd': 'agt', 'b': 'cgt', 'r': 'ag', 's': 'cg'} elif a.dataType == 'protein': a.symbols = 'arndcqeghilkmfpstwyv' a.dim = 20 a.equates = {'b': 'dn', 'x': 'arndcqeghilkmfpstwyv', 'z': 'eq'} elif a.dataType == 'standard': a.symbols = symbols a.dim = len(symbols) # Make sequences, composed of gaps. for n in taxNames: s = p4.sequencelist.Sequence() s.name = n s.dataType = a.dataType s.sequence = '-' * a.length a.sequences.append(s) return a
[docs]def getSplitStringFromKey(theKey, nTax, escaped=False): """Convert a long int binary split key to dot-star notation.""" ss = ['.'] * nTax #ss = ['0'] * nTax for i in range(nTax): tester = 2 ** i if tester & theKey: ss[i] = '*' #ss[i] = '1' if escaped: return '\\' + '\\'.join(ss) else: return ''.join(ss)
[docs]def getSplitKeyFromTaxNames(allTaxNames, someTaxNames): """Make a long int binary split key from a list of taxNames. allTaxNames -> an ordered list, nTax long someTaxNames -> list of taxNames on one side of the split. The split key that is returned will always be even. For example, assuming :: allTaxNames ['A', 'B', 'C', 'D'] someTaxNames ['B', 'D'] The bits for all the taxa are:: A B C D 1 2 4 8 So the split key for ``['B', 'D']`` will be 2 + 8 = 10 Another :: func.getSplitKeyFromTaxNames(['A', 'B', 'C', 'D'], ['B', 'D']) # returns 10 However, if the splitKey is odd, it is bit-flipped. So if ``someTaxNames = ['A', 'D']``, the raw split key for ``['A','D']`` will be 1 + 8 = 9, binary '1001', which is then xor'd with 1111, giving 6. Another :: getSplitKeyFromTaxNames(['A', 'B', 'C', 'D'], ['A', 'D']) returns 6 """ gm = ['func.getSplitKeyFromTaxNames()'] if not len(allTaxNames) or not len(someTaxNames): gm.append("Got an empty arg?!?") raise P4Error(gm) theIndices = [] for tn in someTaxNames: try: theIndex = allTaxNames.index(tn) except ValueError: gm.append("The taxName '%s' is not in allTaxNames." % tn) raise P4Error(gm) if theIndex not in theIndices: # Duped indices would be a Bad Thing theIndices.append(theIndex) # print "theIndices = %s" % theIndices theRawSplitKey = 0 for i in theIndices: theRawSplitKey += 1 << i # "<<" is left-shift if 1 & theRawSplitKey: # Is it odd? or Does it contain a 1? allOnes = 2 ** (len(allTaxNames)) - 1 theSplitKey = allOnes ^ theRawSplitKey # "^" is xor, a bit-flipper. return theSplitKey else: return theRawSplitKey
def _sumOfRows(aList): """ Adds up the rows of a 2d matrix, returning the vector. Eg _sumOfRows([[2,3], [6,13]]) returns [5, 19] """ if not isinstance(aList[0], list): print("_sumOfRows: not a 2D array. Assume its a row vector and return sum") return sum(aList) outList = [] for i in aList: outList.append(sum(i)) return outList def _sumOfColumns(aList): """ Adds up the rows of a 2d matrix, returning the vector. Eg _sumOfColumns([[2,3], [6,13]]) returns [8, 16] """ if not isinstance(aList[0], list): print("_sumOfColumns: not a 2D array. Assume its a column vector and return sum") return sum(aList) theLen = len(aList[0]) for i in aList: if theLen != len(i): print("_sumOfColumns: unequal rows") return None outList = [0] * theLen for i in aList: for j in range(theLen): outList[j] = outList[j] + i[j] return outList def _expected(sor, soc): # sumOfRows, sumOfCols nRows = len(sor) nCols = len(soc) grand = float(sum(sor)) expectedOut = [] for i in range(nRows): outRow = [] for j in range(nCols): outRow.append((sor[i] / grand) * soc[j]) expectedOut.append(outRow) return expectedOut
[docs]def xSquared(observed): """Calculate the X^2 statistic from an R x C table. Arg observed is a 2D R x C table. This stat is sometimes called Chi-squared. """ gm = ["func.xSquared()"] nRows = len(observed) nCols = len(observed[0]) theSumOfRows = _sumOfRows(observed) theSumOfCols = _sumOfColumns(observed) theExpected = _expected(theSumOfRows, theSumOfCols) # print theExpected for i in theSumOfRows: if i == 0.0: gm.append( "Sum of rows includes a zero. Can't calculate xSquared.") raise P4Error(gm) for i in theSumOfCols: if i == 0.0: gm.append( "Sum of cols includes a zero. Can't calculate xSquared.") raise P4Error(gm) xSq = 0.0 for i in range(nRows): for j in range(nCols): xSq = xSq + ((observed[i][j] - theExpected[i][j]) * (observed[i][j] - theExpected[i][j]) / theExpected[i][j]) return xSq
[docs]def variance(seq): """This would not be good for a lot of data. n - 1 weighted.""" sumSeq = float(sum(seq)) return (_sumOfSquares(seq) - ((sumSeq * sumSeq) / len(seq))) / (len(seq) - 1)
# return (_sumOfSquares(seq) - ((sumSeq * sumSeq) / len(seq))) / len(seq) def _stdErrorOfTheDifferenceBetweenTwoMeans(seq1, seq2): """This could use some re-coding to handle short (<30) n""" if len(seq1) == len(seq2) and len(seq1) > 30: return math.sqrt((variance(seq1) + variance(seq2)) / len(seq1)) else: gm = ["_stdErrorOfTheDifferenceBetweenTwoMeans()"] gm.append( "I can only deal with sequences of equal length, each more than 30 long.") raise P4Error(gm)
[docs]def mean(seq): """Simple, pure-python mean. For big lists, use something better.""" return float(sum(seq)) / len(seq)
[docs]def studentsTStat(seq1, seq2): """Returns Student's t statistic for 2 lists or tuples. Mean of seq1 - mean of seq2, divided by the _stdErrorOfTheDifferenceBetweenTwoMeans(seq1, seq2) """ return (mean(seq1) - mean(seq2)) / _stdErrorOfTheDifferenceBetweenTwoMeans(seq1, seq2)
[docs]def tailAreaProbability(theStat, theDistribution, verbose=1): """Calculate the tail area probability of theStat. That is the number of items in theDistribution that are greater than or equal to theStat. theDistribution need not be sorted.""" gm = ["tailAreaProbability()"] theLen = len(theDistribution) for i in theDistribution: try: float(i) except TypeError: gm.append( "Item '%s' from theDistribution does not seem to be a float." % i) raise P4Error(gm) try: float(theStat) except TypeError: gm.append("theStat '%s' does not seem to be a float." % theStat) raise P4Error(gm) hits = 0 theMax = theDistribution[0] theMin = theDistribution[0] for x in theDistribution: if x < theMin: theMin = x elif x > theMax: theMax = x if x >= theStat: hits = hits + 1 tap = float(hits) / float(theLen) if verbose: print("# The stat is %s" % theStat) print("# The distribution has %i items" % len(theDistribution)) print("# The distribution goes from %s to %s" % (theMin, theMax)) print("# Items in the distribution were >= theStat %i times." % hits) print("# The tail-area probability is %f" % tap) return [theStat, (theMin, theMax), tap]
[docs]def ls(): """Like the shell ls """ fList = os.listdir('.') fList.sort() for f in fList: print(f)
[docs]def which(what, verbose=0): """Asks if an auxiliary program is available. This uses the shell command 'which' to find whether a program (as given by the argument 'what') is in the path. It returns 0 or 1. If verbose is turned on, it speaks the path, if it exists.""" if not isinstance(what, str): raise P4Error("function which(). I was expecting a string argument.") f = os.popen('which %s 2> /dev/null' % what, 'r') aLine = f.readline() f.close() if aLine: aLine = aLine[:-1] # tcsh does this, but I have not tested this part. if aLine.endswith('Command not found.'): aLine = None if aLine: if verbose: print(aLine) return 1 else: return 0
[docs]def which2(program): """Find an executable http://stackoverflow.com/questions/377017/test-if-executable-exists-in-python """ def is_exe(fpath): return os.path.exists(fpath) and os.access(fpath, os.X_OK) fpath, fname = os.path.split(program) if fpath: if is_exe(program): return program else: for path in os.environ["PATH"].split(os.pathsep): exe_file = os.path.join(path, program) if is_exe(exe_file): return exe_file return None
def _writeRMatrixTupleToOpenFile(theTuple, dim, flob, offset=23): gm = ["func._writeRMatrixTupleToOpenFile()"] if dim < 2: gm.append("dim must be two or more for this to work.") raise P4Error(gm) isShort = False # For backward compatibility if var.rMatrixNormalizeTo1: isShort = False if len(theTuple) != (((dim * dim) - dim) / 2): if len(theTuple) == ((((dim * dim) - dim) / 2) - 1): isShort = True else: gm.append("var.rMatrixNormalizeTo1 is %i" % var.rMatrixNormalizeTo1) gm.append("The length of the tuple (%i) is " % len(theTuple)) gm.append("incommensurate with the dim (%i)" % dim) gm.append("(should be %i)" % (((dim * dim) - dim) / 2)) raise P4Error(gm) else: isShort = True if len(theTuple) != (((dim * dim) - dim) / 2) - 1: gm.append("var.rMatrixNormalizeTo1 is %i" % var.rMatrixNormalizeTo1) gm.append("The length of the tuple (%i) is " % len(theTuple)) gm.append("incommensurate with the dim (%i)" % dim) gm.append("(should be %i)" % ((((dim * dim) - dim) / 2) - 1)) raise P4Error(gm) if dim == 3: flob.write('%s\n' % repr(theTuple)) else: # if var.rMatrixNormalizeTo1: if not isShort: # print "theTuple=%s" % theTuple tuplePos = 0 row = 0 flob.write('(') for i in range((dim - row) - 1): flob.write('%10.6f, ' % theTuple[tuplePos]) tuplePos += 1 flob.write('\n') row += 1 formatString = '%' + '%is' % offset while row < dim - 2: flob.write(formatString % ' ') flob.write(' ') for i in range(row): flob.write(' ') for i in range((dim - row) - 1): flob.write('%10.6f, ' % theTuple[tuplePos]) tuplePos = tuplePos + 1 flob.write('\n') row += 1 flob.write(formatString % ' ') flob.write(' ') for i in range(row): flob.write(' ') # print "dim=%i, row=%i, range((dim-row) - 1) = %s, tuplePos=%i" % # (dim, row, range((dim-row)-1), tuplePos) flob.write('%10.6f )' % theTuple[tuplePos]) flob.write('\n') else: tuplePos = 0 row = 0 flob.write('(') for i in range((dim - row) - 1): flob.write('%10.6f, ' % theTuple[tuplePos]) tuplePos += 1 flob.write('\n') row += 1 formatString = '%' + '%is' % offset while row < dim - 3: flob.write(formatString % ' ') flob.write(' ') for i in range(row): flob.write(' ') for i in range((dim - row) - 1): flob.write('%10.6f, ' % theTuple[tuplePos]) tuplePos = tuplePos + 1 flob.write('\n') row += 1 flob.write(formatString % ' ') flob.write(' ') for i in range(row): flob.write(' ') for i in range((dim - row) - 2): flob.write('%10.6f, ' % theTuple[tuplePos]) tuplePos += 1 flob.write('%10.6f )' % theTuple[tuplePos]) flob.write('\n') def _writeCharFreqToOpenFile(theCharFreq, dim, symbols, flob, offset=23): formatString = '%' + '%is' % offset s = 0.0 if symbols: if dim == 2: flob.write('(') flob.write(' [%c] %f,)\n' % (symbols[0], theCharFreq[0])) flob.write(formatString % ' ') flob.write('[and [%c] %f, ' % (symbols[1], 1.0 - theCharFreq[0])) else: flob.write('(') flob.write(' [%c] %f,\n' % (symbols[0], theCharFreq[0])) s = theCharFreq[0] for i in range(dim - 2)[1:]: flob.write(formatString % ' ') flob.write(' [%c] %f,\n' % (symbols[i], theCharFreq[i])) s = s + theCharFreq[i] flob.write(formatString % ' ') flob.write(' [%c] %f)\n' % (symbols[dim - 2], theCharFreq[dim - 2])) s = s + theCharFreq[dim - 2] flob.write(formatString % ' ') flob.write('[and [%c] %f, ' % (symbols[dim - 1], 1.0 - s)) else: if dim == 2: flob.write('(') flob.write('%f,)\n' % theCharFreq[0]) flob.write(formatString % ' ') flob.write('[and %f, ' % (1.0 - theCharFreq[0])) else: flob.write('(') flob.write('%f,\n' % theCharFreq[0]) s = theCharFreq[0] for i in range(dim - 2)[1:]: flob.write(formatString % ' ') flob.write('%f,\n' % theCharFreq[i]) s = s + theCharFreq[i] flob.write(formatString % ' ') flob.write('%f)\n' % theCharFreq[dim - 2]) s = s + theCharFreq[dim - 2] flob.write(formatString % ' ') flob.write('[and %f, ' % (1.0 - s))
[docs]def fixCharsForLatex(theString): if not theString: return theString l = list(theString) # print l inMath = False for i in range(len(l)): if l[i] in string.ascii_letters or l[i] in string.digits: pass elif l[i] == '$': if inMath: inMath = False else: inMath = True elif l[i] in ['%', '#']: l[i] = '\\' + l[i] elif l[i] in ['_']: if inMath: pass else: l[i] = '\\' + l[i] elif l[i] in ['|']: if inMath: pass else: l[i] = '$' + l[i] + '$' else: pass if l[0] == "'" and l[-1] == "'": del(l[-1]) del(l[0]) theRange = range(len(l)) for i in reversed(theRange[:-1]): if l[i] == "'" and l[i - 1] == "'": del(l[i]) return ''.join(l)
[docs]def maskFromNexusCharacterList(nexusCharListString, maskLength, invert=0): """Returns a mask string, converted from a Nexus char list. Convert a Nexus characters list to a mask string composed of zeros and 1's Eg char list ``r'1 2-4 6-10'`` (don't forget the 'r' if you have any backslash characters) becomes (for a maskLength of 16) ``1111011111000000`` And ``r'10-.'`` results in ``0000000001111111`` (for maskLength 16). Note that Nexus char lists are 1-based. Invert inverts the zeros and 1's. """ gm = ["maskFromNexusCharacterList()"] from p4.alignment import cListPat, cList2Pat, cListAllPat #cListPat = re.compile('(\d+)-?(.+)?') #cList2Pat = re.compile('(.+)\\\\(\d+)') #cListAllPat = re.compile('all\\\\?(\d+)?') # print("char list is: %s" % nexusCharListString) cList = nexusCharListString.split() # print("cList is %s" % cList) mask = ['0'] * maskLength for c in cList: # eg 6-10\2 first = None # the first item eg 6 second = None # the second item eg 10 third = None # the third item eg 2 result = cListPat.match(c) if result: # print("%s\t%s" % (result.group(1), result.group(2))) first = result.group(1) if result.group(2): r2 = cList2Pat.match(result.group(2)) if r2: # print("%s\t%s" % (r2.group(1), r2.group(2))) second = r2.group(1) third = r2.group(2) else: second = result.group(2) else: result = cListAllPat.match(c) if result: first = '1' second = '.' third = result.group(1) else: gm.append( "Can't parse maskFromNexusCharacterList '%s'" % nexusCharListString) raise P4Error(gm) # print("first = %s, second = %s, third = %s" % (first, second, third)) if not first: gm.append("Can't parse maskFromNexusCharacterList '%s'" % nexusCharListString) raise P4Error(gm) elif first and not second: # its a single if first.lower() == 'all': for i in range(len(mask)): mask[i] = '1' elif first == '.': mask[-1] = '1' else: try: it = int(first) mask[it - 1] = '1' except ValueError: gm.append("Can't parse '%s' in maskFromNexusCharacterList '%s'" % (first, nexusCharListString)) raise P4Error(gm) elif first and second: # its a range try: start = int(first) except ValueError: gm.append("Can't parse '%s' in maskFromNexusCharacterList '%s'" % (first, nexusCharListString)) raise P4Error(gm) if second == '.': fin = len(mask) else: try: fin = int(second) except ValueError: gm.append("Can't parse '%s' in maskFromNexusCharacterList '%s'" % (second, nexusCharListString)) raise P4Error(gm) if third: try: bystep = int(third) except ValueError: gm.append("Can't parse '%s' in maskFromNexusCharacterList '%s'" % (third, nexusCharListString)) raise P4Error(gm) for spot in range(start - 1, fin, bystep): mask[spot] = '1' else: for spot in range(start - 1, fin): mask[spot] = '1' if invert: for i in range(len(mask)): if mask[i] == '0': mask[i] = '1' elif mask[i] == '1': mask[i] = '0' return ''.join(mask)
[docs]def polar2square(angleLenList): """Convert a coord in polar coords to usual (Cartesian? square?) coords. Input is a list composed of the angle in radians and the length (ie from the origin). A list of [x,y] is returned. """ if angleLenList[1] == 0.0: return [0, 0] elif angleLenList[1] < 0.0: raise P4Error("func.polar2square error: len is less than zero") if angleLenList[0] == (math.pi / 2.0) or angleLenList[0] == (math.pi / -2.0): adj = 0.0 else: adj = math.cos(angleLenList[0]) * angleLenList[1] if angleLenList[0] == math.pi or angleLenList[0] == -math.pi: opp = 0.0 else: opp = math.sin(angleLenList[0]) * angleLenList[1] return [adj, opp]
[docs]def square2polar(xyList): """Convert usual square coords to polar. Arg is [x,y], and [angle, length] is returned. """ angle = math.atan2(xyList[1], xyList[0]) len = math.hypot(xyList[0], xyList[1]) return [angle, len]
[docs]def factorial(n): """Return n! Its fast for n up to 30, cuz it uses a dictionary, rather than doing the computations. Not needed for newer Pythons -- its in the math module.""" try: n = int(n) except: raise P4Error("n should be (at least convertible to) an int.") assert n >= 0, "n should be zero or positive." fact = {0: 1, 1: 1, 2: 2, 3: 6, 4: 24, 5: 120, 6: 720, 7: 5040, 8: 40320, 9: 362880, 10: 3628800, 11: 39916800, 12: 479001600, 13: 6227020800, 14: 87178291200, 15: 1307674368000, 16: 20922789888000, 17: 355687428096000, 18: 6402373705728000, 19: 121645100408832000, 20: 2432902008176640000, 21: 51090942171709440000, 22: 1124000727777607680000, 23: 25852016738884976640000, 24: 620448401733239439360000, 25: 15511210043330985984000000, 26: 403291461126605635584000000, 27: 10888869450418352160768000000, 28: 304888344611713860501504000000, 29: 8841761993739701954543616000000, 30: 265252859812191058636308480000000} if n <= 30: return fact[n] else: total = 1 while n > 1: total *= n n -= 1 return total
[docs]def nChooseK(n, k): """Get the number of all possible len k subsets from range(n).""" assert isinstance(n, int) assert isinstance(k, int) assert n >= 0, "n should be zero or more." assert k <= n, "k should be less than or equal to n" assert k >= 0, "k should be zero or more." nFact = factorial(n) kFact = factorial(k) nMinusKFact = factorial(n - k) return nFact / (kFact * nMinusKFact)
[docs]def nUnrootedTrees(nTaxa): upper = (nTaxa * 2) - 5 nTrees = 1 i = 3 while i <= upper: nTrees *= i i += 2 return nTrees
[docs]def nRootedTrees(nTaxa): upper = (nTaxa * 2) - 3 nTrees = 1 i = 3 while i <= upper: nTrees *= i i += 2 return nTrees
[docs]def nRootedTreesWithMultifurcations(nTaxa): """Returns a list T(n,m) with m from zero to n-1 n is the number of leaves m is the number of internal nodes The first number in the returned list will always be zero, and the second number will always be 1. See Felsenstein, page 27. So for example, for nTaxa = 8 (as in the example), this function returns [0, 1, 246, 6825, 56980, 190575, 270270, 135135]. """ # Make a table. table = [] for nInts in range(nTaxa): table.append([0] * (nTaxa + 1)) for nT in range(2, nTaxa + 1): table[1][nT] = 1 for nInt in range(2, nTaxa): for nTx in range(nInt + 1, nTaxa + 1): table[nInt][nTx] = ( (nTx + nInt - 2) * table[nInt - 1][nTx - 1]) + (nInt * table[nInt][nTx - 1]) if 0: # Print out the table. print("%-9s|" % "nTx ->", end=' ') for nTx in range(1, nTaxa + 1): print("%10i" % nTx, end=' ') print() for nTx in range(nTaxa + 1): print("%10s" % " ---------", end=' ') print() print("%8s |" % "nInt") for nInt in range(1, nTaxa): print("%8i |" % nInt, end=' ') for nTx in range(nInt): print("%10s" % "", end=' ') for nTx in range(nInt + 1, nTaxa + 1): print("%10i" % table[nInt][nTx], end=' ') print() results = [] for nInt in range(nTaxa): results.append(table[nInt][nTaxa]) return results
[docs]def nUnrootedTreesWithMultifurcations(nTaxa): """Returns a list T(n,m) with m from zero to n-2 n is the number of leaves m is the number of internal nodes The first number in the returned list will always be zero, and the second number will always be 1. See Felsenstein, page 27. So for example, for nTaxa = 9 (as in the example), this function returns [0, 1, 246, 6825, 56980, 190575, 270270, 135135]. """ # From Felsenstein, page 27, 28. return nRootedTreesWithMultifurcations(nTaxa - 1)
[docs]def dirichlet1(inSeq, alpha, theMin, theMax=None, u=None): """Modify inSeq with a draw from a Dirichlet distribution with a single alpha value. *inSeq* is a list, and a copy is made, modified, normalized so that it sums to 1.0, and returned. This function uses the function random.gammavariate(x, 1.0), which takes draws from a gamma distribution (not gamma function). Now random.gammavariate(x, 1.0) can return zero for x less than about 0.001. Eg here are results for 1000 draws from different x values x=1.0000000000 min= 0.000319714 mean= 0.96978 x=0.1000000000 min= 1.65643e-38 mean= 0.10074 x=0.0100000000 min=4.03328e-309 mean= 0.013386 x=0.0010000000 min= 0 mean= 0.0026256 x=0.0001000000 min= 0 mean= 1.625e-09 x=0.0000100000 min= 0 mean=8.8655e-15 x=0.0000010000 min= 0 mean=1.0435e-268 x=0.0000001000 min= 0 mean= 0 Assuming we do not want zeros, a big alpha would help, but only by however big alpha is, so for example if alpha is 100 then we can still usually get a non-zero from inSeq vals of 0.0001. One hack that is adopted here is to possibly add a constant u that might be 0.1 or more to x (depending on alpha), so that the arg x is always big enough to return a non-zero. Stick that in the arg u, which is by default None """ gm = ['func.dirichlet1()'] kk = len(inSeq) if theMax == None: theMax = 1.0 - ((kk - 1) * theMin) safety = 0 safetyLimit = 300 while 1: #theSeq = inSeq[:] if u: theSeq = [ random.gammavariate((inSeq[i] * alpha) + u, 1.0) for i in range(kk)] else: theSeq = [ random.gammavariate(inSeq[i] * alpha, 1.0) for i in range(kk)] # print safety, theSeq theSum = sum(theSeq) theSeq = [v / theSum for v in theSeq] thisMin = min(theSeq) thisMax = max(theSeq) isOk = True if (thisMin < theMin) or (thisMax > theMax): isOk = False if isOk: return theSeq safety += 1 if safety > safetyLimit: gm.append( "Tried more than %i times to get good dirichlet values, and failed. Giving up." % safetyLimit) gm.append("inSeq: %s" % inSeq) gm.append("theMin: %s, theMax: %s, u=%s" % (theMin, theMax, u)) raise P4Error(gm)
[docs]def unPickleMcmc(runNum, theData, verbose=True): """Unpickle a checkpoint, return an Mcmc ready to go.""" gm = ["func.unPickleMcmc()"] try: runNum = int(runNum) except (ValueError, TypeError): gm.append("runNum should be an int, 0 or more. Got %s" % runNum) raise P4Error(gm) if runNum < 0: gm.append("runNum should be an int, 0 or more. Got %s" % runNum) raise P4Error(gm) baseName = "mcmc_checkPoint_%i." % runNum ff = glob.glob("%s*" % baseName) pickleNums = [] for f in ff: genNum = int(f.split('.')[1]) pickleNums.append(genNum) if not pickleNums: # an empty sequence gm.append("Can't find any checkpoints for runNum %i." % runNum) gm.append("Got the right runNum?") raise P4Error(gm) theIndx = pickleNums.index(max(pickleNums)) fName = ff[theIndx] if verbose: print("...unpickling Mcmc in %s" % fName) f = open(fName, 'rb') m = pickle.load(f) f.close() # Restore gsl_rng state if not var.gsl_rng: var.gsl_rng = pf.gsl_rng_get() # accommodate old checkpoints that do not have this stuff if hasattr(m, "gsl_rng_state_ndarray"): the_gsl_rng_size = pf.gsl_rng_size(var.gsl_rng) # size of the state assert m.gsl_rng_state_ndarray.shape[0] == the_gsl_rng_size pf.gsl_rng_setstate(var.gsl_rng, m.gsl_rng_state_ndarray) else: # an old checkpoint with no random state; these steps are also done in Mcmc.__init__() pf.gsl_rng_set(var.gsl_rng, int(time.time())) the_gsl_rng_size = pf.gsl_rng_size(var.gsl_rng) # size of the state # A place to store the state. Empty to start. It is stored during a checkpoint. m.gsl_rng_state_ndarray = numpy.array(['0'] * the_gsl_rng_size, numpy.dtype('B')) # B is unsigned byte if hasattr(m, 'randomState'): # restore random (python module) state assert m.randomState random.setstate(m.randomState) else: # an old checkpoint, that does not have this yet. m.randomState = None m.tree.data = theData m.tree.calcLogLike(verbose=False, resetEmpiricalComps=False) if m.simulate: m.simTree.data = theData.dupe() m.simTree.calcLogLike(verbose=False, resetEmpiricalComps=False) for chNum in range(m.nChains): ch = m.chains[chNum] ch.curTree.data = theData ch.curTree.calcLogLike(verbose=False, resetEmpiricalComps=False) ch.propTree.data = theData ch.propTree.calcLogLike(verbose=False, resetEmpiricalComps=False) m._setLogger() return m
[docs]def unPickleSTMcmc(runNum, verbose=True): """Unpickle a STMcmc checkpoint, return an STMcmc ready to go.""" gm = ["func.unPickleSTMcmc()"] try: runNum = int(runNum) except (ValueError, TypeError): gm.append("runNum should be an int, 0 or more. Got %s" % runNum) raise P4Error(gm) if runNum < 0: gm.append("runNum should be an int, 0 or more. Got %s" % runNum) raise P4Error(gm) baseName = "mcmc_checkPoint_%i." % runNum ff = glob.glob("%s*" % baseName) pickleNums = [] for f in ff: genNum = int(f.split('.')[1]) pickleNums.append(genNum) if not pickleNums: # an empty sequence gm.append("Can't find any checkpoints for runNum %i." % runNum) gm.append("Got the right runNum?") raise P4Error(gm) theIndx = pickleNums.index(max(pickleNums)) fName = ff[theIndx] if verbose: print("...unpickling Mcmc in %s" % fName) f = open(fName, 'rb') m = pickle.load(f) f.close() if m.stRFCalc == 'fastReducedRF': import pyublas # needed for chNum in range(m.nChains): ch = m.chains[chNum] ch.startFrrf() if m.modelName == 'SPA' and var.stmcmc_useFastSpa: import p4.fastspa as fastspa m.fspa = fastspa.FastSpa(m.useSplitSupport) for tNum, t in enumerate(m.trees): m.fspa.setInTr(tNum, t.nTax, m.nTax, t.baTaxBits.to01(), t.firstTax) for n in t.internals: if n.br and hasattr(n.br, "support"): support = n.br.support else: support = -1.0 m.fspa.setInTrNo(tNum, n.stSplitKey.to01(), support) #m.fspa.summarizeInTrs() for chNum in range(m.nChains): ch = m.chains[chNum] ch.setupBitarrayCalcs() ch.getTreeLogLike_spa_bitarray() if var.stmcmc_useFastSpa: #print("Here E. bitarray propTree.logLike is %f" % self.propTree.logLike) fspaLike = ch.stMcmc.fspa.calcLogLike(ch.chNum) diff = math.fabs(ch.propTree.logLike - fspaLike) #print("Got fspaLike %f, diff %g" % (fspaLike, diff)) if diff > 1e-13: gm.append("bad fastspa likelihood calc, %f vs %f, diff %f" % (ch.propTree.logLike, fspaLike, diff)) raise P4Error(gm) ch.curTree.logLike = ch.propTree.logLike m._setLogger() return m
#################################################
[docs]def recipes(writeToFile=var.recipesWriteToFile): """Reminders, suggestions, multi-step methods... These should be in the Sphinx docs also. """ gm = ["func.recipes()"] if not var.examplesDir: gm.append("Can't find the Examples directory.") raise P4Error(gm) recipesDir = os.path.join(var.examplesDir, 'W_recipes') # # One line blurb, and default file name. # recipesList = [ # ["Calculate likelihood.", 'sLike.py'], # ["Calculate likelihood with more than one data partition.", 'sLikeMultiPart.py'], # ["Simulate data.", 'sSim.py'], # ["Simulate data with more than one data partition.", 'sSimMultiPart.py'], # ["Do an MCMC.", 'sMcmc.py'], # ["Do an MCMC with more than one data partition.", 'sMcmcMultiPart.py'], # ["Read checkPoints from an MCMC.", 'sReadMcmcCheckPoints.py'], # ["Restart an MCMC.", 'sRestartMcmc.py'], # ["Make a consensus tree.", 'sCon.py'], # ] fList = glob.glob("%s/*.py" % recipesDir) # print fList bNames = [os.path.basename(nm) for nm in fList] # print bNames firstLines = [] for fN in fList: f = open(fN) firstLine = f.readline() f.close() if firstLine.startswith('#'): firstLine = firstLine[1:] firstLine = firstLine.strip() else: firstLine = "Fix me!" firstLines.append(firstLine) recipesList = [] for fNum in range(len(fList)): recipesList.append([firstLines[fNum], bNames[fNum]]) # print recipesList for recNum in range(len(recipesList)): rec = recipesList[recNum] print("%s. %s" % (string.ascii_uppercase[recNum], rec[0])) ret = input('Tell me a letter: ') # print "Got %s" % ret if ret == '': return elif ret[0] in string.ascii_uppercase: recNum = string.ascii_uppercase.index(ret[0]) elif ret[0] in string.ascii_lowercase: recNum = string.ascii_lowercase.index(ret[0]) else: return # print "Got recNum %i" % recNum if writeToFile: if sys.version_info < (3,): ret = raw_input('Write it to file name [default %s] :' % recipesList[recNum][1]) else: ret = input('Write it to file name [default %s] :' % recipesList[recNum][1]) if ret == '': theFName = recipesList[recNum][1] else: theFName = ret.strip() if os.path.exists(theFName): "The file %s already exists. I'm refusing to over-write it, so I'm doing nothing." % theFName return print("Writing to file '%s' ..." % theFName) os.system("cp %s %s" % (os.path.join(recipesDir, recipesList[recNum][1]), theFName)) else: print("\n") os.system("cat %s" % os.path.join(recipesDir, recipesList[recNum][1]))
[docs]def uninstall(): """Uninstall the p4 package.""" print(""" This function is for uninstalling an installed version of p4. It uses the p4.installation module to get the locations of files. It may need to be done as root, or using sudo.""") weAreInteractive = False if os.getenv('PYTHONINSPECT'): # The p4 script sets this weAreInteractive = True elif len(sys.argv) == 1 and sys.argv[0] == '': # Command 'p4' with no args weAreInteractive = True # How can I tell if python was invoked with the -i flag? if not weAreInteractive: return try: import p4.installation except ImportError: raise P4Error("Unable to import the p4.installation module.") print(""" This function will remove the p4 script file %s the library files in the directory %s and the documentation in the directory %s """ % (installation.p4ScriptPath, installation.p4LibDir, installation.p4DocDir)) ret = input('Ok to do this? [y/n]') ret = ret.lower() if ret not in ['y', 'yes']: return print("Ok, deleting ...") if os.path.exists(installation.p4LibDir): os.system("rm -fr %s" % installation.p4LibDir) else: raise P4Error("Could not find %s" % installation.p4LibDir) if os.path.exists(installation.p4DocDir): os.system("rm -fr %s" % installation.p4DocDir) else: raise P4Error("Could not find %s" % installation.p4DocDir) if os.path.exists(installation.p4ScriptPath): os.system("rm %s" % installation.p4ScriptPath) else: raise P4Error("Could not find %s" % installation.p4ScriptPath)
########################################################### # Tkinter stuff ########################################################### # def startTkThread(): # if var.tk_thread_running: # print "Tk thread is already running, it appears." # return #### import thread #### import atexit #### from Queue import Queue #### var.tk_request = Queue(0) #### var.tk_result = Queue(1) # thread.start_new_thread(_tk_thread,()) #### var.tk_thread_running = True # atexit.register(_tkShutdown) # def _tk_thread(): ## import Tkinter # print "_tk_thread() here!" ## var.tk_root = Tkinter.Tk() # print "var.tk_root is %s" % var.tk_root # var.tk_root.withdraw() ## var.tk_root.after(var.tk_pollInterval, _tk_pump) # var.tk_root.mainloop() # def _tk_pump(): # global _thread_running # while not var.tk_request.empty(): ## command,returns_value = var.tk_request.get() # try: ## result = command() # if returns_value: # var.tk_result.put(result) # except: ## var.tk_thread_running = False # if returns_value: # var.tk_result.put(None) # release client # raise # re-raise the exception -- kills the thread # if var.tk_thread_running: ## var.tk_root.after(var.tk_pollInterval, _tk_pump) # def _tkShutdown(): # shutdown the tk thread # global _thread_running # _tkExec(sys.exit) ## var.tk_thread_running = False # time.sleep(.5) # give tk thread time to quit ############################################################## ############################################################## ##############################################################
[docs]def spaceDelimitedToTabDelimited(fName, outFName=None): """Convert space-delimited data files to tab-delimited. The outfilename by default is the infilename with .tabbed stuck on the end. """ assert os.path.isfile(fName) if outFName: oFName = outFName else: oFName = "%s.tabbed" % fName f = open(fName) ll = f.readlines() f.close() f = open(oFName, 'w') for l in ll: sl = l.split() jl = '\t'.join(sl) f.write("%s\n" % jl) f.close()
[docs]def uniqueFile(file_name): """Returns an open file and filename, modified from file_name. If the file_name is foo.bar, then the new filename will start with foo and end with .bar, with a bit of unique nonsense in between. With a simple file_name input the new file is made in current directory, but by supplying a file_name including a path, you can create the new file elsewhere. Don't forget to close the file. """ # I got this from the web somewhere. import tempfile dirname, filename = os.path.split(file_name) prefix, suffix = os.path.splitext(filename) fd, filename = tempfile.mkstemp(suffix, prefix + "_", dirname) return os.fdopen(fd, 'w'), filename
[docs]def writeInColour(theString, colour='blue'): goodColours = [ 'red', 'RED', 'blue', 'BLUE', 'cyan', 'CYAN', 'violet', 'VIOLET'] if colour not in goodColours: raise P4Error( "func.printColour(). The colour should be one of %s" % goodColours) codeDict = { 'red': '\033[0;31m', 'RED': '\033[1;31m', 'blue': '\033[0;34m', 'BLUE': '\033[1;34m', 'cyan': '\033[0;36m', 'CYAN': '\033[1;36m', 'violet': '\033[0;35m', 'VIOLET': '\033[1;35m', } backToBlackCode = '\033[m' sys.stdout.write("%s%s%s" % (codeDict[colour], theString, backToBlackCode))
[docs]def setTerminalColour(theColour): goodTerminalColours = [ 'red', 'RED', 'blue', 'BLUE', 'cyan', 'CYAN', 'violet', 'VIOLET'] terminalColourCodeDict = { 'red': '\033[0;31m', 'RED': '\033[1;31m', 'blue': '\033[0;34m', 'BLUE': '\033[1;34m', 'cyan': '\033[0;36m', 'CYAN': '\033[1;36m', 'violet': '\033[0;35m', 'VIOLET': '\033[1;35m', } #self.terminalBackToBlackCode = '\033[m' assert theColour in goodTerminalColours, "The colour must be one of %s" % goodTerminalColours sys.stdout.write(terminalColourCodeDict[theColour])
[docs]def unsetTerminalColour(): sys.stdout.write('\033[m')
# Color and colour. writeInColor = writeInColour setTerminalColor = setTerminalColour unsetTerminalColor = unsetTerminalColour def _sumOfSquares(seq): """Pure Python. Converts to floats, returns a float.""" if not seq: return 0.0 mySum = 0.0 for it in seq: mySum += (float(it) * float(it)) return mySum
[docs]def sortListOfObjectsOnAttribute(aListOfObjects, attributeString): """Returns a new sorted list.""" # Repaired for py3. # In the original (that worked with py2 all these years) used pairs, where, # 'paired' was a list of 2-element tuples, (thingToSortOn, theOriginalObject) # The problem in py3 is in tie scores in thingToSortOn, where sort() will # then attempt to compare objects, and that will generally fail, cuz objA < # objB (in Py3) will throw a # TypeError: '<' not supported between instances of 'Foo' and 'Foo' # So I simply add a third element, the indx in the middle, that allows # reliable sorting when there is a tie score at the first position. def tripling(anObject, indx, a=attributeString): return (getattr(anObject, a), indx, anObject) triplets = [] for indx, ob in enumerate(aListOfObjects): triplets.append(tripling(ob, indx, a=attributeString)) #print(triplets) triplets.sort() def stripit(aTriplet): return aTriplet[2] return [stripit(tr) for tr in triplets]
[docs]def sortListOfObjectsOn2Attributes(aListOfObjects, attributeString1, attributeString2): """Returns a new sorted list.""" # Fixed for py3 as in sortListOfObjectsOnAttribute, adding a 4th element def quadding(anObject, indx, a=attributeString1, b=attributeString2): return (getattr(anObject, a), getattr(anObject, b), indx, anObject) quads = [] for indx, ob in enumerate(aListOfObjects): quads.append(quadding(ob, indx, a=attributeString1, b=attributeString2)) #print(quads) quads.sort() def stripit(quad): return quad[3] return [stripit(qu) for qu in quads]
[docs]def sortListOfListsOnListElementNumber(aListOfLists, elementNumber): """Returns a new sorted list.""" def pairing(aList, e=elementNumber): return (aList[e], aList) paired = [pairing(aList) for aList in aListOfLists] # 'paired' is a list of 2-element tuples, (thingToSortOn, theOriginalElement) # print "paired = ", paired paired.sort() def stripit(pair): return pair[1] return [stripit(pr) for pr in paired]
[docs]def readAndPop(stuff): """Read in simple stuff, pop the single object from var lists, and return it. The stuff to be read in must be convertible to a single object, one of Alignment, SequenceList, or Tree. When that is read, the stuff as usual goes into one of var.alignments, var.sequenceLists, or var.trees. The single object is popped from where it ends up, and returned. """ gm = ['func.readAndPop()'] #assert os.path.isfile(fName) onSL = len(var.sequenceLists) onAlig = len(var.alignments) onTrees = len(var.trees) read(stuff) nnSL = len(var.sequenceLists) - onSL nnAlig = len(var.alignments) - onAlig nnTrees = len(var.trees) - onTrees mySum = nnSL + nnAlig + nnTrees if mySum != 1: if mySum < 1: gm.append("no appropriate objects were made.") else: gm.append("Got %i objects. Only 1 allowed." % mySum) raise P4Error(gm) if nnSL: return var.sequenceLists.pop() elif nnAlig: return var.alignments.pop() elif nnTrees: return var.trees.pop()
########################################################################## ########################################################################## ##########################################################################
[docs]def charsets(names, lens, fName=None): """Write a nice NEXUS sets block, given partition names and lengths. For example:: geneNames = 'coi cytb nad5'.split() lens = [] for geneName in geneNames: read('%s.nex' % geneName) lens.append(var.alignments[-1].nChar) func.charsets(geneNames, lens) which writes:: #nexus begin sets; charset coi = 1 - 131; [nChar = 131] charset cytb = 132 - 352; [nChar = 221] charset nad5 = 353 - 521; [nChar = 169] charpartition p1 = coi:coi, cytb:cytb, nad5:nad5 ; [partition p1 = 3:coi, cytb, nad5;] end; """ gm = ['func.charsets()'] if len(names) != len(lens): gm.append("len of names (%i) is not the same as the len of lengths (%i)" % ( len(names), len(lens))) print("names: ", names) print("lens: ", lens) raise P4Error(gm) start = 1 if fName: f = open(fName, 'w') else: f = sys.stdout f.write("#nexus\n\n") f.write("begin sets;\n") for cNum in range(len(names)): f.write(" charset %s = %i - %i;" % (names[cNum], start, start - 1 + lens[cNum])) f.write(" [nChar = %i]\n" % lens[cNum]) start += lens[cNum] f.write(" charpartition p1 = ") pp = ["%s:%s" % (nm, nm) for nm in names] f.write(', '.join(pp)) f.write(';\n') f.write(" [partition p1 = %i:" % len(names)) f.write(', '.join(names)) f.write(';]\n') f.write("end;\n") if fName: f.close()
[docs]def reseedCRandomizer(newSeed): """Set a new seed for the c-language random() function. For those things in the C-language that use random(), this re-seeds the randomizer. Reseed to different integers to make duplicate runs of something come out different. This is not for the GSL random stuff. Confusing to have 2 systems, innit? And then there is the 3rd system that Python uses in the random module. Sorry! If you wanted for example to repeat an MCMC, there are four random number generators to contend with. You could do something like this (where I am using a seed of zero for all -- you could use your own seed, and they do not need to be the same):: # for C random() function, used in Brent-Powell optimization func.reseedCRandomizer(0) # for gsl, used in simulations (amongst other places) var.gsl_rng = pf.gsl_rng_get() pf.gsl_rng_set(var.gsl_rng, 0) # for the Python random library, used a lot in python code random.seed(0) # for Numpy and Scipy. Used in func.dirichlet2() numpy.random.seed(0) """ pf.reseedCRandomizer(newSeed)
[docs]def gsl_meanVariance(seq, mean=None, variance=None): """Use gsl to compute both the mean and variance. Arg seq can be a list or a numpy array. Returns a 2-tuple of single-item NumPy arrays. To save a little time, you can pass the mean and variance to this function, in the form of zero-dimensional, single item NumPy arrays (eg mean = numpy.array(0.0)) The numpy built-in variance function does not use n-1 weighting. This one (from gsl) does use n-1 weighting. """ if isinstance(seq, numpy.ndarray): mySeq = seq else: mySeq = numpy.array(seq, numpy.float) if mean is None: mean = numpy.array([0.0]) if variance is None: variance = numpy.array([0.0]) pf.gsl_meanVariance(mySeq, len(seq), mean, variance) if 0: print("slow p4: mean=%f, variance=%f" % (func.mean(list(seq)), func.variance(list(seq)))) print("gsl: mean=%f, variance=%f" % (mean, variance)) # different than gsl-- no n-1 weighting. print("numpy: mean=%f, variance=%f" % (mySeq.mean(), mySeq.var())) return (mean, variance)
[docs]def chiSquaredProb(xSquared, dof): """Returns the probability of observing X^2.""" return pf.chiSquaredProb(xSquared, dof)
[docs]def gsl_ran_gamma(a, b, seed=None): """See also random.gammavariate()""" complaintHead = '\nfunc.gsl_ran_gamma()' gm = complaintHead try: a = float(a) b = float(b) except: gm.append("Both a and b should be floats.") raise P4Error(gm) isNewGSL_RNG = 0 if not var.gsl_rng: var.gsl_rng = pf.gsl_rng_get() isNewGSL_RNG = 1 # print "got var.gsl_rng = %i" % var.gsl_rng # sys.exit() # Set the GSL random number generator seed, only if it is a new GSL_RNG if isNewGSL_RNG: if seed != None: try: newSeed = int(seed) pf.gsl_rng_set(var.gsl_rng, newSeed) except ValueError: print(complaintHead) print(" The seed should be convertible to an integer") print(" Using the process id instead.") pf.gsl_rng_set(var.gsl_rng, os.getpid()) else: pf.gsl_rng_set(var.gsl_rng, os.getpid()) return pf.gsl_ran_gamma(var.gsl_rng, a, b)
[docs]def dirichlet2(inSeq, outSeq, alpha, theMin): """Modify inSeq with a draw from a Dirichlet distribution with a single alpha value. Args *inSeq* and *outSeq* are both numpy arrays. Arg *inSeq* is not modified itself; the modification is placed in *outSeq* (and nothing is returned). The result is normalized to 1.0 This is about 20% slower than :func:`func.dirichlet1` -- not quite sure why. """ gm = ['func.dirichlet2()'] assert isinstance(inSeq, numpy.ndarray) assert isinstance(outSeq, numpy.ndarray) kk = len(inSeq) theMax = 1.0 - ((kk - 1) * theMin) safety = 0 safetyLimit = 300 while 1: for i in range(kk): #theSeq[i] = pf.gsl_ran_gamma(var.gsl_rng, theSeq[i] * alpha, 1.0) # outSeq[i] = random.gammavariate(inSeq[i] * alpha, 1.0) --- this # is very slow! outSeq[i] = numpy.random.gamma(inSeq[i] * alpha) #outSeq[i] = inSeq[i] * numpy.random.gamma(alpha) outSeq /= outSeq.sum() thisMin = numpy.amin(outSeq) thisMax = numpy.amax(outSeq) isOk = True if (thisMin < theMin) or (thisMax > theMax): isOk = False if isOk: return safety += 1 if safety > safetyLimit: gm.append( "Tried more than %i times to get good dirichlet values, and failed. Giving up." % safetyLimit) raise P4Error(gm)
[docs]def gsl_ran_dirichlet(alpha, theta): """Make a random draw from a dirichlet distribution. Args: alpha and theta (numpy arrays): both the same length (more than 1). The length is the dimension of the dirichlet. The contents of *theta* are over-written (without being used). The draw ends up in *theta*. It is normalized so that it sums to 1.0. """ complaintHead = '\nfunc.gsl_ran_dirichlet()' gm = complaintHead try: assert isinstance(alpha, numpy.ndarray) assert isinstance(theta, numpy.ndarray) except AssertionError: gm.append(" alpha, theta, should be numpy arrays.") raise P4Error(gm) assert len(alpha) > 1 assert len(theta) == len(alpha) if not var.gsl_rng: var.gsl_rng = pf.gsl_rng_get() pf.gsl_rng_set(var.gsl_rng, int(time.time())) pf.gsl_ran_dirichlet(var.gsl_rng, len(theta), alpha, theta)
[docs]def studentsTTest1(seq, mu=0.0, verbose=True): """Test whether a sample differs from mu. From wikipedia. Arg 'seq' is a list of numbers. Internally it is converted to a numpy array of floats, so the input seq need not be floats, and need not be a numpy array, although it does not hurt to be either. Arg 'mu' is by default zero. Returns the p-value. """ sq = numpy.array(seq, dtype=numpy.float) m, v = gsl_meanVariance(sq) s = numpy.sqrt(v) n = len(sq) sqN = numpy.sqrt(n) t = (m - mu) / (s / sqN) dof = n - 1 p = pf.studentsTProb(t, dof) if verbose: print("mean =", m) print("std dev =", s) print("t statistic =", t) print("prob =", p) return p
[docs]def effectiveSampleSize(data, mean): """As done in Tracer v1.4, by Drummond and Rambaut. Thanks guys! But see :func:`p4.func.summarizeMcmcPrams`, which gives ESSs. """ nSamples = len(data) maxLag = int(nSamples / 3) if maxLag > 1000: maxLag = 1000 gammaStatAtPreviousLag = numpy.array([0.0]) assert isinstance(data, numpy.ndarray), "Arg 'data' should be a numpy.array. Its %s" % type(data) assert isinstance(mean, numpy.ndarray), "Arg 'mean' should be a numpy.array. Its %s" % type(mean) gammaStat = numpy.array([0.0]) varStat = numpy.array([0.0]) gammaStatAtLagZero = numpy.array([0.0]) if 0: lag = 0 while lag < maxLag: gammaStat[0] = 0.0 for j in range(nSamples - lag): gammaStat[0] += (data[j] - mean) * (data[j + lag] - mean) # if lag == 0: # print "mean is %f" % mean # print "lag is 0, gammaStat = %f" % gammaStat[0] gammaStat[0] /= (nSamples - lag) if lag == 0: varStat[0] = gammaStat gammaStatAtLagZero[0] = gammaStat # print "got gammaStatAtLagZero = %f" % gammaStatAtLagZero[0] elif (lag % 2) == 0: if gammaStatAtPreviousLag + gammaStat > 0: varStat[0] += 2.0 * (gammaStatAtPreviousLag + gammaStat) else: break lag += 1 gammaStatAtPreviousLag[0] = gammaStat #gammaStat[0] = 0.0 # print gammaStatAtLagZero, gammaStat, varStat, lag # print "maxLag is %i" % maxLag # stdErrorOfMean = numpy.sqrt(varStat / nSamples) ??!? #ACT = stepSize * varStat / gammaStatAtLagZero #ESS = (stepSize * nSamples) / ACT; # stepSize is not needed ESS1 = nSamples * (gammaStatAtLagZero / varStat) # print "got ESS1 %f" % ESS1 if 1: pf.effectiveSampleSize(data, mean, nSamples, maxLag, gammaStatAtPreviousLag, gammaStat, varStat, gammaStatAtLagZero) ESS2 = nSamples * (gammaStatAtLagZero / varStat) #fabsDiff = numpy.fabs(ESS1 - ESS2) # print "ESS1 is %f, ESS2 is %f, diff is %g" % (ESS1, ESS2, fabsDiff) return ESS2[0]
[docs]def summarizeMcmcPrams(skip=0, run=-1, theDir='.', makeDict=False): """Find the mean, variance, and ess of mcmc parameters. Ess is effective sample size, as in Tracer by Drummond and Rambaut. The numbers are found in mcmc_prams_N, N=0, 1, etc. If arg 'run' is set to -1, the default, then all runs are done. Alternatively you can set the run to a specific run number, and that is the only one that is done. The 'profile', with the names of the parameters, and the number of each, is found in mcmc_pramsProfile_N.py (N=0,1,2 ...). It is not essential, but it gives names to the parameters. """ gm = ["func.summarizeMcmcPrams()"] nPrams = None pramsProfile = None numsList = None if run == -1: runNum = 0 # read in the prams profile only once. Assume it will apply to all runs. try: loc = {} theFName = "mcmc_pramsProfile_%i.py" % runNum exec(open(os.path.join(theDir, theFName)).read(), {}, loc) # loc =locals() no workee. # print "loc = %s" % loc nPrams = loc['nPrams'] pramsProfile = loc['pramsProfile'] except IOError: print("The file '%s' cannot be found." % theFName) if makeDict: print("Cannot make dictionary without %s" % theFName) return None else: runNum = run totalLinesRead = 0 while 1: if run != -1: # possibly different prams profiles for each run try: loc = {} theFName = "mcmc_pramsProfile_%i.py" % runNum exec(open(os.path.join(theDir, theFName)).read(), {}, loc) # loc =locals() no workee. # print "loc = %s" % loc nPrams = loc['nPrams'] pramsProfile = loc['pramsProfile'] except IOError: print("The file '%s' cannot be found." % theFName) if makeDict: print("Cannot make dictionary without %s" % theFName) return None try: theFName = os.path.join(theDir, "mcmc_prams_%i" % runNum) flob = open(theFName) if not makeDict: print("Reading prams from file %s" % theFName) except IOError: break theLines = flob.readlines() flob.close() runNum += 1 skipsDone = 0 linesRead = 0 for aLine in theLines: ll = aLine.lstrip() if ll.startswith("#"): pass elif not ll: pass elif ll[0] not in string.digits: pass else: if skipsDone < skip: skipsDone += 1 else: splitLine = aLine.split() # If it does not exist yet, then make it now. if not numsList: thisNPrams = len(splitLine) - 1 if nPrams: if not thisNPrams == nPrams: gm.append( "thisNPrams = %i, nPrams = %s" % (thisNPrams, nPrams)) raise P4Error(gm) else: nPrams = thisNPrams numsList = [] for pramNum in range(nPrams): numsList.append([]) for pramNum in range(nPrams): try: theOne = splitLine[pramNum + 1] except IndexError: gm.append("Line '%s'. " % aLine.rstrip()) gm.append( "Can't get parameter number %i " % pramNum) raise P4Error(gm) try: aFloat = float(theOne) numsList[pramNum].append(aFloat) except (ValueError, TypeError): gm.append("Can't make sense of '%s'" % theOne) raise P4Error(gm) linesRead += 1 if not makeDict: print(" skipped %i lines" % skipsDone) print(" read %i lines" % linesRead) totalLinesRead += linesRead if run != -1: break if not makeDict: print("Read %i pram lines in total." % totalLinesRead) # print numsList spacer1 = ' ' * 20 pramsdict = {} if pramsProfile: if not makeDict: print("%s %26s mean variance ess " % (spacer1, ' ')) print("%s %26s -------- -------- --------" % (spacer1, ' ')) pramCounter = 0 for partNum in range(len(pramsProfile)): if len(pramsProfile) > 1: if not makeDict: print("Data partition %i" % partNum) if len(pramsProfile[partNum]): pramsdict["part%i" % partNum] = [] # print pramsProfile[partNum] for pramNum in range(len(pramsProfile[partNum])): pString = pramsProfile[partNum][pramNum][0] pramCounts = pramsProfile[partNum][pramNum][1] for p in range(pramCounts): if not makeDict: print("%s%3i %22s[%2i] " % (spacer1, pramCounter, pString, p), end=' ') d = numpy.array(numsList[pramCounter], numpy.float) m, v = gsl_meanVariance(d) ess = effectiveSampleSize(d, m) stats = [] stats.append("%s[%i]" % (pString, p)) if m == 0.0: if not makeDict: print(" 0.0 ", end=' ') else: stats.append("0.0") elif m < 0.00001: if not makeDict: print("%10.3g " % m, end=' ') else: stats.append("%.3g" % m) elif m < 1.0: if not makeDict: print("%10.6f " % m, end=' ') else: stats.append("%.6f" % m) else: if not makeDict: print("%10.4f " % m, end=' ') else: stats.append("%.4f" % m) if v == 0.0: if not makeDict: print(" 0.0 ", end=' ') else: stats.append("0.0") elif v < 0.000001: if not makeDict: print("%10.3g " % v, end=' ') else: stats.append("%.3g" % v) elif v < 1.0: if not makeDict: print("%10.6f " % v, end=' ') else: stats.append("%.6f " % v) else: if not makeDict: print("%10.4f " % v, end=' ') else: stats.append("%.6f" % v) if not makeDict: print("%10.1f " % ess, end=' ') else: stats.append("%.1f" % ess) if not makeDict: print() pramCounter += 1 pramsdict["part%i" % partNum].append(stats) else: if not makeDict: print(" No parameters in this data partition.") else: # no pramsProfile print("%9s mean variance ess " % ' ') print("%9s-------- -------- --------" % ' ') for pramNum in range(nPrams): print(" %2i " % pramNum, end=' ') d = numpy.array(numsList[pramNum], numpy.float) m, v = gsl_meanVariance(d) ess = effectiveSampleSize(d, m) if m == 0.0: print(" 0.0 ", end=' ') elif m < 0.00001: print("%10.3g " % m, end=' ') elif m < 1.0: print("%10.6f " % m, end=' ') else: print("%10.4f " % m, end=' ') if v == 0.0: print(" 0.0 ", end=' ') elif v < 0.000001: print("%10.3g " % v, end=' ') elif v < 1.0: print("%10.6f " % v, end=' ') else: print("%10.4f " % v, end=' ') print("%10.1f " % ess, end=' ') print() if makeDict: return pramsdict
[docs]def newtonRaftery94_eqn16(logLikes, delta=0.1, verbose=False): """Importance sampling, as in Newton and Raftery 1994, equation 16""" lla = numpy.array(logLikes) # Start the iteration with the harm mean, so calculate it. # This first way is the way from Susko I think it was, via Jessica # Leigh. It is not very good. But it probably does not matter, # as it is only a starting point for an iteration. if 1: theMax = -numpy.min(lla) diff = 700. - theMax shifted = (-lla) + diff expd = numpy.exp(shifted) theSum = numpy.sum(expd) theHarmMean = float(-(numpy.log(theSum) - diff)) # print theHarmMean # print type(theHarmMean) return pf.newtonRaftery94_eqn16(lla, len(logLikes), theHarmMean, delta, int(verbose))
#n = Numbers(logLikes) # theHarmMean = n.harmonicMeanOfLogs() # better # return pf.newtonRaftery94_eqn16(lla, len(logLikes), theHarmMean, delta, # int(verbose))
[docs]def readJplace(fName, verbose=False): """Read ``*.jplace`` files for phylogenetic placement of short reads It returns a tuple --- (tree, heatSum). The tree is decorated with a 'heat' attribute on each node.br. The heatSum is the sum of all the heats. """ import json assert fName.endswith("jplace"), "Is %s a jplace file? --- it does not end with '.jplace'" % fName # read with json flob = open(fName) jobj = json.load(flob) flob.close() # Read the tree inBraces = False tLst2 = [] tStr = jobj['tree'] #print(tStr) tLst = list(tStr) for c in tLst: if inBraces: if c == '}': inBraces = False else: if c == '{': inBraces = True else: tLst2.append(c) tStr = ''.join(tLst2) #print(tStr) t = readAndPop(tStr) if verbose: print("Got tree with %i nodes" % len(t.nodes)) # JPlace files use postorder node numbering. P4 uses preorder. # Po is PostOrder. nodeForPoNumDict = {} for poNum, nNum in enumerate(t.postOrder): #print("post-order =", poNum, "node num = ",nNum) nodeForPoNumDict[poNum] = t.nodes[nNum] placements = jobj['placements'] #n_p is the number of placement lines n_p = len(placements) if verbose: print("There are %i placement lines (n_p)" % n_p) pResults = [0.0] * len(t.nodes) for pLine in placements: #print(type(pLine)) # a dict nn = pLine['n'] assert len(nn) == 1 pp = pLine['p'] pLine['S_lwr'] = sum([it[2] for it in pp]) #print("Got sum of lwr's %f" % pLine['S_lwr']) for pNode in pp: nNum = pNode[0] contrib = pNode[2] / pLine['S_lwr'] pResults[nNum] += contrib heatSum = 0.0 for poNum, pR in enumerate(pResults): n = nodeForPoNumDict[poNum] if n == t.root: assert math.fabs(pR) < 1.e-14, "Something wrong. Root has placement result %g" % pR else: n.br.heat = pR if verbose: print("Got node.br 'heat'", pR) heatSum += pR if verbose: print("Sum of heats", heatSum) print("returning a tuple of the heat-decorated Tree from %s, and heatSum..." % fName) return (t, heatSum)
[docs]def matrixLowerTriangleToUpperTriangle(ltList, dim): """Rearrange matrix from a lower triangle to an upper triangle list Args: ltList (list): a lower triangle of a matrix in the form of a list dim (int): the dimension of the (square) matrix Returns: the same items rearranged as the upper triangle, in the form of a list. If your matrix is like this:: - - - - A - - - B D - - C E F - where the dim is 4, then the lower triangle is the list [A,B,D,C,E,F]. This function rearranges that so that it is the upper triangle:: - A B C - - D E - - - F - - - - and returns the list [A,B,C,D,E,F]. This is useful for user-specified empirical protein rate matrices where the rate matrix is given as a PAML-style lower triangle, but you need a p4-style upper triangle:: pamlStyleRates = [190 rates] upTriangle = func.matrixLowerTriangleToUpperTriangle(pamlStyleRates, 20) # then when you specify your model ... t.newRMatrix(free=0, spec='specified', val=upTriangle) """ assert isinstance(dim, int) expectedLen = int(((dim * dim) - dim)/2) assert len(ltList) == expectedLen, f"Lower triangle len is {len(ltList)}, expected {expectedLen}" bigM = [] for rNum in range(dim): bigM.append(['0.0'] * dim) i = 0 for rNum in range(1,dim): for cNum in range(rNum): #print(ltList[i]) bigM[cNum][rNum] = ltList[i] i += 1 upper = [] for rNum in range(dim-1): for cNum in range(rNum+1, dim): #print(rNum, cNum, bigM[rNum][cNum]) upper.append(bigM[rNum][cNum]) return upper
def _compareSplitsBetweenTwoTreePartitions(tp1, tp2, minimumProportion, verbose=False): """Returns a tuple of asdoss, maximum of the differences and mean of the differences This calls the method TreePartitions.compareSplits(), and digests the results returned from that. Args: tp1, tp2 (TreePartition): TreePartition objects minimumProportion (float): passed to TreePartitions.compareSplits() Returns: (asdoss, maxOfDiffs, meanOfDiffs) """ ret = tp1.compareSplits(tp2, minimumProportion=minimumProportion) #print(ret) # a list of 3-item lists # 1. The split key # 2. The split string # 3. A list of the 2 supports if not ret: return None sumOfStdDevs = 0.0 nSplits = len(ret) diffs = [] for i in ret: # print " %.3f %.3f " % (i[2][0], i[2][1]), stdDev = math.sqrt(p4.func.variance(i[2])) # print "%.5f" % stdDev sumOfStdDevs += stdDev diffs.append(math.fabs(i[2][0] - i[2][1])) asdoss = sumOfStdDevs / nSplits maxOfDiffs = max(diffs) meanOfDiffs = sum(diffs)/nSplits if verbose: print(" nSplits=%i, average of std devs of split supports %.4f " % (nSplits, asdoss)) print(" max of differences %f, mean of differences %f" % (maxOfDiffs, meanOfDiffs)) return (asdoss, maxOfDiffs, meanOfDiffs)
[docs]def compareSplitsBetweenTreePartitions(treePartitionsList, precision=3, linewidth=120): """Pairwise ASDOSS (ASDSF) and MaxDiff calculations Output is verbose. Shows - average standard deviation of split frequencies (or supports), like MrBayes - maximum difference between split supports from each pair of checkpoints, like PhyloBayes Returns: None """ nM = len(treePartitionsList) nItems = int(((nM * nM) - nM) / 2) asdosses = numpy.zeros((nM, nM), dtype=numpy.float64) vect = numpy.zeros(nItems, dtype=numpy.float64) mdvect = numpy.zeros(nItems, dtype=numpy.float64) maxDiffs = numpy.zeros((nM, nM), dtype=numpy.float64) minimumProportion=0.1 vCounter = 0 for mNum1 in range(1, nM): tp1 = treePartitionsList[mNum1] for mNum2 in range(mNum1): tp2 = treePartitionsList[mNum2] thisAsdoss, thisMaxDiff, thisMeanDiff = p4.func._compareSplitsBetweenTwoTreePartitions( tp1, tp2, minimumProportion, verbose=False) #if thisAsdoss == None and verbose: # print("No splits > %s" % minimumProportion) if thisAsdoss == None: thisAsdoss = 0.0 asdosses[mNum1][mNum2] = thisAsdoss asdosses[mNum2][mNum1] = thisAsdoss vect[vCounter] = thisAsdoss maxDiffs[mNum1][mNum2] = thisMaxDiff maxDiffs[mNum2][mNum1] = thisMaxDiff mdvect[vCounter] = thisMaxDiff vCounter += 1 if 0: print(" %10i " % mNum1, end=' ') print(" %10i " % mNum2, end=' ') print("%.3f" % thisAsdoss) # Save current numpy printoptions, and restore, below. curr = numpy.get_printoptions() numpy.set_printoptions(precision=precision, linewidth=linewidth) print("Pairwise average standard deviation of split frequency values ---") print(asdosses) print() print("For the %i values in one triangle," % nItems) print("max = %.3f" % vect.max()) print("min = %.3f" % vect.min()) print("mean = %.3f" % vect.mean()) #print("var = %.2g", vect.var()) print() print("Pairwise maximum differences in split supports between the runs ---") print(maxDiffs) print() print("For the %i values in one triangle," % nItems) print("max = %.3f" % mdvect.max()) print("min = %.3f" % mdvect.min()) print("mean = %.3f" % mdvect.mean()) #print("var = ", mdvect.var()) # Reset printoptions back to what it was numpy.set_printoptions( precision=curr['precision'], linewidth=curr['linewidth'])