10   Input and Output

10.1   Exercises

10.1.1   Exercise

Write a function which take the path of file as parameter and display it’s content on the screen.

We wait same behavior as the shell cat command.

import sys
import os

def cat(path):
   if not os.path.exists(path):
      sys.exit("no such file: {0}".format(path)
   with open(path, 'r') as infile:
      for line in infile:
         print(line)

10.1.2   Exercise

Write a function which take the path of a file in rebase format and return in a dictionary the collection of the enzyme contains in the file. The sequence of the binding site must be cleaned up.

use the file rebase_light.txt to test your code.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

def rebase_parser(rebase_file):
    """
    :param rebase_file: the rebase file to parse
    :type rebase_file: file object
    :return: at each call return a tuple (str enz name, str binding site)
    :rtype: iterator
    """
    def clean_seq(seq):
        """
        remove each characters which are not a base
        """
        clean_seq = ''
        for char in seq:
            if char in 'ACGT':
                clean_seq += char
        return clean_seq
    
    for line in rebase_file:
        fields = line.split()
        #fields = fields.split()
        name = fields[0]
        seq = clean_seq(fields[2])
        yield (name, seq)
     
        
if __name__ == '__main__':
    import sys
    import os.path
    
    if len(sys.argv) != 2:
        sys.exit("usage multiple_fasta fasta_path")
    rebase_path = sys.argv[1]
    if not os.path.exists(rebase_path):
        sys.exit("No such file: {}".format(rebase_path))
        
    with open(rebase_path, 'r') as rebase_input:   
        for enz in rebase_parser(rebase_input):
            print(enz)

rebase.py .

10.1.3   Exercise

write a function which take the path of a fasta file (containing only one sequence) and return a data structure of your choice that allow to stock the id of the sequence and the sequence itself.

use the file seq.fasta to test your code.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from collections import namedtuple 

Sequence =  namedtuple("Sequence", "id comment sequence")

def fasta_reader(fasta_path):
    """
    :param fasta_path: the path to the file to parse
    :type fasta_path: string
    :return: a sequence
    :rtype: Sequence instance
    """
    with open(fasta_path, 'r') as fasta_infile:
        id_ = ''
        comment = ''
        sequence = ''
        for line in fasta_infile:
            if line.startswith('>'):
                header = line.split()
                id_ = header[0]
                comment = ' '.join(header[1:])
            else:
                sequence += line.strip()
    return Sequence(id_ , comment, sequence)

fasta_reader.py .

10.1.4   Exercise

Modify the code at the previous exercise to read multiple sequences fasta file. use the file abcd.fasta to test your code.

10.1.4.1   solution 1

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
from collections import namedtuple 

Sequence = namedtuple("Sequence", "id comment sequence")

def fasta_reader(fasta_path):
    """
    :param fasta_path: the path to the file to parse
    :type fasta_path: string
    :return: the list of sequences read from the file
    :rtype: list of Sequence 
    """
    sequences = []
    with open(fasta_path, 'r') as fasta_infile:
        id_ = ''
        comment = ''
        sequence = ''
        for line in fasta_infile:
            if line.startswith('>'):
                # a new sequence begin
                if id_ != '':
                    # a sequence was already parsed so add it to the list
                    sequences.append(Sequence(id_, comment, sequence))
                    sequence = ''
                header = line.split()
                id_ = header[0]
                comment = ' '.join(header[1:])
            else:
                sequence += line.strip()
        sequences.append(Sequence(id_, comment, sequence))
    return sequences


# The problem with this implementation is that we have to load all 
# sequences in memory before to start to work with
# it is better to return sequence one by one
# and treat them as they are loaded.

multiple_fasta_reader.py

10.1.4.2   solution 2

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
from collections import namedtuple 

Sequence =  namedtuple("Sequence", "id comment sequence")

def fasta_reader(fasta_file):
    """
    :param fasta_file: to the file in fasta format to parse
    :type fasta_file: file object
    :return: a sequence until they are sequences in the file
    :rtype: a Sequence or None
    """
    id_ = ''
    comment = ''
    sequence = ''
    # As we use seek or tell, we cannot use for line in file object
    # Because in the last case tell is always at the end of file
    # even if when we read the first line
    # So I use readline
    line = fasta_file.readline()
    while line:
        if line.startswith('>'):
            # a new sequence begin
            if id_ == '':
                header = line.split()
                id_ = header[0]
                comment = ' '.join(header[1:])
            else:
                # I already parse a sequence
                # So the beginning of this sequence indicate the end of the
                # previous sequence
                # put the cursor one line in back for the next fasta_reader call
                fasta_file.seek(-len(line),1)
                # I return the previous sequence
                return Sequence(id_ , comment, sequence)
        else:
            sequence += line.strip()
        line = fasta_file.readline()
    if id_ == '' and sequence == '':
        return 
    else:
        return Sequence(id_ , comment, sequence)


# to return sequence by sequence we had to open the file outside the fasta_reader
# at each fasta_reader call the function return one sequence
# unitl the end of file

if __name__ == '__main__':
    import sys
    import os.path
    
    if len(sys.argv) != 2:
        sys.exit("usage multiple_fasta fasta_path")
    fasta_path = sys.argv[1]
    if not os.path.exists(fasta_path):
        sys.exit("No such file: {}".format(fasta_path))
        
    with open(fasta_path, 'r') as fasta_input:    
        sequence = True
        while sequence is not None:
            sequence =  fasta_reader(fasta_input)
            print("----------------")
            print(sequence)
            

multiple_fasta_reader2.py

10.1.4.3   solution 3

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
from collections import namedtuple 
from itertools import groupby
   
Sequence =  namedtuple("Sequence", "id comment sequence")

def fasta_iter(fasta_file):
   """
   :param fasta_file: the file containing all input sequences in fasta format.
   :type fasta_file: file object
   :author: http://biostar.stackexchange.com/users/36/brentp
   :return: for a given fasta file, it returns an iterator which yields tuples
          (string id, string comment, int sequence length)
   :rtype: iterator
   """
   with open(fasta_path) as fasta_file:
      # ditch the boolean (x[0]) and just keep the header or sequence since
      # we know they alternate.
      group = (x[1] for x in groupby(fasta_file , lambda line: line.startswith(">")))
      for header in group:
         print(header)
         # drop the ">"
         header = header.next()[1:].strip()
         header = header.split()
         _id = header[0]
         comment = ' '.join(header[1:])
         seq = ''.join(s.strip() for s in group.next())
         yield Sequence(_id, comment, seq)
         
#using exanple:
#f = fasta_iter('seq.fasta')
#f.next()
#or
# for seq in fasta_iter('seq.fasta'):
#   do something with seq
#The problem with this implementation is
# something goes wrong in do something with seq
# but we don't quit the program (we catch the exception for instance)
# the fasta file is still open
# it's better to put the fasta file opening out the fasta reader see fasta filter 

if __name__ == '__main__':
    import sys
    import os.path
    
    if len(sys.argv) != 2:
        sys.exit("usage multiple_fasta fasta_path")
    fasta_path = sys.argv[1]
    if not os.path.exists(fasta_path):
        sys.exit("No such file: {}".format(fasta_path))
        
    with open(fasta_path, 'r') as fasta_input:    
        for sequence in fasta_iter(fasta_input):
            print("----------------")
            print(sequence)
            

fasta_iterator.py .

With the first version, we have to load all sequences before to treat them. if the file is huge (>G0) it can be a problem.

The third version allow to red sequences one by one. To do that we have to open the file outside the reader function The fasta format is very convenient for human but not for parser. The end of a sequence is indicated by the end of file or the begining of a new one. So with this version we have play with the cursor to place the cursor backward when we encouter a new sequence. then the cursor is placed at the right place for the next sequence.

The third version is an iterator and use generator. generators are functions which keep a state between to calls. generators does not use return to return a value but the keyword yield. Thus this implementation retrun sequence by sequence without to play with the cursor. You can call this function and put in in a loop or call next. Work with the sequence and pass to the next sequence on so on. for instance which is a very convenient way to use it:

for seq in fasta_iter('my_fast_file.fasta'):
   print(seq)

10.1.5   Exercise

Read a multiple sequence file in fasta format and write to a new file, one sequence by file, only sequences starting with methionine and containing at least six tryptophanes (W).

(you should create files for sequences: ABCD1_HUMAN, ABCD1_MOUSE, ABCD2_HUMAN, ABCD2_MOUSE, ABCD2_RAT, ABCD4_HUMAN, ABCD4_MOUSE)

10.1.5.1   bonus

Write sequences with 80 aa/line

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import sys
import os
from collections import namedtuple 
from itertools import groupby
   
Sequence =  namedtuple("Sequence", "id comment sequence")

def fasta_iter(fasta_file):
    """
    :param fasta_file: the file containing all input sequences in fasta format.
    :type fasta_file: file object
    :author: http://biostar.stackexchange.com/users/36/brentp
    :return: for a given fasta file, it returns an iterator which yields tuples
          (string id, string comment, int sequence length)
    :rtype: iterator
    """
    # ditch the boolean (x[0]) and just keep the header or sequence since
    # we know they alternate.
    group = (x[1] for x in groupby(fasta_file , lambda line: line[0] == ">"))
    for header in group:
       # drop the ">"
       header = header.next()[1:].strip()
       header = header.split()
       _id = header[0]
       comment = ' '.join(header[1:])
       seq = ''.join(s.strip() for s in group.next())
       yield Sequence(_id, comment, seq)
         
         
def fasta_writer(sequence, output_file):
    """
    write a sequence in a file in fasta format

    :param sequence: the sequence to print
    :type sequence: Sequence instance
    :param v: the file to print the sequence in
    :type output_file: file object
    """
    output_file.write('>{0.id} {0.comment}\n'.format(seq))
    start = 0
    while start < len(seq.sequence):
        end = start + 80
        print(start, " : ", end)
        output_file.write(seq.sequence[start: end + 1] + '\n')
        start = end


if __name__ == '__main__':
    if len(sys.argv) != 2:
        sys.exit("usage: fasta_filter path/to/fasta/file/to/read")
    input_path = sys.argv[1]
    
    with open(input_path, 'r') as input_file:
        for seq in fasta_iter(input_path):
            if seq.sequence.startswith('M') and seq.sequence.count('W') > 6:
                if os.path.exists(seq.id):
                    print("file {0} already exist: sequence {0} skipped".format(seq.id), file=sys.stderr)
                    continue
                else:
                    output_fasta = seq.id + ".fa"
                    with open(output_path, 'w') as output_file:
                        fasta_writer(seq, output_file)

fasta_iterator.py .

10.1.6   Exercise

we ran a blast with the folowing command blastall -p blastp -d uniprot_sprot -i query_seq.fasta -e 1e-05 -m 8 -o blast2.txt

-m 8 is the tabular output. So each fields is separate to the following by a ‘t’

The fields are: query id, database sequence (subject) id, percent identity, alignment length, number of mismatches, number of gap openings, query start, query end, subject start, subject end, Expect value, HSP bit score.

blast2.txt .

parse the file
sort the hits by their percent identity in the descending order.
write the results in a new file.

(adapted from managing your biological data with python p138)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
from operator import itemgetter
from collections import namedtuple

Hit = namedtuple("Hit" ,"id percent identity align_len mis_num, open_gap_num\
                  query_start, query_end, subject_start, subject_end, E_value, HSP_bit_score")

def parse_blast_output(input_file):
    """
    :param input_file: the path of the blast report (in m8 format) to parse
    :type input_file: string
    :return: list of hits
    :rtype: list of Hit
    """
    with open(input_file, 'r') as infile:
        table = []
        for line in infile:
            col = line.split('\t')
            try:
                col[2] = float(col[2])
            except ValueError as err:
                raise RuntimeError("error in parsing {} : {}".format(input_file, err))
            col[-1] = col[-1][:-1]
            table.append(col)
    return table


def write_blast_output(hits, output_file):
    """
    Write hits in file in format text
    one hit per line
    
    :param hits: hit to wite in file
    :type hits: namedtuple Hit
    :param output_file: the path of the file to write hits in
    :type output_file: string
    """
    with open(output_file, 'w') as output:
        for row in table_sorted:
            row = [str(x) for x in row]
            output.write("\t".join(row) + "\n")

   
if __name__ == '__main__':
    table_hits = parse_blast_output('blast2.txt')
    #table_hits = parse_blast_output('blast.txt')
    table_sorted = sorted(table_hits, key=itemgetter(2), reverse=True)
    # alternative
    # table_sorted = sorted(table, key = lambda x : x[2], reversed = True)
    write_blast_output(table_hits, 'blast_sorted.txt')  
   
         

parse_blast.py .