from p4.alignment import Alignment
import sys
import time
import os
import p4.pf as pf
import p4.func
from p4.var import var
from p4.p4exceptions import P4Error
class Data:
"""All the alignments that you want to work with, in one place.
Initialize this with one of
- nothing (or None),
- a list of Alignment objects, or
- a single Alignment object.
If you initialize with nothing (or None), then all alignments in
var.alignments are used. If you initialize with a list of
alignments, then that is used. You can initialize with an empty
list to get an empty Data object.
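
    For example, a quick sketch (the file name here is hypothetical)::

        read('myData.nex')              # populates var.alignments
        d1 = Data()                     # uses all of var.alignments
        d2 = Data([var.alignments[0]])  # an explicit list
        d3 = Data([])                   # an empty Data object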
"""
    def __del__(self, freeData=pf.freeData, dp_freeData=pf.dp_freeData):
        # The default args hold references to the pf functions, so that
        # they remain usable during interpreter teardown.
if self.alignments:
for a in self.alignments:
a.parts = []
self.alignments = None
        if self.parts:
            for p in self.parts:
                # p.__del__() frees the cPart properly.  Calling
                # freePart(p.cPart) here instead would leave some
                # memory un-freed.
                p.__del__()
if self.cData:
if self.doDataPart:
dp_freeData(self.cData)
else:
freeData(self.cData)
self.cData = None
self.parts = None
self.taxNames = None
def __init__(self, alignments=None):
gm = ['Data.__init__()']
self.nParts = 0
self.parts = []
self.alignments = []
self.nTax = 0
self.taxNames = []
self.cData = None
self.unconstrainedLogLikelihood = None
if alignments:
if isinstance(alignments, Alignment):
# Passed in a single alignment object not a list
alignments = [alignments]
            elif not isinstance(alignments, list):
                gm.append(
                    "The 'alignments' arg should be a list or a single Alignment object.")
                raise P4Error(gm)
for a in alignments:
                if not isinstance(a, Alignment):
                    gm.append(
                        "Something in the 'alignments' arg was not an Alignment.")
                    raise P4Error(gm)
self._fill(alignments)
elif alignments == []:
pass
elif var.alignments:
self._fill(var.alignments)
# temporary! Only used in __del__()
self.doDataPart = var.doDataPart
    def dump(self):
"""Print rubbish about self."""
print("Data dump")
if self.nParts == 1:
if var.doDataPart:
print(" There is 1 dataPart")
else:
print(" There is 1 part")
else:
if var.doDataPart:
print(" There are %i dataParts" % self.nParts)
else:
print(" There are %i parts" % self.nParts)
for p in self.parts:
print(" name=%s, nChar %i, dataType %s, cPart %s" % \
(p.name, p.nChar, p.dataType, p.cPart))
print(" There are %i taxa" % self.nTax)
if len(self.alignments) == 1:
print(" There is 1 alignment")
else:
print(" There are %i alignments" % len(self.alignments))
if self.cData:
print(" The cData is %s" % self.cData)
else:
print(" There is no cData")
        if self.unconstrainedLogLikelihood:
            print(" The unconstrainedLogLikelihood is %s" % self.unconstrainedLogLikelihood)
def _fill(self, alignments):
# Fill self with Parts from all alignments.
#
# This method is called from __init__(), and it is generally
# not needed on its own. If we get here, we can be fairly sure
# that arg alignments is a non-empty list of Alignment
# objects. This method calls the Alignment method _initParts()
gm = ["Data._fill()"]
self.alignments = alignments
# Make a part out of the first alignment.
if not len(self.alignments):
gm.append("There are no alignments")
raise P4Error(gm)
a = self.alignments[0]
if var.doDataPart:
a.initDataParts()
else:
a._initParts()
if not len(a.parts):
gm.append("First alignment failed to make a part")
raise P4Error(gm)
self.taxNames = a.taxNames
self.nTax = len(self.taxNames)
for p in a.parts:
self.parts.append(p)
self.nParts = len(self.parts)
# Now do subsequent alignments ...
        for aligNum in range(1, len(self.alignments)):
a = self.alignments[aligNum]
if self.nTax != len(a.sequences):
gm.append(
"Additional alignment is not the same size as the first alignment.")
if a.fName:
gm.append('(New alignment from file %s.)' % a.fName)
gm.append("From the first alignment, nTax is %s." % self.nTax)
gm.append(
"However, (zero-based) alignment %i has %i sequences." % (aligNum, len(a.sequences)))
raise P4Error(gm)
            if self.nTax != len(a.taxNames):
                gm.append(
                    "Additional alignment does not have the same number of taxa as the first alignment.")
if a.fName:
gm.append('(New alignment from file %s.)' % a.fName)
gm.append("From the first alignment, nTax is %s." % self.nTax)
gm.append(
"However, (zero-based) alignment %i has %i taxNames." % (aligNum, len(a.taxNames)))
raise P4Error(gm)
for i in range(self.nTax):
if self.taxNames[i] != a.taxNames[i]:
gm.append(
"Name mis-match in (zero-based) taxon number %i," % i)
gm.append("in (zero-based) alignment %i." % aligNum)
if a.fName:
gm.append('(New alignment from file %s.)' % a.fName)
gm.append(
"Newly-added alignment taxname %s is not the" % a.taxNames[i])
gm.append(
" same as first alignment taxname %s" % self.taxNames[i])
raise P4Error(gm)
if var.doDataPart:
a.initDataParts()
else:
a._initParts()
if not len(a.parts):
gm.append("Additional alignment failed to make a part.")
if a.fName:
gm.append('(New alignment from file %s.)' % a.fName)
raise P4Error(gm)
for p in a.parts:
self.parts.append(p)
self.nParts = len(self.parts)
    def calcUnconstrainedLogLikelihood1(self):
"""Calculate likelihood under the multinomial model.
This calculates the unconstrained (multinomial) log like
without regard to character partitions. The result is placed
in the data variable unconstrainedLogLikelihood. If there is
more than one partition, it makes a new temporary alignment
and puts all the sequences in one part in that alignment. So
it ultimately only works on one data partition. If there is
more than one alignment, there is possibly more than one
datatype, and so this method will refuse to do it. Note that
the unconstrained log like of the combined data is not the sum
of the unconstrained log likes of the separate partitions.
See also calcUnconstrainedLogLikelihood2
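
        A minimal sketch (the file name is hypothetical)::

            read('myData.nex')
            d = Data()
            d.calcUnconstrainedLogLikelihood1()
            print(d.unconstrainedLogLikelihood)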
"""
if len(self.alignments) > 1:
gm = ["Data.calcUnconstrainedLogLikelihood()"]
gm.append(
"This method is not implemented for more than one alignment.")
raise P4Error(gm)
if self.nParts == 1: # no problem
self.unconstrainedLogLikelihood = pf.getUnconstrainedLogLike(
self.parts[0].cPart)
else:
a = self.alignments[0]
import copy
newAlig = Alignment()
newAlig.dataType = a.dataType
newAlig.symbols = a.symbols
newAlig.dim = a.dim
newAlig.equates = a.equates
newAlig.taxNames = a.taxNames
for s in a.sequences:
newAlig.sequences.append(copy.deepcopy(s))
newAlig.checkLengthsAndTypes()
newAlig._initParts()
# newAlig.dump()
self.unconstrainedLogLikelihood = pf.getUnconstrainedLogLike(
newAlig.parts[0].cPart)
            del newAlig
    def calcUnconstrainedLogLikelihood2(self):
"""Calculate likelihood under the multinomial model.
This calculates the unconstrained log like of each data
partition and places the sum in the Data (self) variable
unconstrainedLogLikelihood. Note that the unconstrained log
like of the combined data is not the sum of the unconstrained
log likes of the separate partitions. See also
calcUnconstrainedLogLikelihood1
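
        A minimal sketch, for multi-partition data::

            read('first.nex')
            read('second.nex')
            d = Data()
            d.calcUnconstrainedLogLikelihood2()   # sums over both parts
            print(d.unconstrainedLogLikelihood)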
"""
        uncon = 0.0
        for p in self.parts:
            uncon += pf.getUnconstrainedLogLike(p.cPart)
        self.unconstrainedLogLikelihood = uncon
def _setCStuff(self):
if self.cData:
gm = ["Data._setCStuff()"]
gm.append(
"This should only be called if self.cData does not exist!")
raise P4Error(gm)
else:
if var.doDataPart:
self.cData = pf.dp_newData(self.nTax, self.nParts)
for i in range(self.nParts):
p = self.parts[i]
pf.dp_pokeDataPartInData(p.cPart, self.cData, i)
else:
self.cData = pf.newData(self.nTax, self.nParts)
for i in range(self.nParts):
p = self.parts[i]
pf.pokePartInData(p.cPart, self.cData, i)
# print "Made Data.cData = %s" % self.cData
    def writeNexus(self, fName=None, writeDataBlock=0, interleave=0, flat=0, append=0):
"""Write all the alignments in self to a Nexus file.
If writeDataBlock=1, then taxa and characters are written to a
'data' block, rather than the default, which is to write
separate 'taxa' and 'characters' blocks.
Arg 'flat' gives sequences all on one line.
Arg 'append', if 0, writes #NEXUS first. If 1, does not write #NEXUS.
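
        A small sketch (the output file name is hypothetical)::

            d = Data()
            d.writeNexus(fName='all.nex', writeDataBlock=1)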
"""
# There may be more than one alignment, and we need to do the first
# one first, because it may or may not be appended, while the remaining
# alignments are appended for sure.
if len(self.alignments):
a = self.alignments[0]
# if a.parts and len(a.parts):
# a.resetSequencesFromParts() # simulate should be
# responsible for this
a.writeNexus(fName, writeDataBlock, interleave, flat, append)
for a in self.alignments[1:]:
# if a.parts and len(a.parts):
# a.resetSequencesFromParts()
a.writeNexus(fName, writeDataBlock, interleave, flat, append=1)
    def resetSequencesFromParts(self):
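        """Reset the sequences of each alignment in self from its parts."""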
for a in self.alignments:
if a.parts:
a.resetSequencesFromParts()
else:
raise P4Error("Alignment has no parts.")
    def compoSummary(self):
"""A verbose composition summary, one for each data partition."""
print("\n\nData composition summary")
print("========================\n")
# Make a name format (eg '%12s') that is long enough for the longest
# name
longestNameLen = 7 # to start
for i in self.taxNames:
if len(i) > longestNameLen:
longestNameLen = len(i)
nameFormat = '%' + '%i' % (longestNameLen + 1) + 's'
for i in range(len(self.parts)):
p = self.parts[i]
print("Part %i" % i)
print("%s" % (' ' * (longestNameLen + 1)), end=' ')
for j in range(len(p.symbols)):
print("%10s" % p.symbols[j], end=' ')
print("%10s" % 'nSites')
# print ''
#cumulativeComps = [0.0] * len(p.symbols)
grandTotalNSites = 0
for k in range(p.nTax):
c = p.composition([k])
# print "tax %s, part.composition() returns %s" % (k, c)
nSites = pf.partSequenceSitesCount(p.cPart, k)
grandTotalNSites = grandTotalNSites + nSites
print(nameFormat % self.taxNames[k], end=' ')
                # Usually sum(c) will be 1.0, unless the sequence is
                # empty.  We don't want to test "if sum(c) == 0.0:" or
                # "if sum(c):" because of small numbers.
if sum(c) > 0.99:
for j in range(len(p.symbols)):
print("%10.4f" % c[j], end=' ')
#cumulativeComps[j] = cumulativeComps[j] + (c[j] * nSites)
else: # Empty sequence, all zeros. Write dashes.
for j in range(len(p.symbols)):
print("%10s" % '-', end=' ')
print("%10s" % nSites)
c = p.composition()
print(nameFormat % 'mean', end=' ')
for j in range(len(p.symbols)):
print("%10.4f" % c[j], end=' ')
# print "%10s" % grandTotalNSites
print("%10.4f" % (float(grandTotalNSites) / self.nTax))
print("\n")
    def compoChiSquaredTest(self, verbose=1, skipColumnZeros=0, useConstantSites=1, skipTaxNums=None, getRows=0):
"""A chi square composition test for each data partition.

        So you could do, for example::

            read('myData.nex')
            # Calling Data() with no args tells it to make a Data
            # object using all the alignments in var.alignments
            d = Data()
            # Do the test.  By default it is verbose, and prints
            # results.  Additionally, a list of lists is returned
            ret = d.compoChiSquaredTest()
            # With verbose on, it might print something like ---
            # Part 0: Chi-square = 145.435278, (dof=170) P = 0.913995
            print(ret)
            # The list of lists that it returns might be something
            # like ---
            # [[145.43527849758556, 170, 0.91399521077908041]]
            # which has the same numbers as above, with one inner
            # list for each data partition.

        If your data has more than one partition::

            read('first.nex')
            read('second.nex')
            d = Data()
            d.compoChiSquaredTest()
            # Output, something like ---
            # Part 0: Chi-square = 200.870463, (dof=48) P = 0.000000
            # Part 1: Chi-square = 57.794704, (dof=80) P = 0.971059
            # [[200.87046313430443, 48, 0.0], [57.794704451018163, 80, 0.97105866938683427]]

        where the last line is returned.  With *verbose* turned off,
        the ``Part N`` lines are not printed.

        This method returns a list of lists, one for each data
        partition.  If *getRows* is off, the default, then it is a
        list of 3-item lists, and if *getRows* is turned on then it is
        a list of 4-item lists.  In each inner list, the first item is
        the X-squared statistic, the second is the degrees of freedom,
        and the third is the probability from chi-squared.  (The
        expected values come from the data.)  If *getRows* is turned
        on, the 4th item is a list of X-squared contributions from
        individual rows (ie individual taxa), which together sum to
        the X-squared for the whole partition given in the first item.
        This latter way is the way that Tree-Puzzle does it.

        Note that this ostensibly tests whether the data are
        homogeneous in composition, but it does not work on sequences
        that are related.  That is, testing whether the X-squared stat
        is significant using the chi-squared curve has a high
        probability of type II error for phylogenetic sequences.

        However, the X-squared stat can be used in valid ways.  You
        can simulate data under the tree and model, and so generate a
        valid null distribution of X-squared values from the
        simulations, by which to assess the significance of the
        original X-squared.  You can use this method to generate the
        X-squared values, as in the sketch below.
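
        A hedged sketch of that simulation strategy (it assumes a
        tree ``t`` with a model defined and ``t.data = d`` attached;
        the names here are arbitrary)::

            origXSq = d.compoChiSquaredTest(verbose=0)[0][0]
            nullXSqs = []
            for i in range(100):
                t.simulate()    # simulates new data into d
                nullXSqs.append(d.compoChiSquaredTest(verbose=0)[0][0])
            # Assess origXSq against the distribution of nullXSqs.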

        A problem arises when the composition of a character is zero.
        If that happens, we can't calculate X-squared because there
        will be a division by zero.  If *skipColumnZeros* is set to 1,
        then those columns are simply skipped.  They are silently
        skipped unless verbose is turned on.

        So let's say that your original data have all characters, but
        one of them has a very low value.  That is reflected in the
        model, and when you do simulations based on the model you
        occasionally get zeros for that character.  Here it is up to
        you: you could say that the data containing the zeros are
        validly part of the possibilities and so should be included,
        or you could say that the data containing the zeros are not
        valid and should be excluded.  You choose between these by
        setting *skipColumnZeros*.  Note that if you do not set
        *skipColumnZeros*, and you then analyse a partition that has
        column zeros, the result is None for that partition.

        Another problem occurs when a partition is completely missing
        a sequence.  Of course that sequence does not contribute to
        the stat.  However, in any simulations that you might do, that
        sequence *will* be there, and *will* contribute to the stat.
        So you will want to skip that sequence when you do your calcs
        from the simulation.  You can do that with the *skipTaxNums*
        arg, which is a list of lists.  The outer list is nParts long,
        and each inner list is a list of taxNums to exclude.
"""
if not useConstantSites:
newData = Data([])
aligs = []
for a in self.alignments:
# aligs.append(a.removeConstantSites())
aligs.append(
a.subsetUsingMask(a.constantMask(), theMaskChar='1', inverse=1))
newData._fill(aligs)
theResult = newData.compoChiSquaredTest(verbose=verbose,
skipColumnZeros=skipColumnZeros,
useConstantSites=1, skipTaxNums=skipTaxNums,
getRows=getRows)
            del newData
return theResult
gm = ['Data.compoChiSquaredTest()']
nColumnZeros = 0
results = []
# check skipTaxNums
        if skipTaxNums is not None:
if not isinstance(skipTaxNums, list):
gm.append("skipTaxNums should be a list of lists.")
raise P4Error(gm)
if len(skipTaxNums) != self.nParts:
gm.append(
"skipTaxNums should be a list of lists, nParts long.")
raise P4Error(gm)
for s in skipTaxNums:
if not isinstance(s, list):
gm.append("skipTaxNums should be a list of lists.")
raise P4Error(gm)
for i in s:
if not isinstance(i, int):
gm.append(
"skipTaxNums inner list items should be tax numbers.")
gm.append("Got %s" % i)
raise P4Error(gm)
        # Check for blank sequences.  It's a pain to force the user to do this.
hasBlanks = False
blankSeqNums = []
for partNum in range(self.nParts):
p = self.parts[partNum]
partBlankSeqNums = []
for taxNum in range(self.nTax):
if skipTaxNums and skipTaxNums[partNum] and taxNum in skipTaxNums[partNum]:
pass
else:
nSites = pf.partSequenceSitesCount(
p.cPart, taxNum) # no gaps, no missings
if not nSites:
partBlankSeqNums.append(taxNum)
if partBlankSeqNums:
hasBlanks = True
blankSeqNums.append(partBlankSeqNums)
if hasBlanks:
gm.append(
"These sequence numbers were found to be blank. They should be excluded.")
gm.append("%s" % blankSeqNums)
gm.append("Set the arg skipTaxNums to this list.")
raise P4Error(gm)
for partNum in range(self.nParts):
gm = ['Data.compoChiSquaredTest() Part %i' % partNum]
p = self.parts[partNum]
comps = []
for taxNum in range(self.nTax):
if skipTaxNums and skipTaxNums[partNum] and taxNum in skipTaxNums[partNum]:
pass
else:
oneComp = p.composition([taxNum])
nSites = pf.partSequenceSitesCount(
p.cPart, taxNum) # no gaps, no missings
# print "tax %i, nSites=%i, oneComp=%s" % (taxNum, nSites,
# oneComp)
if nSites:
for k in range(len(oneComp)):
oneComp[k] = oneComp[k] * nSites
comps.append(oneComp)
else:
gm.append(
"(Zero-based) sequence %i is blank, and should be excluded." % taxNum)
gm.append(
"You need to add the number %i to the arg skipTaxNums list of lists." % taxNum)
gm.append(
"(I could do that automatically, but it is best if *you* do it, explicitly.)")
gm.append(
"You can use the Alignment method checkForBlankSequences(listSeqNumsOfBlanks=True)")
gm.append("to help you get those inner lists.")
raise P4Error(gm)
# print "comps=", comps
# Here we calculate the X^2 stat. But we want to check
# for columns summing to zero. So we can't use
# p4.func.xSquared()
nRows = len(comps)
nCols = len(comps[0])
# I could have just kept nSites, above
theSumOfRows = p4.func._sumOfRows(comps)
theSumOfCols = p4.func._sumOfColumns(comps)
# print theSumOfCols
isOk = 1
columnZeros = []
for j in range(len(theSumOfRows)):
if theSumOfRows[j] == 0.0:
gm.append("Zero in a row sum. Programming error.")
raise P4Error(gm)
for j in range(len(theSumOfCols)):
if theSumOfCols[j] == 0.0:
if skipColumnZeros:
columnZeros.append(j)
else:
if verbose:
print(gm[0])
print(" Zero in a column sum.")
print(" And skipColumnZeros is not set, so I am refusing to do it at all.")
isOk = 0
nColumnZeros += 1
theExpected = p4.func._expected(theSumOfRows, theSumOfCols)
# print "theExpected = ", theExpected
# print "columnZeros = ", columnZeros
if isOk:
if getRows:
xSq_rows = []
xSq = 0.0
alreadyGivenZeroWarning = 0
k = 0
for taxNum in range(self.nTax):
if skipTaxNums and skipTaxNums[partNum] and taxNum in skipTaxNums[partNum]:
if getRows:
# this taxon is not in comps. Add a placeholder
xSq_rows.append(0.0)
# k is the counter for comps and theExpected, taxNum
# without the skips
else:
xSq_row = 0.0
for j in range(nCols):
if j in columnZeros:
if skipColumnZeros:
if verbose and not alreadyGivenZeroWarning:
print(gm[0])
print(" Skipping (zero-based) column number(s) %s, which sum to zero." % columnZeros)
alreadyGivenZeroWarning = 1
else:
gm.append("Programming error.")
raise P4Error(gm)
else:
theDiff = comps[k][j] - theExpected[k][j]
xSq_row += (theDiff * theDiff) / \
theExpected[k][j]
xSq += xSq_row
if getRows:
xSq_rows.append(xSq_row)
k += 1
# print xSq_rows
dof = (p.dim - len(columnZeros) - 1) * (len(comps) - 1)
prob = pf.chiSquaredProb(xSq, dof)
if verbose:
print("Part %i: Chi-square = %f, (dof=%i) P = %f" % (partNum, xSq, dof, prob))
if getRows:
# print " rows = %s" % xSq_rows
print("%20s %7s %s" % ('taxName', 'xSq_row', 'P (like puzzle)'))
for tNum in range(self.nTax):
if not skipTaxNums or tNum not in skipTaxNums[partNum]:
thisProb = pf.chiSquaredProb(
xSq_rows[tNum], self.parts[partNum].dim - 1)
print("%20s %7.5f %7.5f" % (self.taxNames[tNum], xSq_rows[tNum], thisProb))
else:
print("%20s --- ---" % self.taxNames[tNum])
if getRows:
results.append([xSq, dof, prob, xSq_rows])
else:
results.append([xSq, dof, prob])
else: # ie not isOk, ie there is a zero in a column sum
# Maybe a bad idea. Maybe it should just die, above.
results.append(None)
if nColumnZeros and verbose:
print("There were %i column zeros." % nColumnZeros)
return results
    def simpleBigXSquared(self):
"""No frills calculation of bigXSquared.
As in :meth:`Data.Data.compoChiSquaredTest`, but with no
options, and hopefully faster. It can't handle gaps or
ambiguities. It should be ok for simulations. It returns a
list of bigXSquared numbers, one for each data partition.
If a character happens to not be there, then a column will be
zero, and so it can't be calculated. In that case -1.0 is
returned for that part.
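
        A minimal sketch::

            d = Data()
            xSqs = d.simpleBigXSquared()   # one number per part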
"""
l = []
for p in self.parts:
l.append(pf.partBigXSquared(p.cPart))
return l
    def simpleConstantSitesCount(self):
"""No frills constant sites count.
It can't handle gaps or ambiguities. It should be ok
for simulations. It returns a list of constant sites counts,
one for each data partition.
For each part, of the sites that are not all gaps+ambigs, if
the sites that are not gaps or ambigs are all the same, then
it is considered here to be a constant site.
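
        A minimal sketch, showing the proportion of constant sites::

            d = Data()
            for p, count in zip(d.parts, d.simpleConstantSitesCount()):
                print(p.name, count, float(count) / p.nChar)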
"""
l = []
for p in self.parts:
l.append(pf.partSimpleConstantSitesCount(p.cPart))
return l
    def dupe(self):
"""Copy, making new cParts."""
import copy
aligListCopy = copy.deepcopy(self.alignments)
for alig in aligListCopy:
# We do not want the cPart's, but neither do we want to free the
# originals.
for p in alig.parts:
p.cPart = None
            del alig.parts
alig.parts = []
return Data(aligListCopy)
    def bootstrap(self):
"""Returns a new data object, filled with bootstrapped data.
It is a non-parametric bootstrap. Data partitions are handled
properly, that is if your data has a charpartition, the
bootstrap has the same charpartition, and sites are sampled
only from the appropriate charpartition subset.
Generation of random numbers uses the GSL random number
generator. The state is held in ``var.gsl_rng``, which is
None by default. If you do a bootstrap using this method, it
will use ``var.gsl_rng`` if it exists, or make it if it does
not exist yet. When it makes it, it seeds the state based on
the current time. That should give you lots of variation.
If on the other hand you want to make a series of bootstraps
that are the same as a previous series you can reseed the
randomizer with the same seed before you do it, like this::
if not var.gsl_rng:
var.gsl_rng = pf.gsl_rng_get()
# unusually, set the seed
mySeed = 23 # your chosen int seed
pf.gsl_rng_set(var.gsl_rng, mySeed)
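
        Typical use is then simply (the name bData is arbitrary)::

            d = Data()
            bData = d.bootstrap()   # a new, bootstrapped Data object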
"""
gm = ['Data.bootstrap()']
import copy
aligListCopy = copy.deepcopy(self.alignments)
for alig in aligListCopy:
# We do not want the cPart's, but neither do we want to free the
# originals.
for p in alig.parts:
p.cPart = None
            del alig.parts
alig.parts = []
d = Data([])
d._fill(aligListCopy)
if not self.cData:
self._setCStuff()
d._setCStuff()
if 0:
print("\nSELF\n====")
self.dump()
print("\n\nNEW DATA\n========")
d.dump()
raise P4Error
if not var.gsl_rng:
var.gsl_rng = pf.gsl_rng_get()
pf.gsl_rng_set(var.gsl_rng, int(time.time()))
pf.bootstrapData(self.cData, d.cData, var.gsl_rng)
# Data.resetSequencesFromParts() uses
# Alignment.resetSequencesFromParts(), which uses
# partSeq = pf.symbolSequences(self.parts[i].cPart)
# which uses thePart->sequences
d.resetSequencesFromParts()
return d
    def meanNCharsPerSite(self):
"""Mean number of different characters per site
Constant sites are not ignored. Ambiguities and gaps are ignored.
This is implemented in C, allowing multiple parts. It is also
implemented in pure Python in the Alignment class, for single
parts (which also optionally gives you a distribution in
addition to the mean); see
:meth:`Alignment.Alignment.meanNCharsPerSite`.
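
        A minimal sketch::

            d = Data()
            print(d.meanNCharsPerSite())   # one mean per part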
"""
l = []
for p in self.parts:
l.append(pf.partMeanNCharsPerSite(p.cPart))
return l