/usr/share/pyshared/Bio/SeqIO/AbiIO.py is in python-biopython 1.58-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 | # Copyright 2011 by Wibowo Arindrarto (w.arindrarto@gmail.com)
# Revisions copyright 2011 by Peter Cock.
# This code is part of the Biopython distribution and governed by its
# license. Please see the LICENSE file that should have been included
# as part of this package.
"""Bio.SeqIO parser for the ABI format.
ABI is the format used by Applied Biosystem's sequencing machines to store
sequencing results.
For more details on the format specification, visit:
http://www.appliedbiosystem.com/support/software_community/ABIF_File_Format.pdf
"""
__docformat__ = "epytext en"
import datetime
import struct
from os.path import basename
from sys import version_info
from Bio import Alphabet
from Bio.Alphabet.IUPAC import ambiguous_dna, unambiguous_dna
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord
from Bio._py3k import _bytes_to_string, _as_bytes
# dictionary for determining which tags goes into SeqRecord annotation
# each key is tag_name + tag_number
# if a tag entry needs to be added, just add its key and its key
# for the annotations dictionary as the value
_EXTRACT = {
'TUBE1': 'sample_well',
'DySN1': 'dye',
'GTyp1': 'polymer',
'MODL1': 'machine_model',
}
# dictionary for tags that require preprocessing before use in creating
# seqrecords
_SPCTAGS = [
'PBAS2', # base-called sequence
'PCON2', # quality values of base-called sequence
'SMPL1', # sample id inputted before sequencing run
'RUND1', # run start date
'RUND2', # run finish date
'RUNT1', # run start time
'RUNT2', # run finish time
]
# dictionary for data unpacking format
_BYTEFMT = {
1: 'b', # byte
2: 's', # char
3: 'H', # word
4: 'h', # short
5: 'i', # long
6: '2i', # rational, legacy unsupported
7: 'f', # float
8: 'd', # double
10: 'h2B', # date
11: '4B', # time
12: '2i2b', # thumb
13: 'B', # bool
14: '2h', # point, legacy unsupported
15: '4h', # rect, legacy unsupported
16: '2i', # vPoint, legacy unsupported
17: '4i', # vRect, legacy unsupported
18: 's', # pString
19: 's', # cString
20: '2i', # tag, legacy unsupported
}
# header data structure (exluding 4 byte ABIF marker)
_HEADFMT = '>H4sI2H3I'
# directory data structure
_DIRFMT = '>4sI2H4I'
def AbiIterator(handle, alphabet=None, trim=False):
"""Iterator for the Abi file format.
"""
# raise exception is alphabet is not dna
if alphabet is not None:
if isinstance(Alphabet._get_base_alphabet(alphabet),
Alphabet.ProteinAlphabet):
raise ValueError("Invalid alphabet, ABI files do not hold proteins.")
if isinstance(Alphabet._get_base_alphabet(alphabet),
Alphabet.RNAAlphabet):
raise ValueError("Invalid alphabet, ABI files do not hold RNA.")
# raise exception if handle mode is not 'rb'
if hasattr(handle, 'mode'):
if set('rb') != set(handle.mode.lower()):
raise ValueError("ABI files has to be opened in 'rb' mode.")
# check if input file is a valid Abi file
handle.seek(0)
marker = handle.read(4)
if not marker:
# handle empty file gracefully
raise StopIteration
if marker != _as_bytes('ABIF'):
raise IOError('File should start ABIF, not %r' % marker)
# dirty hack for handling time information
times = {'RUND1': '', 'RUND2': '', 'RUNT1': '', 'RUNT2': '', }
# initialize annotations
annot = dict(zip(_EXTRACT.values(), [None] * len(_EXTRACT)))
# parse header and extract data from directories
header = struct.unpack(_HEADFMT, \
handle.read(struct.calcsize(_HEADFMT)))
for tag_name, tag_number, tag_data in _abi_parse_header(header, handle):
# stop iteration if all desired tags have been extracted
# 4 tags from _EXTRACT + 2 time tags from _SPCTAGS - 3,
# and seq, qual, id
# todo
key = tag_name + str(tag_number)
# PBAS2 is base-called sequence
if key == 'PBAS2':
seq = tag_data
ambigs = 'KYWMRS'
if alphabet is None:
if set(seq).intersection(ambigs):
alphabet = ambiguous_dna
else:
alphabet = unambiguous_dna
# PCON2 is quality values of base-called sequence
elif key == 'PCON2':
qual = [ord(val) for val in tag_data]
# SMPL1 is sample id entered before sequencing run
elif key == 'SMPL1':
sample_id = tag_data
elif key in times:
times[key] = tag_data
else:
# extract sequence annotation as defined in _EXTRACT
if key in _EXTRACT:
annot[_EXTRACT[key]] = tag_data
# set time annotations
annot['run_start'] = '%s %s' % (times['RUND1'], times['RUNT1'])
annot['run_finish'] = '%s %s' % (times['RUND2'], times['RUNT2'])
# use the file name as SeqRecord.name if available
try:
file_name = basename(handle.name).replace('.ab1','')
except:
file_name = ""
record = SeqRecord(Seq(seq, alphabet),
id=sample_id, name=file_name,
description='',
annotations=annot,
letter_annotations={'phred_quality': qual})
if not trim:
yield record
else:
yield _abi_trim(record)
def _AbiTrimIterator(handle):
"""Iterator for the Abi file format that yields trimmed SeqRecord objects.
"""
return AbiIterator(handle, trim=True)
def _abi_parse_header(header, handle):
"""Generator that returns directory contents.
"""
# header structure (after ABIF marker):
# file version, tag name, tag number,
# element type code, element size, number of elements
# data size, data offset, handle (not file handle)
head_elem_size = header[4]
head_elem_num = header[5]
head_offset = header[7]
index = 0
while index < head_elem_num:
start = head_offset + index * head_elem_size
# add directory offset to tuple
# to handle directories with data size <= 4 bytes
handle.seek(start)
dir_entry = struct.unpack(_DIRFMT, \
handle.read(struct.calcsize(_DIRFMT))) + (start,)
index += 1
# only parse desired dirs
key = _bytes_to_string(dir_entry[0])
key += str(dir_entry[1])
if key in (list(_EXTRACT.keys()) + _SPCTAGS):
tag_name = _bytes_to_string(dir_entry[0])
tag_number = dir_entry[1]
elem_code = dir_entry[2]
elem_num = dir_entry[4]
data_size = dir_entry[5]
data_offset = dir_entry[6]
tag_offset = dir_entry[8]
# if data size <= 4 bytes, data is stored inside tag
# so offset needs to be changed
if data_size <= 4:
data_offset = tag_offset + 20
handle.seek(data_offset)
data = handle.read(data_size)
yield tag_name, tag_number, \
_parse_tag_data(elem_code, elem_num, data)
def _abi_trim(seq_record):
"""Trims the sequence using Richard Mott's modified trimming algorithm.
seq_record - SeqRecord object to be trimmed.
Trimmed bases are determined from their segment score, which is a
cumulative sum of each base's score. Base scores are calculated from
their quality values.
More about the trimming algorithm:
http://www.phrap.org/phredphrap/phred.html
http://www.clcbio.com/manual/genomics/Quality_abif_trimming.html
"""
start = False # flag for starting position of trimmed sequence
segment = 20 # minimum sequence length
trim_start = 0 # init start index
cutoff = 0.05 # default cutoff value for calculating base score
if len(seq_record) <= segment:
return seq_record
else:
# calculate base score
score_list = [cutoff - (10 ** (qual/-10.0)) for qual in
seq_record.letter_annotations['phred_quality']]
# calculate cummulative score
# if cummulative value < 0, set it to 0
# first value is set to 0, because of the assumption that
# the first base will always be trimmed out
cummul_score = [0]
for i in range(1, len(score_list)):
score = cummul_score[-1] + score_list[i]
if score < 0:
cummul_score.append(0)
else:
cummul_score.append(score)
if not start:
# trim_start = value when cummulative score is first > 0
trim_start = i
start = True
# trim_finish = index of highest cummulative score,
# marking the end of sequence segment with highest cummulative score
trim_finish = cummul_score.index(max(cummul_score))
return seq_record[trim_start:trim_finish]
def _parse_tag_data(elem_code, elem_num, raw_data):
"""Returns single data value.
elem_code - What kind of data
elem_num - How many data points
raw_data - abi file object from which the tags would be unpacked
"""
if elem_code in _BYTEFMT:
# because '>1s' unpack differently from '>s'
if elem_num == 1:
num = ''
else:
num = str(elem_num)
fmt = '>' + num + _BYTEFMT[elem_code]
assert len(raw_data) == struct.calcsize(fmt)
data = struct.unpack(fmt, raw_data)
# no need to use tuple if len(data) == 1
# also if data is date / time
if elem_code not in [10, 11] and len(data) == 1:
data = data[0]
# account for different data types
if elem_code == 2:
return _bytes_to_string(data)
elif elem_code == 10:
return str(datetime.date(*data))
elif elem_code == 11:
return str(datetime.time(*data[:3]))
elif elem_code == 13:
return bool(data)
elif elem_code == 18:
return _bytes_to_string(data[1:])
elif self.elem_code == 19:
return _bytes_to_string(data[:-1])
else:
return data
else:
return None
if __name__ == '__main__':
pass
|