import sys
import re
import string
import os
import io
import p4.func
import copy
from p4.var import var
from p4.p4exceptions import P4Error
from subprocess import Popen, PIPE
from builtins import object # For Py2/3 compatibility, needed for redefinition of __bool__() below in Py2
from p4.sequence import Sequence
[docs]class SequenceList(object):
"""A container for a list of Sequence objects.
The usual input would be a fasta file::
read('sequences.fas')
sl = var.sequenceLists.pop()
# see what you have
sl.dump()
# look at the sequences
for s in sl.sequences:
print s.name, s.dataType
# Get at sequences by name from a dictionary
sl.makeSequenceForNameDict()
s = sl.sequenceForNameDict['mammoth']
# align them using muscle
a = sl.muscle()
"""
def __init__(self, flob=None):
#: A list of Sequence objects
self.sequences = []
#: If it came from a file with a name, this is it.
self.fName = None
#: Allows you to find Sequence objects from their Sequence.name
self.sequenceForNameDict = None
if flob:
self._readFastaFile(flob)
if hasattr(flob, 'name'):
self.fName = flob.name
[docs] def makeSequenceForNameDict(self):
self.sequenceForNameDict = {}
for s in self.sequences:
assert s.name not in self.sequenceForNameDict, "duped name %s" % s.name
self.sequenceForNameDict[s.name] = s
def _readFastaMakeSeq(self, splHeadLine, sLineList):
gm = ['SequenceList._readFastaMakeSeq()']
if not splHeadLine or not splHeadLine[0]:
gm.append("No name for new fasta sequence. This should not happen.")
raise P4Error(gm)
if not sLineList:
gm.append("No sequence for %s" % splHeadLine)
raise P4Error(gm)
mySeq = Sequence()
mySeq.name = splHeadLine[0]
if len(splHeadLine) == 2:
mySeq.comment = splHeadLine[1]
mySeq.sequence = ''.join(sLineList).lower()
return mySeq
def _readFastaReadHeadLine(self, aLine):
gm = ['SequenceList._readFastaReadHeadLine(%s)']
assert aLine.startswith(">") # or else we would not be here.
# There should be no space after the ">"
if aLine[1] in string.whitespace:
gm.append("The '>' should not be followed by whitespace.")
raise P4Error(gm)
# In this next line, the comment, if it exists, picks up a newline. Get
# rid of it with strip().
splHeadLine = [myWord.strip() for myWord in aLine[1:].split(None, 1)]
return splHeadLine
def _readFastaFile(self, flob):
flob.seek(0)
gm = ['SequenceList._readFastaFile()']
if hasattr(flob, 'name'):
gm = ['SequenceList._readFastaFile(%s)' % flob.name]
complaintAboutLength = """
Lines should not be longer than 120 characters.
This will be overlooked here, but other programs may gag.
"""
alreadyComplainedAboutLength = False
# The first line might start with a ';'
# Move the position to the first '>'
aLine = flob.readline()
while aLine[0] != '>':
aLine = flob.readline()
if not aLine:
gm.append("Unable to find a line that starts with '>'")
raise P4Error(gm)
sList = []
splHeadLine = self._readFastaReadHeadLine(aLine)
# print(splHeadLine)
# read the rest of the flob
while 1:
aLine = flob.readline()
# print("read aLine: %s" % aLine, end='')
# If we are at the end, stash the last sequence and break.
if not aLine:
if not splHeadLine:
break
else:
if not sList:
gm.append("No sequence for '%s'?" % splHeadLine[0])
raise P4Error(gm)
mySeq = self._readFastaMakeSeq(splHeadLine, sList)
# print("Got seq, %s, %s, %s" % (mySeq, mySeq.name, mySeq.sequence))
self.sequences.append(mySeq)
del(sList)
break
elif aLine[0] == '>':
# Stash the previous sequence
if not sList:
gm.append("No sequence for '%s'?" % splHeadLine[0])
raise P4Error(gm)
mySeq = self._readFastaMakeSeq(splHeadLine, sList)
self.sequences.append(mySeq)
sList = []
splHeadLine = self._readFastaReadHeadLine(aLine)
else:
sList.append(aLine.strip())
# now fix the sequences
for mySeq in self.sequences:
dType = p4.func.isDnaRnaOrProtein(mySeq.sequence)
if dType == 1:
# print "Its dna"
mySeq.dataType = 'dna'
elif dType == 2:
# print "Its rna"
# print "Converting RNA to DNA"
mySeq.sequence = list(mySeq.sequence)
for i in range(len(mySeq.sequence)):
if mySeq.sequence[i] == 'u':
mySeq.sequence[i] = 't'
mySeq.sequence = ''.join(mySeq.sequence)
mySeq.dataType = 'dna'
else:
# print "Its protein"
mySeq.dataType = 'protein'
if 0:
for mySeq in self.sequences:
print('%20s %-30s' % ('name', mySeq.name))
print('%20s %-30s' % ('comment', mySeq.comment))
print('%20s %-30s' % ('sequence', mySeq.sequence))
print('%20s %-30s' % ('dataType', mySeq.dataType))
print('')
# check for invalid chars
if len(self.sequences) > 0:
bads = 0
if self.sequences[0].dataType == 'dna':
for s in self.sequences:
j = 0
while j < len(s.sequence):
if s.sequence[j] not in var.validDnaChars:
print("bad character '%s' in (zero-based) dna sequence %s " % \
(s.sequence[j], self.sequences.index(s)))
print(" sequence name: %s" % s.name)
print(" at (zero-based) position %s" % j)
bads = bads + 1
if bads > 10:
print("...and possibly others")
break
j = j + 1
if bads > 10:
break
if bads:
gm.append("Got bad characters. See above.")
raise P4Error(gm)
if self.sequences[0].dataType == 'protein':
for s in self.sequences:
j = 0
while j < len(s.sequence):
if s.sequence[j] not in var.validProteinChars:
print("bad character '%s' in (zero-based) protein sequence %s " % \
(s.sequence[j], self.sequences.index(s)))
print(" sequence name: %s" % s.name)
print(" at (zero-based) position %s" % j)
bads = bads + 1
if bads > 10:
print("...and possibly others")
break
j = j + 1
if bads > 10:
break
if bads:
gm.append("Got bad characters. See above.")
raise P4Error(gm)
flob.close()
def _readOpenPirFile(self, flob):
# http://www.ebi.ac.uk/help/formats.html
# NBRF/PIR Format:
# * A sequence in PIR format consists of:
# 1. One line starting with
# 1. a ">" (greater-than) sign, followed by
# 2. a two-letter code describing the sequence type (P1, F1, DL, DC, RL, RC, or XX), followed by
# 3. a semicolon, followed by
# 4. the sequence identification code (the database ID-code).
# 2. One line containing a textual description of the sequence.
# 3. One or more lines containing the sequence itself. The end of the sequence is marked
# by a "*" (asterisk) character.
# * A file in PIR format may comprise more than one sequence.
# Sequence type Code
# Protein (complete) P1
# Protein (fragment) F1
# DNA (linear) DL
# DNA (circular) DC
# RNA (linear) RL
# RNA (circular) RC
# tRNA N3
# other functional RNA N1
flob.seek(0)
gm = ['SequenceList._readOpenPirFile()']
if hasattr(flob, 'name'):
gm = ['SequenceList._readOpenPirFile(%s)' % flob.name]
ll = [l.strip() for l in flob.readlines()]
lNum = -1
while 1:
lNum += 1
try:
aLine = ll[lNum]
# print "a %4i aLine: '%s'" % (lNum, aLine)
except IndexError:
break
# print "a1 %4i aLine: '%s'" % (lNum, aLine)
if not aLine or not aLine.startswith('>'):
try:
lNum += 1
aLine = ll[lNum]
# print "b %4i aLine: '%s'" % (lNum, aLine)
except IndexError:
break
# print "c %4i aLine: %s" % (lNum, aLine)
if aLine[3] != ';':
gm.append("First line is: %s" % aLine.rstrip())
gm.append("4th char should be ';'")
raise P4Error(gm)
twoChars = aLine[1:3]
if twoChars not in ['P1']:
gm.append("First line is: %s" % aLine.rstrip())
gm.append(
"Code characters '%s' are not recognized / implemented. Fix me?" % twoChars)
raise P4Error(gm)
seqObj = Sequence()
if twoChars == 'P1':
seqObj.dataType = 'protein'
else:
gm.append(
"Pir datatype code '%s' is not implemented. Fix me." % twoChars)
raise P4Error(gm)
# So I can append lines. I'll change it back to a string later
seqObj.sequence = []
splLine = aLine.split(';')
seqObj.name = splLine[1]
# print "got pir seq name %s" % seqObj.name
# Get the comment line.
lNum += 1
try:
aLine = ll[lNum]
# print "d %4i aLine: %s" % (lNum, aLine)
if aLine:
seqObj.comment = aLine
# print "got comment '%s' for pir seq %s" % (seqObj.comment, seqObj.name)
# else:
# print "No comment line for %s" % seqObj.name
except IndexError:
gm.append(
"premature end to pir file, in sequence %s" % seqObj.name)
raise P4Error(gm)
while 1:
lNum += 1
try:
aLine = ll[lNum]
# print "e %4i aLine: %s" % (lNum, aLine)
except IndexError:
break
if not aLine:
gm.append(
"Misplaced blank line in pir sequence %s" % seqObj.name)
raise P4Error(gm)
if aLine[0] == '>':
break
seqObj.sequence.append(aLine)
if aLine.endswith('*'):
break
seqObj.sequence = ''.join(seqObj.sequence)
assert seqObj.sequence.endswith('*')
seqObj.sequence = seqObj.sequence[:-1]
self.sequences.append(seqObj)
# now fix the sequences
myZaps = string.digits + string.whitespace + '\0'
for seqObj in self.sequences:
if '.' in seqObj.sequence:
gm.append("Dots don't work in a pir file, do they?")
raise P4Error(gm)
seqObj.sequence = seqObj.sequence.lower()
seqObj.sequence = re.sub('['+myZaps+']', '', seqObj.sequence)
if 0:
for seqObj in self.sequences:
print('%20s %-30s' % ('name', seqObj.name))
print('%20s %-30s' % ('comment', seqObj.comment))
print('%20s %-30s' % ('sequence', seqObj.sequence))
print('%20s %-30s' % ('dataType', seqObj.dataType))
print('')
# check for invalid chars
if len(self.sequences) > 0:
bads = 0
if self.sequences[0].dataType == 'dna':
for s in self.sequences:
j = 0
while j < len(s.sequence):
if s.sequence[j] not in var.validDnaChars:
print("bad character '%s' in (zero-based) dna sequence %s " % \
(s.sequence[j], self.sequences.index(s)))
print(" sequence name: %s" % s.name)
print(" at (zero-based) position %s" % j)
bads = bads + 1
if bads > 10:
print("...and possibly others")
break
j = j + 1
if bads > 10:
break
if bads:
gm.append("Got bad characters.")
raise P4Error(gm)
if self.sequences[0].dataType == 'protein':
for s in self.sequences:
j = 0
while j < len(s.sequence):
if s.sequence[j] not in var.validProteinChars:
print("bad character '%s' in (zero-based) protein sequence %s " % \
(s.sequence[j], self.sequences.index(s)))
print(" sequence name: %s" % s.name)
print(" at (zero-based) position %s" % j)
bads = bads + 1
if bads > 10:
print("...and possibly others")
break
j = j + 1
if bads > 10:
break
if bads:
gm.append("Got bad characters.")
raise P4Error(gm)
flob.close()
return self # ie success
[docs] def alignment(self):
"""Make self into an alignment, and return it.
If all the sequences are the same length and type, then self,
a sequenceList, could be an Alignment. This method generates
an Alignment instance, runs the Alignment method
checkLengthsAndTypes(), and returns the Alignment.
If you feed p4 a fasta sequence, it makes SequenceList object,
and runs this method on it. If it works then p4 puts the
Alignment object in var.alignments, and if not it puts the
SequenceList object in var.sequenceLists.
It is possible that p4 might think that some short sequences
are DNA when they are really protein. In that case it will
fail to make an alignment, because it will fail the types
check. So what you can do is something like this::
sl = var.sequenceLists[0]
for s in sl.sequences:
s.dataType = 'protein'
a = sl.alignment()
"""
from p4.alignment import Alignment
a = Alignment()
a.fName = self.fName
import copy
a.sequences = copy.deepcopy(self.sequences) # self will be deleted
a.fName = self.fName
a.checkLengthsAndTypes()
return a
[docs] def writeFasta(self, fName=None, comment=1, width=60, append=0, seqNum=None, writeExtraNewline=True):
"""Write out the sequences in Fasta format.
This will write to stdout by default, or a file name, or to an
open file-like object, eg a StringIO object.
The sequences may have comments, which are written by default.
If you don't want comments, say comment=None
By default, sequences are wrapped when they are too long.
You can set the length at which to wrap the sequences.
Set width=0 if you want your sequences in one (long) line.
If seqNum=None, the default, then all the sequences are
written. But you can also just write one sequence, given by
its number. Write out a bunch to the same file with 'append'.
By default, a blank line will be written after each sequence.
If you prefer your fasta without these extra lines, say
writeExtraNewline=False.
"""
complaintHead = '\nSequenceList.writeFasta()'
isFlob = False
#originalTell = None
if fName == None or fName == sys.stdout:
f = sys.stdout
isFlob = True
elif hasattr(fName, 'write'): # an open, file-like object
f = fName
#originalTell = f.tell()
isFlob = True
else:
assert isinstance(fName, str) # a file name
if append:
if os.path.isfile(fName):
try:
f = open(fName, 'a')
except IOError:
print(complaintHead)
print(" Can't open %s for appending." % fName)
sys.exit()
else:
if 0:
print(complaintHead)
print(" 'append' is requested,")
print(" but '%s' is not a regular file (maybe it doesn't exist?)." \
% fName)
print(" Writing to a new file instead.")
try:
f = open(fName, 'w')
except IOError:
print(complaintHead)
print(" Can't open %s for writing." % fName)
sys.exit()
else:
try:
f = open(fName, 'w')
except IOError:
print(complaintHead)
print(" Can't open %s for writing." % fName)
sys.exit()
if seqNum == None:
for i in range(len(self.sequences)):
s = self.sequences[i]
#print("SequenceList.writeFasta() s.name %s, type(s.name) %s" % (s.name, type(s.name)))
f.write('>%s' % s.name)
if comment and s.comment:
f.write(' %s' % s.comment)
f.write('\n')
left = len(s.sequence)
pos = 0
if width > 0:
while left >= width:
if var.writeFastaUppercase:
f.write('%s\n' % s.sequence[pos: pos + width].upper())
else:
f.write('%s\n' % s.sequence[pos: pos + width])
pos = pos + width
left = left - width
if left > 0:
if var.writeFastaUppercase:
f.write('%s\n' % s.sequence[pos:].upper())
else:
f.write('%s\n' % s.sequence[pos:])
if writeExtraNewline:
f.write('\n')
else:
try:
theInt = int(seqNum)
if theInt < 0 or theInt >= len(self.sequences):
print(complaintHead)
print(" seqNum %i is out of range." % seqNum)
sys.exit()
except ValueError:
print(complaintHead)
print(" seqNum should be an integer.")
sys.exit()
s = self.sequences[theInt]
f.write('>%s' % s.name)
if comment and s.comment:
f.write(' %s' % s.comment)
f.write('\n')
left = len(s.sequence)
pos = 0
if width > 0:
while left >= width:
if var.writeFastaUppercase:
f.write('%s\n' % s.sequence[pos: pos + width].upper())
else:
f.write('%s\n' % s.sequence[pos: pos + width])
pos = pos + width
left = left - width
if left > 0:
if var.writeFastaUppercase:
f.write('%s\n' % s.sequence[pos:].upper())
else:
f.write('%s\n' % s.sequence[pos:])
if writeExtraNewline:
f.write('\n')
if isFlob:
if f != sys.stdout:
f.close()
else:
f.close()
[docs] def checkNamesForDupes(self):
if not var.doCheckForDuplicateSequenceNames:
return
snDict = {}
for s in self.sequences:
ret = snDict.get(s.name)
if ret:
snDict[s.name] += 1
else:
snDict[s.name] = 1
nDupes = 0
for k, v in snDict.items():
if v > 1:
print("Got %2i copies of sequence name %s" % (v, k))
nDupes += 1
if nDupes:
gm = ["SequenceList.checkNamesForDupes()"]
if self.fName:
gm.append("File name %s" % self.fName)
gm.append("Got %i duplicate sequence names." % nDupes)
gm.append("(If you want to turn off checking, set ")
gm.append("var.doCheckForDuplicateSequenceNames to False)")
raise P4Error(gm)
[docs] def dump(self):
if isinstance(self, SequenceList):
print("\nSequenceList dump:")
if self.fName:
print(" File name is %s" % self.fName)
if len(self.sequences) == 1:
print(" There is 1 sequence")
else:
print(" There are %s sequences" % len(self.sequences))
nSeqsToDo = len(self.sequences)
if nSeqsToDo > 12:
nSeqsToDo = 10
for i in range(nSeqsToDo):
if isinstance(self, SequenceList):
print(" %3i %5s %s" % (i, len(self.sequences[i].sequence), self.sequences[i].name))
else: # Alignment, don't print sequence lengths
print(" %3i %s" % (i, self.sequences[i].name))
# self.sequences[i].dump()
if len(self.sequences) > nSeqsToDo:
print(" ... and %i others..." % (len(self.sequences) - nSeqsToDo))
print('')
[docs] def renameForPhylip(self, dictFName='p4_renameForPhylip_dict.py'):
"""Rename with strict phylip-friendly short boring names.
It saves the old names (together with the new) in a python
dictionary, in a file, by default named
p4_renameForPhylip_dict.py"""
gm = ['SequenceList.renameForPhylip()']
if os.path.exists(dictFName):
gm.append("The dictionary file '%s' already exists." % dictFName)
raise P4Error(gm)
if hasattr(self, 'taxNames'):
originalNames = self.taxNames
else:
originalNames = [s.name for s in self.sequences]
d = {}
for i in range(len(self.sequences)):
s = self.sequences[i]
newName = 's%i' % i
d[newName] = s.name
s.name = newName
f = open(dictFName, 'w')
f.write("p4_renameForPhylip_originalNames = %s\np4_renameForPhylip_dict = %s\n" % (
originalNames, d))
f.close()
[docs] def restoreNamesFromRenameForPhylip(self, dictFName='p4_renameForPhylip_dict.py'):
"""Given the dictionary file, restore proper names.
The dictionary file is by default named p4_renameForPhylip_dict.py"""
gm = ["SequenceLists.restoreNamesFromRenameForPhylip()"]
if os.path.exists(dictFName):
import __main__
exec(open(dictFName).read(), __main__.__dict__, __main__.__dict__)
from __main__ import p4_renameForPhylip_dict
else:
gm.append("The dictionary file '%s' can't be found." % dictFName)
raise P4Error(gm)
for s in self.sequences:
if s.name in p4_renameForPhylip_dict:
s.name = p4_renameForPhylip_dict[s.name]
else:
gm.append("The dictionary does not contain a key for '%s'." % s.name)
raise P4Error(gm)
del(__main__.p4_renameForPhylip_dict)
del(__main__.p4_renameForPhylip_originalNames)
[docs] def muscle(self):
"""Do an alignment with muscle.
Its all done in memory -- no files are written.
An alignment object is returned.
The order of the sequences in the new alignment is made to be
the same as the order in self.
"""
flob = io.BytesIO()
self.writeFastaToBytesFlob(flob)
p = Popen(["muscle"], stdin=PIPE, stdout=PIPE, stderr=PIPE)
ret = p.communicate(input=flob.getvalue())
flob.close()
try:
a = p4.func.readAndPop(ret[0].decode())
except P4Error:
print(ret)
raise P4Error("Something didn't work ...")
a.makeSequenceForNameDict()
newSequenceList = []
for sSelf in self.sequences:
newSequenceList.append(a.sequenceForNameDict[sSelf.name])
a.sequences = newSequenceList
return a
[docs] def clustalo(self):
"""Do an alignment with clustalo.
Its all done in memory -- no files are written.
An alignment object is returned.
The order of the sequences in the new alignment is made to be
the same as the order in self.
"""
flob = io.BytesIO() # gotta be Bytes for subprocess
self.writeFastaToBytesFlob(flob)
p = Popen(["clustalo", "-i", "-"], stdin=PIPE, stdout=PIPE, stderr=PIPE)
ret = p.communicate(input=flob.getvalue())
#ret = p.communicate()
if ret[1]:
print(ret)
raise P4Error("clustalo() Something wrong here ...")
flob.close()
#print(ret) # it is a bytes string
a = p4.func.readAndPop(ret[0].decode())
a.makeSequenceForNameDict()
newSequenceList = []
for sSelf in self.sequences:
newSequenceList.append(a.sequenceForNameDict[sSelf.name])
a.sequences = newSequenceList
return a
[docs] def writeFastaToBytesFlob(self, flob):
"""For subprocesses, eg muscle and clustalo
No comment, no extra new line. Width 60.
"""
assert isinstance(flob, io.BytesIO)
width = 60
for s in self.sequences:
st = '>%s\n' % s.name
flob.write(str.encode(st))
left = len(s.sequence)
pos = 0
while left >= width:
if var.writeFastaUppercase:
st = '%s\n' % s.sequence[pos: pos + width].upper()
flob.write(str.encode(st))
else:
st = '%s\n' % s.sequence[pos: pos + width]
flob.write(str.encode(st))
pos = pos + width
left = left - width
if left > 0:
if var.writeFastaUppercase:
st = '%s\n' % s.sequence[pos:].upper()
flob.write(str.encode(st))
else:
st = '%s\n' % s.sequence[pos:]
flob.write(str.encode(st))