Source code for p4.distancematrix

import sys
import os
import p4.func
from p4.var import var
from p4.p4exceptions import P4Error


[docs] class DistanceMatrix: """A container for distances between sequences (usually). The numbers are in self.matrix, a self.dim * self.dim list of lists. There is also a self.names attribute, which is usually for sequence names. """ def __init__(self): self.dim = None self.names = None self.matrix = None self.message = None
[docs] def setDim(self, dim): # print "setDim here" self.dim = dim self.matrix = [] for i in range(dim): l = [0.0] * dim self.matrix.append(l)
[docs] def flatTriangle(self, upper=True): """Get (default upper) triangle as a simple list If arg upper is off, the lower triangle is returned. """ fT = [] if upper: for row in range(0, self.dim - 1): for col in range(row + 1, self.dim): fT.append(self.matrix[row][col]) else: for row in range(1, self.dim): for col in range(0, row): fT.append(self.matrix[row][col]) return fT
[docs] def writeNexus(self, fName=None, writeTaxaBlock=1, append=0, digitsAfterDecimal=6): """Write out self in Nexus format. If writeTaxaBlock=1, then a Taxa block is written before the Distances block. Append, if 0, writes #NEXUS first. If 1, does not write #NEXUS. """ gm = ["DistanceMatrix.writeNexus()"] import string assert self.dim, "Distance Matrix.writeNexus() no dim" if fName == None or fName == sys.stdout: f = sys.stdout else: if append: import os if os.path.isfile(fName): try: f = open(fName, 'a') except IOError: gm.append("Can't open %s for appending." % fName) raise P4Error(gm) else: print(gm[0]) assert not os.path.lexists() print(" 'append' is requested,") print(" but '%s' is not a regular file (maybe it doesn't exist?)." % fName) print(" Writing to a new file instead.") try: f = open(fName, 'w') f.write('#NEXUS\n\n') except IOError: gm.append("Can't open %s for writing." % fName) raise P4Error(gm) else: try: f = open(fName, 'w') except IOError: gm.append("Can't open %s for writing." % fName) raise P4Error(gm) self.writeNexusToOpenFile( f, writeTaxaBlock, append, digitsAfterDecimal) if f != sys.stdout: f.close()
[docs] def writeNexusToOpenFile(self, flob, writeTaxaBlock, append, digitsAfterDecimal): gm = ["DistanceMatrix.writeNexusToOpenFile()"] import string assert self.dim, "Distance Matrix.writeNexusToOpenFile() no dim" f = flob if append: pass else: f.write('#NEXUS\n\n') if writeTaxaBlock: f.write('begin taxa;\n') f.write(' dimensions ntax=%s;\n' % self.dim) f.write(' taxlabels') for i in range(self.dim): f.write(' %s' % p4.func.nexusFixNameIfQuotesAreNeeded(self.names[i])) f.write(';\n') f.write('end;\n\n') f.write('begin distances;\n') if self.message: f.write("\n [%s\n ]\n" % self.message) f.write(' format triangle=both;\n') f.write(' matrix\n') # Make the format strings. if digitsAfterDecimal > 8: totWid = digitsAfterDecimal + 5 else: totWid = 11 #colNameFormat = '%' + '-%is' % totWid if digitsAfterDecimal == 0: numberFormat = ' %' + '%ii ' % (totWid - 4) elif digitsAfterDecimal > 0: numberFormat = '%' + '%i.%if' % (totWid, digitsAfterDecimal) else: gm.append("digitsAfterDecimal may not be below zero.") raise P4Error(gm) if self.names: longestNameLength = 0 for i in self.names: if len(i) > longestNameLength: longestNameLength = len(i) nameFormat = '%' + '%is ' % longestNameLength f.write(' [') f.write(nameFormat % ' ') for i in self.names: #f.write('%-10s' % i[:9]) colName = '%s' % i[:totWid - 1] f.write(colName.center(totWid)) f.write(']\n') for i in range(self.dim): if self.names: f.write(' ') f.write(nameFormat % p4.func.nexusFixNameIfQuotesAreNeeded(self.names[i])) for j in range(self.dim): #f.write('%10.6f' % self.matrix[i][j]) f.write(numberFormat % self.matrix[i][j]) f.write('\n') f.write(' ;\n') f.write('end;\n')
[docs] def writePhylip(self, fName=None, append=0, digitsAfterDecimal=6): """Write out self in Phylip format""" gm = ["DistanceMatrix.writePhylip()"] assert self.dim, "Distance Matrix.writePhylip() no dim" if fName == None or fName == sys.stdout: f = sys.stdout else: if append: import os if os.path.isfile(fName): try: f = open(fName, 'a') except IOError: gm.append("Can't open %s for appending." % fName) raise P4Error(gm) else: print(gm[0]) assert not os.path.lexists() print(" 'append' is requested,") print(" but '%s' is not a regular file (maybe it doesn't exist?)." % fName) print(" Writing to a new file instead.") try: f = open(fName, 'w') except IOError: gm.append("Can't open %s for writing." % fName) raise P4Error(gm) else: try: f = open(fName, 'w') except IOError: gm.append("Can't open %s for writing." % fName) raise P4Error(gm) self.writePhylipToOpenFile(f, digitsAfterDecimal) if f != sys.stdout: f.close()
[docs] def writePhylipToOpenFile(self, flob, digitsAfterDecimal): gm = ["DistanceMatrix.writePhylipToOpenFile()"] assert self.dim, "Distance Matrix.writePhylipToOpenFile() no dim" f = flob f.write('%i\n' % self.dim) # Make the format strings. if digitsAfterDecimal > 8: totWid = digitsAfterDecimal + 5 else: totWid = 11 #colNameFormat = '%' + '-%is' % totWid if digitsAfterDecimal == 0: numberFormat = ' %' + '%ii ' % (totWid - 4) elif digitsAfterDecimal > 0: numberFormat = '%' + '%i.%if' % (totWid, digitsAfterDecimal) else: gm.append("digitsAfterDecimal may not be below zero.") raise P4Error(gm) longestNameLength = 0 for i in self.names: if len(i) > longestNameLength: longestNameLength = len(i) nameFormat = '%' + '%is ' % longestNameLength for i in range(self.dim): if self.names: f.write(' ') f.write(nameFormat % p4.func.nexusFixNameIfQuotesAreNeeded(self.names[i])) for j in range(self.dim): #f.write('%10.6f' % self.matrix[i][j]) if self.matrix[i][j] != None: f.write(numberFormat % self.matrix[i][j]) else: noneFormat = '%' + '%is' % totWid f.write(noneFormat % None) f.write('\n') f.write('\n')
[docs] def readPhylipFile(self, theFileName): """Read a distance matrix in phylip format. """ gm = ['DistanceMatrix.readPhylipFile()'] import string self.names = [] f = open(theFileName, 'r') splitString = f.readline().split() if len(splitString) != 1: gm.append( "The first line should have the number of taxa, and thats all.") raise P4Error(gm) try: theDim = int(splitString[0]) except ValueError: gm.append("Could not get an integer from the first line.") raise P4Error(gm) self.setDim(theDim) for i in range(theDim): aLine = f.readline() if not aLine: gm.append("File too short?") raise P4Error(gm) splitLine = aLine.split() if len(splitLine) == 0: gm.append("Empty line?") gm.append("Got: '%s'" % aLine) raise P4Error(gm) if len(splitLine) <= 1: gm.append("Line too short.") gm.append("Got: '%s'" % aLine) raise P4Error(gm) self.names.append(splitLine[0]) # print "got name %s" % splitLine[0] j = 0 for k in splitLine[1:]: try: self.matrix[i][j] = float(k) # print "i=%i, j=%i, got dist %f" % (i, j, # self.matrix[i][j]) j = j + 1 if j >= theDim: break except ValueError: gm.append(" Could not convert %s to a float." % k) raise P4Error(gm) while j < theDim: aLine = f.readline() if not aLine: gm.append("File too short?") raise P4Error(gm) splitLine = aLine.split() if len(splitLine) == 0: gm.append("Empty line?") gm.append("Got: '%s'" % aLine) raise P4Error(gm) for k in splitLine: try: self.matrix[i][j] = float(k) # print "i=%i, j=%i, got dist %f" % (i, j, # self.matrix[i][j]) j = j + 1 if j >= theDim: break except ValueError: gm.append("Could not convert %s to a float." % k) raise P4Error(gm) f.close()
[docs] def njUsingPaup(self, paupPath='paup'): """Use paup to make a neighbor-joining tree, which is returned. The resulting tree is read in by p4, and is returned. We interact with paup by writing files, but care is taken that existing files are not overwritten, because new file names are made to be unique. If this does not work well, try setting the paupPath arg. """ gm = ["DistanceMatrix.njUsingPaup()"] #filename = sha.new(str(os.getpid())).hexdigest()[-10:] #dmFName = os.path.join(pathPrefix, "%s.dmat" % filename) #treeFName = os.path.join(pathPrefix, "%s.tree" % filename) #pFName = os.path.join(pathPrefix, "%s.cmds" % filename) #tempfile.mkstemp(suffix='', prefix='tmp', dir=None, text=False) # if pathPrefix: # theDir = pathPrefix # else: # theDir = None flob_dm, dmFName_fq = p4.func.uniqueFile('tmp.dm') # tempfile.mkstemp(suffix='tree', dir=theDir) flob_tf, treeFName_fq = p4.func.uniqueFile('tmp.tree') flob_tf.close() # tempfile.mkstemp(suffix='cmds', dir=theDir) flob_pf, pFName = p4.func.uniqueFile('tmp.cmds') # Throw the dir and dirname away. dirname, dmFName = os.path.split(dmFName_fq) dirname, treeFName = os.path.split(treeFName_fq) # Make the paup commands paupCommandString = """#nexus begin paup; execute %s; set crit=dist; dset negbrlen=setzero; nj; savetrees file=%s format=altnex brlens=yes taxablk=yes replace=yes; quit; end; """ % (dmFName, treeFName) # print paupCommandString # Write the files, do the analysis #writeNexusToOpenFile(self, flob, writeTaxaBlock, append, digitsAfterDecimal) self.writeNexusToOpenFile(flob_dm, True, False, 6) flob_dm.close() flob_pf.write(paupCommandString) flob_pf.close() os.system("%s -n %s > /dev/null" % (paupPath, pFName)) # This is the result. The tree, if it exists, is read in by p4. oldLen = len(var.trees) p4.func.read(treeFName) newLen = len(var.trees) if newLen == oldLen + 1: pass else: gm.append("I was expecting exactly one tree. Got %i" % (oldLen - newLen)) raise P4Error(gm) t = var.trees.pop() # Tidy up. os.remove(treeFName) os.remove(pFName) os.remove(dmFName) for n in t.iterNodesNoRoot(): if n.br.len < 0.0: n.br.len = 0.0 return t
[docs] def bionj(self): """Use bionj to make a neighbor-joining tree, which is returned. The resulting tree is read in by p4, and is returned. We interact with bionj by writing files, but care is taken that existing files are not overwritten, because new file names are made to be unique. If the branch lengths are less than zero, they are made to be zero. """ gm = ["DistanceMatrix.bionj()"] flob_dm, dmFName_fq = p4.func.uniqueFile('tmp.dm') # tempfile.mkstemp(suffix='tree', dir=theDir) flob_tf, treeFName_fq = p4.func.uniqueFile('tmp.tree') flob_tf.close() # Throw the dir and dirname away. dirname, dmFName = os.path.split(dmFName_fq) dirname, treeFName = os.path.split(treeFName_fq) # Write the files, do the analysis self.writePhylipToOpenFile(flob_dm, 6) flob_dm.close() os.system("bionj %s %s >/dev/null" % (dmFName, treeFName)) # This is the result. The tree, if it exists, is read in by p4. oldLen = len(var.trees) p4.func.read(treeFName) newLen = len(var.trees) if newLen == oldLen + 1: pass else: gm.append("I was expecting exactly one tree. Got %i" % (oldLen - newLen)) raise P4Error(gm) t = var.trees.pop() # Tidy up. os.remove(treeFName) os.remove(dmFName) for n in t.iterNodesNoRoot(): if n.br.len < 0.0: n.br.len = 0.0 return t
[docs] def fastme(self): """Use fastme to make a minimum-evolution tree, which is returned. The resulting tree is read in by p4, and is returned. We interact with fastme by writing files, but care is taken that existing files are not overwritten, because new file names are made to be unique. If the branch lengths are less than zero, they are made to be zero. """ gm = ["DistanceMatrix.fastme()"] flob_dm, dmFName_fq = p4.func.uniqueFile('tmp.dm') # tempfile.mkstemp(suffix='tree', dir=theDir) flob_tf, treeFName_fq = p4.func.uniqueFile('tmp.tree') flob_tf.close() # Throw the dir and dirname away. dirname, dmFName = os.path.split(dmFName_fq) dirname, treeFName = os.path.split(treeFName_fq) # Write the files, do the analysis self.writePhylipToOpenFile(flob_dm, 6) flob_dm.close() os.system("fastme -i %s -o %s" % (dmFName, treeFName)) # This is the result. The tree, if it exists, is read in by p4. oldLen = len(var.trees) p4.func.read(treeFName) newLen = len(var.trees) if newLen == oldLen + 1: pass else: gm.append("I was expecting exactly one tree. Got %i" % (oldLen - newLen)) raise P4Error(gm) t = var.trees.pop() # Tidy up. os.remove(treeFName) os.remove(dmFName) for n in t.iterNodesNoRoot(): if n.br.len < 0.0: n.br.len = 0.0 if not n.isLeaf: n.name = None return t