10 Input and Output

10.1 Exercises

10.1.1 Exercise

Write a function that takes the path of file as parameter and displays it’s content on the screen.

We expect the same behavior as the shell cat command.

import sys
import os

def cat(path):
   if not os.path.exists(path):
      # Exit Python with a non-zero value
      # to signify a failure
      sys.exit("no such file: {0}".format(path))
   with open(path, 'r') as infile:
      for line in infile:
         # By default, print adds a "\n" to what it prints
         # lines from a file already end with "\n".
         print(line, end="")

10.1.2 Exercise

Write a function that takes the path of a file in rebase format and returns in a dictionary the collection of the enzyme contained in the file. The sequence of the binding site must be cleaned up.

Use the file rebase_light.txt to test your code.

 1#!/usr/bin/env python3
 2
 3def rebase_parser(rebase_file):
 4    """
 5    :param rebase_file: the rebase file to parse
 6    :type rebase_file: file object
 7    :return: at each call yields a tuple (str enz name, str binding site)
 8    :rtype: iterator
 9    """
10    def clean_seq(seq):
11        """
12        remove each characters which are not a base
13        """
14        clean_seq = ''
15        for char in seq:
16            if char in 'ACGT':
17                clean_seq += char
18        return clean_seq
19
20    for line in rebase_file:
21        fields = line.split()
22        name = fields[0]
23        seq = clean_seq(fields[2])
24        yield (name, seq)
25
26
27def rebase2dict(rebase_path):
28    """
29    :param rebase_path: the path to rebase file to parse
30    :type rebase_path: str
31    :return: a dict with items (str enz name, str binding site)
32    """
33    with open(rebase_path, 'r') as rebase_input:
34        # enz_dict = {}
35        # for (name, seq) in rebase_parser(rebase_input):
36        #     enz_dict[name] = seq
37        enz_dict = dict(rebase_parser(rebase_input))
38    return enz_dict
39
40
41if __name__ == '__main__':
42    import sys
43    import os.path
44
45    if len(sys.argv) != 2:
46        sys.exit("Usage: rebase.py rebase_file")
47    rebase_path = sys.argv[1]
48    if not os.path.exists(rebase_path):
49        sys.exit("No such file: {}".format(rebase_path))
50
51    enz_dict = rebase2dict(rebase_path)
52    print(enz_dict)
53

rebase.py .

10.1.3 Exercise

Write a function that takes the path of a fasta file and returns a data structure of your choice that allows to store the id of the sequence and the sequence itself.

Use the file seq.fasta to test your code.

 1from collections import namedtuple
 2
 3Sequence =  namedtuple("Sequence", ("id", "comment", "sequence"))
 4
 5def fasta_reader(fasta_path):
 6    """
 7    :param fasta_path: the path to the file to parse
 8    :type fasta_path: string
 9    :return: a sequence
10    :rtype: Sequence instance
11    """
12    with open(fasta_path, 'r') as fasta_infile:
13        id_ = ''
14        comment = ''
15        sequence = ''
16        for line in fasta_infile:
17            if line.startswith('>'):
18                header = line.split()
19                id_ = header[0]
20                comment = ' '.join(header[1:])
21            else:
22                sequence += line.strip()
23    return Sequence(id_, comment, sequence)

fasta_reader.py .

10.1.4 Exercise

Read a multiple sequence file in fasta format and write to a new file, one sequence by file, only sequences starting with methionine and containing at least six tryptophanes (W).

(you should create files for sequences: ABCD1_HUMAN, ABCD1_MOUSE, ABCD2_HUMAN, ABCD2_MOUSE, ABCD2_RAT, ABCD4_HUMAN, ABCD4_MOUSE)

10.1.4.1 bonus

Write sequences with 80 aa/line

 1import sys
 2import os
 3from collections import namedtuple 
 4from itertools import groupby
 5   
 6Sequence =  namedtuple("Sequence", "id comment sequence")
 7
 8def fasta_iter(fasta_file):
 9    """
10    :param fasta_file: the file containing all input sequences in fasta format.
11    :type fasta_file: file object
12    :author: http://biostar.stackexchange.com/users/36/brentp
13    :return: for a given fasta file, it returns an iterator which yields tuples
14          (string id, string comment, int sequence length)
15    :rtype: iterator
16    """
17    # ditch the boolean (x[0]) and just keep the header or sequence since
18    # we know they alternate.
19    group = (x[1] for x in groupby(fasta_file , lambda line: line[0] == ">"))
20    for header in group:
21       # drop the ">"
22       header = header.next()[1:].strip()
23       header = header.split()
24       _id = header[0]
25       comment = ' '.join(header[1:])
26       seq = ''.join(s.strip() for s in group.next())
27       yield Sequence(_id, comment, seq)
28         
29         
30def fasta_writer(sequence, output_file):
31    """
32    write a sequence in a file in fasta format
33
34    :param sequence: the sequence to print
35    :type sequence: Sequence instance
36    :param v: the file to print the sequence in
37    :type output_file: file object
38    """
39    output_file.write('>{0.id} {0.comment}\n'.format(seq))
40    start = 0
41    while start < len(seq.sequence):
42        end = start + 80
43        print start, " : ", end
44        output_file.write(seq.sequence[start: end + 1] + '\n')
45        start = end
46
47
48if __name__ == '__main__':
49    if len(sys.argv) != 2:
50        sys.exit("usage: fasta_filter path/to/fasta/file/to/read")
51    input_path = sys.argv[1]
52    
53    with open(input_path, 'r') as input_file:
54        for seq in fasta_iter(input_path):
55            if seq.sequence.startswith('M') and seq.sequence.count('W') > 6:
56                if os.path.exists(seq.id):
57                    print >> sys.stderr , "file {0} already exist: sequence {0} skipped".format(seq.id)
58                    continue
59                else:
60                    output_fasta = seq.id + ".fa"
61                    with open(output_path, 'w') as output_file:
62                        fasta_writer(seq, output_file)

fasta_iterator.py .

10.1.5 Exercise

We ran a blast with the following command blastall -p blastp -d uniprot_sprot -i query_seq.fasta -e 1e-05 -m 8 -o blast2.txt

-m 8 is the tabular output. So each fields is separate to the following by a ‘t’

The fields are: query id, database sequence (subject) id, percent identity, alignment length, number of mismatches, number of gap openings, query start, query end, subject start, subject end, Expect value, HSP bit score.

blast2.txt .

parse the file
sort the hits by their percent identity in the descending order.
write the results in a new file.

(adapted from managing your biological data with python p138)

 1from operator import itemgetter
 2from collections import namedtuple
 3
 4Hit = namedtuple("Hit", ("query", "subject", "identity", "align_len", "mis_num", "open_gap_num",
 5                  "query_start", "query_end", "subject_start", "subject_end", "E_value", "HSP_bit_score"))
 6
 7def parse_blast_output(input_file):
 8    """
 9    :param input_file: the path of the blast report (in m8 format) to parse
10    :type input_file: string
11    :return: list of hits
12    :rtype: list of Hit
13    """
14    with open(input_file, 'r') as infile:
15        table = []
16        for line in infile:
17            query, subject, identity, *stuff, bit_score = line.split('\t')
18            try:
19                identity = float(identity)
20            except ValueError as err:
21                raise RuntimeError("error in parsing {} : {}".format(input_file, err))
22            bit_score = bit_score.strip()
23            table.append([query, subject, identity, *stuff, bit_score])
24    return table
25
26
27def write_blast_output(hits, output_file):
28    """
29    Write hits in file in format text
30    one hit per line
31    
32    :param hits: hit to wite in file
33    :type hits: namedtuple Hit
34    :param output_file: the path of the file to write hits in
35    :type output_file: string
36    """
37    with open(output_file, 'w') as output:
38        for row in table_sorted:
39            row = [str(x) for x in row]
40            output.write("\t".join(row) + "\n")
41
42   
43if __name__ == '__main__':
44    table_hits = parse_blast_output('blast2.txt')
45    table_sorted = sorted(table_hits, key=itemgetter(2), reverse=True)
46    # alternative
47    # table_sorted = sorted(table, key = lambda x : x[2], reversed = True)
48    write_blast_output(table_hits, 'blast_sorted.txt')  
49

parse_blast.py .

10.1.6 Exercise

  • Parse the files exp1.csv and exp2.csv (exp1.csv, exp2.csv) (create a function to parse file and keep only fields: GenAge ID, symbol, name, entrez gene id, uniprot)

  • get the genes which are in the exp1 but not in the exp2 the 2 files are in csv format based on the uniprot identifier.

  • write the result in a file in csv format.

10.1.6.1 Hint:

Use the module csv in python https://docs.python.org/3/library/csv.html#module-csv Use a reader, as follows:

>>> reader = csv.reader(input, quotechar='"')
 1import csv
 2
 3
 4def parse_gene_file(path):
 5    genes = set()
 6    with open(path, 'r') as input:
 7        reader = csv.reader(input, quotechar='"')
 8        for row in reader:
 9            id_, symbol, _, name, entrez, uniprot, *_ = row
10            genes.add((symbol, name, entrez, uniprot))
11    return genes
12
13
14if __name__ == '__main__':
15    exp1 = parse_gene_file('exp1.csv')
16    exp2 = parse_gene_file('exp2.csv')
17
18    exp1_symbol = {item[-1] for item in exp1}
19    exp2_symbol = {item[-1] for item in exp2}
20
21    spe = exp1_symbol - exp2_symbol
22    with open('exp1_specific.csv', 'w') as out:
23        writer = csv.writer(out)
24        for row in exp1:
25            if row[-1] in spe:
26                writer.writerow(row)

csv.py .

10.1.7 Exercise

Modify the code at the previous exercise to read multiple sequences fasta file. use the file abcd.fasta to test your code.

10.1.7.1 solution 1

 1from collections import namedtuple 
 2
 3Sequence = namedtuple("Sequence", "id comment sequence")
 4
 5def fasta_reader(fasta_path):
 6    """
 7    :param fasta_path: the path to the file to parse
 8    :type fasta_path: string
 9    :return: the list of sequences read from the file
10    :rtype: list of Sequence 
11    """
12    sequences = []
13    with open(fasta_path, 'r') as fasta_infile:
14        id_ = ''
15        comment = ''
16        sequence = ''
17        for line in fasta_infile:
18            if line.startswith('>'):
19                # a new sequence begin
20                if id_ != '':
21                    # a sequence was already parsed so add it to the list
22                    sequences.append(Sequence(id_, comment, sequence))
23                    sequence = ''
24                header = line.split()
25                id_ = header[0]
26                comment = ' '.join(header[1:])
27            else:
28                sequence += line.strip()
29        sequences.append(Sequence(id_, comment, sequence))
30    return sequences
31
32
33# The problem with this implementation is that we have to load all 
34# sequences in memory before to start to work with
35# it is better to return sequence one by one
36# and treat them as they are loaded.

multiple_fasta_reader.py

10.1.7.2 solution 2

 1from collections import namedtuple 
 2
 3Sequence =  namedtuple("Sequence", "id comment sequence")
 4
 5def fasta_reader(fasta_file):
 6    """
 7    :param fasta_file: to the file in fasta format to parse
 8    :type fasta_file: file object
 9    :return: a sequence until they are sequences in the file
10    :rtype: a Sequence or None
11    """
12    id_ = ''
13    comment = ''
14    sequence = ''
15    # As we use seek or tell, we cannot use for line in file object
16    # Because in the last case tell is always at the end of file
17    # even if when we read the first line
18    # So I use readline
19    line = fasta_file.readline()
20    while line:
21        if line.startswith('>'):
22            # a new sequence begin
23            if id_ == '':
24                header = line.split()
25                id_ = header[0]
26                comment = ' '.join(header[1:])
27            else:
28                # I already parse a sequence
29                # So the beginning of this sequence indicate the end of the
30                # previous sequence
31                # put the cursor one line in back for the next fasta_reader call
32                fasta_file.seek(-len(line),1)
33                # I return the previous sequence
34                return Sequence(id_ , comment, sequence)
35        else:
36            sequence += line.strip()
37        line = fasta_file.readline()
38    if id_ == '' and sequence == '':
39        return 
40    else:
41        return Sequence(id_ , comment, sequence)
42
43
44# to return sequence by sequence we had to open the file outside the fasta_reader
45# at each fasta_reader call the function return one sequence
46# unitl the end of file
47
48if __name__ == '__main__':
49    import sys
50    import os.path
51    
52    if len(sys.argv) != 2:
53        sys.exit("usage multiple_fasta fasta_path")
54    fasta_path = sys.argv[1]
55    if not os.path.exists(fasta_path):
56        sys.exit("No such file: {}".format(fasta_path))
57        
58    with open(fasta_path, 'r') as fasta_input:    
59        sequence = True
60        while sequence is not None:
61            sequence =  fasta_reader(fasta_input)
62            print "----------------"
63            print sequence
64            

multiple_fasta_reader2.py

10.1.7.3 solution 3

 1from collections import namedtuple 
 2from itertools import groupby
 3   
 4Sequence =  namedtuple("Sequence", "id comment sequence")
 5
 6def fasta_iter(fasta_file):
 7   """
 8   :param fasta_file: the file containing all input sequences in fasta format.
 9   :type fasta_file: file object
10   :author: http://biostar.stackexchange.com/users/36/brentp
11   :return: for a given fasta file, it returns an iterator which yields tuples
12          (string id, string comment, int sequence length)
13   :rtype: iterator
14   """
15   with open(fasta_path) as fasta_file:
16      # ditch the boolean (x[0]) and just keep the header or sequence since
17      # we know they alternate.
18      group = (x[1] for x in groupby(fasta_file , lambda line: line.startswith(">")))
19      for header in group:
20         # drop the ">"
21         header = header.next()[1:].strip()
22         header = header.split()
23         _id = header[0]
24         comment = ' '.join(header[1:])
25         seq = ''.join(s.strip() for s in group.next())
26         yield Sequence(_id, comment, seq)
27         
28#using exanple:
29#f = fasta_iter('seq.fasta')
30#f.next()
31#or
32# for seq in fasta_iter('seq.fasta'):
33#   do something with seq
34#The problem with this implementation is
35# something goes wrong in do something with seq
36# but we don't quit the program (we catch the exception for instance)
37# the fasta file is still open
38# it's better to put the fasta file opening out the fasta reader see fasta filter 
39
40if __name__ == '__main__':
41    import sys
42    import os.path
43    
44    if len(sys.argv) != 2:
45        sys.exit("usage multiple_fasta fasta_path")
46    fasta_path = sys.argv[1]
47    if not os.path.exists(fasta_path):
48        sys.exit("No such file: {}".format(fasta_path))
49        
50    with open(fasta_path, 'r') as fasta_input:    
51        for sequence in fasta_iter(fasta_input):
52            print "----------------"
53            print sequence
54            

fasta_iterator.py .

With the first version, we have to load all sequences before to treat them. if the file is huge (>G0) it can be a problem.

The third version allow to red sequences one by one. To do that we have to open the file outside the reader function The fasta format is very convenient for human but not for parser. The end of a sequence is indicated by the end of file or the beginning of a new one. So with this version we have play with the cursor to place the cursor backward when we encounter a new sequence. Then the cursor is placed at the right place for the next sequence.

The third version is an iterator and use generator. Generators are functions which keep a state between to calls. Generators do not use return to return a value but the keyword yield. Thus this implementation return sequence by sequence without to play with the cursor. You can call this function and put in in a loop or call next. Work with the sequence and pass to the next sequence on so on. For instance which is a very convenient way to use it:

for seq in fasta_iter('my_fast_file.fasta'):
   print seq