10   Input and Output

10.1   Exercises

10.1.1   Exercise

Write a function that takes the path of file as parameter and displays it’s content on the screen.

We expect the same behavior as the shell cat command.

import sys
import os

def cat(path):
   if not os.path.exists(path):
      # Exit Python with a non-zero value
      # to signify a failure
      sys.exit("no such file: {0}".format(path))
   with open(path, 'r') as infile:
      for line in infile:
         # By default, print adds a "\n" to what it prints
         # lines from a file already end with "\n".
         print(line, end="")

10.1.2   Exercise

Write a function that takes the path of a file in rebase format and returns in a dictionary the collection of the enzyme contained in the file. The sequence of the binding site must be cleaned up.

Use the file rebase_light.txt to test your code.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
#!/usr/bin/env python3

def rebase_parser(rebase_file):
    """
    :param rebase_file: the rebase file to parse
    :type rebase_file: file object
    :return: at each call yields a tuple (str enz name, str binding site)
    :rtype: iterator
    """
    def clean_seq(seq):
        """
        remove each characters which are not a base
        """
        clean_seq = ''
        for char in seq:
            if char in 'ACGT':
                clean_seq += char
        return clean_seq

    for line in rebase_file:
        fields = line.split()
        name = fields[0]
        seq = clean_seq(fields[2])
        yield (name, seq)


def rebase2dict(rebase_path):
    """
    :param rebase_path: the path to rebase file to parse
    :type rebase_path: str
    :return: a dict with items (str enz name, str binding site)
    """
    with open(rebase_path, 'r') as rebase_input:
        # enz_dict = {}
        # for (name, seq) in rebase_parser(rebase_input):
        #     enz_dict[name] = seq
        enz_dict = dict(rebase_parser(rebase_input))
    return enz_dict


if __name__ == '__main__':
    import sys
    import os.path

    if len(sys.argv) != 2:
        sys.exit("Usage: rebase.py rebase_file")
    rebase_path = sys.argv[1]
    if not os.path.exists(rebase_path):
        sys.exit("No such file: {}".format(rebase_path))

    enz_dict = rebase2dict(rebase_path)
    print(enz_dict)

rebase.py .

10.1.3   Exercise

Write a function that takes the path of a fasta file and returns a data structure of your choice that allows to store the id of the sequence and the sequence itself.

Use the file seq.fasta to test your code.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from collections import namedtuple 

Sequence =  namedtuple("Sequence", "id comment sequence")

def fasta_reader(fasta_path):
    """
    :param fasta_path: the path to the file to parse
    :type fasta_path: string
    :return: a sequence
    :rtype: Sequence instance
    """
    with open(fasta_path, 'r') as fasta_infile:
        id_ = ''
        comment = ''
        sequence = ''
        for line in fasta_infile:
            if line.startswith('>'):
                header = line.split()
                id_ = header[0]
                comment = ' '.join(header[1:])
            else:
                sequence += line.strip()
    return Sequence(id_ , comment, sequence)

fasta_reader.py .

10.1.4   Exercise

Read a multiple sequence file in fasta format and write to a new file, one sequence by file, only sequences starting with methionine and containing at least six tryptophanes (W).

(you should create files for sequences: ABCD1_HUMAN, ABCD1_MOUSE, ABCD2_HUMAN, ABCD2_MOUSE, ABCD2_RAT, ABCD4_HUMAN, ABCD4_MOUSE)

10.1.4.1   bonus

Write sequences with 80 aa/line

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import sys
import os
from collections import namedtuple 
from itertools import groupby
   
Sequence =  namedtuple("Sequence", "id comment sequence")

def fasta_iter(fasta_file):
    """
    :param fasta_file: the file containing all input sequences in fasta format.
    :type fasta_file: file object
    :author: http://biostar.stackexchange.com/users/36/brentp
    :return: for a given fasta file, it returns an iterator which yields tuples
          (string id, string comment, int sequence length)
    :rtype: iterator
    """
    # ditch the boolean (x[0]) and just keep the header or sequence since
    # we know they alternate.
    group = (x[1] for x in groupby(fasta_file , lambda line: line[0] == ">"))
    for header in group:
       # drop the ">"
       header = header.next()[1:].strip()
       header = header.split()
       _id = header[0]
       comment = ' '.join(header[1:])
       seq = ''.join(s.strip() for s in group.next())
       yield Sequence(_id, comment, seq)
         
         
def fasta_writer(sequence, output_file):
    """
    write a sequence in a file in fasta format

    :param sequence: the sequence to print
    :type sequence: Sequence instance
    :param v: the file to print the sequence in
    :type output_file: file object
    """
    output_file.write('>{0.id} {0.comment}\n'.format(seq))
    start = 0
    while start < len(seq.sequence):
        end = start + 80
        print start, " : ", end
        output_file.write(seq.sequence[start: end + 1] + '\n')
        start = end


if __name__ == '__main__':
    if len(sys.argv) != 2:
        sys.exit("usage: fasta_filter path/to/fasta/file/to/read")
    input_path = sys.argv[1]
    
    with open(input_path, 'r') as input_file:
        for seq in fasta_iter(input_path):
            if seq.sequence.startswith('M') and seq.sequence.count('W') > 6:
                if os.path.exists(seq.id):
                    print >> sys.stderr , "file {0} already exist: sequence {0} skipped".format(seq.id)
                    continue
                else:
                    output_fasta = seq.id + ".fa"
                    with open(output_path, 'w') as output_file:
                        fasta_writer(seq, output_file)

fasta_iterator.py .

10.1.5   Exercise

We ran a blast with the following command blastall -p blastp -d uniprot_sprot -i query_seq.fasta -e 1e-05 -m 8 -o blast2.txt

-m 8 is the tabular output. So each fields is separate to the following by a ‘t’

The fields are: query id, database sequence (subject) id, percent identity, alignment length, number of mismatches, number of gap openings, query start, query end, subject start, subject end, Expect value, HSP bit score.

blast2.txt .

parse the file
sort the hits by their percent identity in the descending order.
write the results in a new file.

(adapted from managing your biological data with python p138)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
from operator import itemgetter
from collections import namedtuple

Hit = namedtuple("Hit", ("query", "subject", "identity", "align_len", "mis_num", "open_gap_num",
                  "query_start", "query_end", "subject_start", "subject_end", "E_value", "HSP_bit_score"))

def parse_blast_output(input_file):
    """
    :param input_file: the path of the blast report (in m8 format) to parse
    :type input_file: string
    :return: list of hits
    :rtype: list of Hit
    """
    with open(input_file, 'r') as infile:
        table = []
        for line in infile:
            query, subject, identity, *stuff, bit_score = line.split('\t')
            try:
                identity = float(identity)
            except ValueError as err:
                raise RuntimeError("error in parsing {} : {}".format(input_file, err))
            bit_score = bit_score.strip()
            table.append([query, subject, identity, *stuff, bit_score])
    return table


def write_blast_output(hits, output_file):
    """
    Write hits in file in format text
    one hit per line
    
    :param hits: hit to wite in file
    :type hits: namedtuple Hit
    :param output_file: the path of the file to write hits in
    :type output_file: string
    """
    with open(output_file, 'w') as output:
        for row in table_sorted:
            row = [str(x) for x in row]
            output.write("\t".join(row) + "\n")

   
if __name__ == '__main__':
    table_hits = parse_blast_output('blast2.txt')
    table_sorted = sorted(table_hits, key=itemgetter(2), reverse=True)
    # alternative
    # table_sorted = sorted(table, key = lambda x : x[2], reversed = True)
    write_blast_output(table_hits, 'blast_sorted.txt')  

parse_blast.py .

10.1.6   Exercise

  • Parse the files exp1.csv and exp2.csv (exp1.csv, exp2.csv) (create a function to parse file and keep only fields: GenAge ID, symbol, name, entrez gene id, uniprot)
  • get the genes which are in the exp1 but not in the exp2 the 2 files are in csv format based on the uniprot identifier.
  • write the result in a file in csv format.

10.1.6.1   Hint:

Use the module csv in python https://docs.python.org/3/library/csv.html#module-csv Use a reader, as follows:

>>> reader = csv.reader(input, quotechar='"')
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import csv


def parse_gene_file(path):
    genes = set()
    with open(path, 'r') as input:
        reader = csv.reader(input, quotechar='"')
        for row in reader:
            id_, symbol, _, name, entrez, uniprot, *_ = row
            genes.add((symbol, name, entrez, uniprot))
    return genes


if __name__ == '__main__':
    exp1 = parse_gene_file('exp1.csv')
    exp2 = parse_gene_file('exp2.csv')

    exp1_symbol = {item[-1] for item in exp1}
    exp2_symbol = {item[-1] for item in exp2}

    spe = exp1_symbol - exp2_symbol
    with open('exp1_specific.csv', 'w') as out:
        writer = csv.writer(out)
        for row in exp1:
            if row[-1] in spe:
                writer.writerow(row)

csv.py .

10.1.7   Exercise

Modify the code at the previous exercise to read multiple sequences fasta file. use the file abcd.fasta to test your code.

10.1.7.1   solution 1

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
from collections import namedtuple 

Sequence = namedtuple("Sequence", "id comment sequence")

def fasta_reader(fasta_path):
    """
    :param fasta_path: the path to the file to parse
    :type fasta_path: string
    :return: the list of sequences read from the file
    :rtype: list of Sequence 
    """
    sequences = []
    with open(fasta_path, 'r') as fasta_infile:
        id_ = ''
        comment = ''
        sequence = ''
        for line in fasta_infile:
            if line.startswith('>'):
                # a new sequence begin
                if id_ != '':
                    # a sequence was already parsed so add it to the list
                    sequences.append(Sequence(id_, comment, sequence))
                    sequence = ''
                header = line.split()
                id_ = header[0]
                comment = ' '.join(header[1:])
            else:
                sequence += line.strip()
                sequences.append(Sequence(id_, comment, sequence))
    return sequences


# The problem with this implementation is that we have to load all 
# sequences in memory before to start to work with
# it is better to return sequence one by one
# and treat them as they are loaded.

multiple_fasta_reader.py

10.1.7.2   solution 2

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
from collections import namedtuple 

Sequence =  namedtuple("Sequence", "id comment sequence")

def fasta_reader(fasta_file):
    """
    :param fasta_file: to the file in fasta format to parse
    :type fasta_file: file object
    :return: a sequence until they are sequences in the file
    :rtype: a Sequence or None
    """
    id_ = ''
    comment = ''
    sequence = ''
    # As we use seek or tell, we cannot use for line in file object
    # Because in the last case tell is always at the end of file
    # even if when we read the first line
    # So I use readline
    line = fasta_file.readline()
    while line:
        if line.startswith('>'):
            # a new sequence begin
            if id_ == '':
                header = line.split()
                id_ = header[0]
                comment = ' '.join(header[1:])
            else:
                # I already parse a sequence
                # So the beginning of this sequence indicate the end of the
                # previous sequence
                # put the cursor one line in back for the next fasta_reader call
                fasta_file.seek(-len(line),1)
                # I return the previous sequence
                return Sequence(id_ , comment, sequence)
        else:
            sequence += line.strip()
        line = fasta_file.readline()
    if id_ == '' and sequence == '':
        return 
    else:
        return Sequence(id_ , comment, sequence)


# to return sequence by sequence we had to open the file outside the fasta_reader
# at each fasta_reader call the function return one sequence
# unitl the end of file

if __name__ == '__main__':
    import sys
    import os.path
    
    if len(sys.argv) != 2:
        sys.exit("usage multiple_fasta fasta_path")
    fasta_path = sys.argv[1]
    if not os.path.exists(fasta_path):
        sys.exit("No such file: {}".format(fasta_path))
        
    with open(fasta_path, 'r') as fasta_input:    
        sequence = True
        while sequence is not None:
            sequence =  fasta_reader(fasta_input)
            print "----------------"
            print sequence
            

multiple_fasta_reader2.py

10.1.7.3   solution 3

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
from collections import namedtuple 
from itertools import groupby
   
Sequence =  namedtuple("Sequence", "id comment sequence")

def fasta_iter(fasta_file):
   """
   :param fasta_file: the file containing all input sequences in fasta format.
   :type fasta_file: file object
   :author: http://biostar.stackexchange.com/users/36/brentp
   :return: for a given fasta file, it returns an iterator which yields tuples
          (string id, string comment, int sequence length)
   :rtype: iterator
   """
   with open(fasta_path) as fasta_file:
      # ditch the boolean (x[0]) and just keep the header or sequence since
      # we know they alternate.
      group = (x[1] for x in groupby(fasta_file , lambda line: line.startswith(">")))
      for header in group:
         print header
         # drop the ">"
         header = header.next()[1:].strip()
         header = header.split()
         _id = header[0]
         comment = ' '.join(header[1:])
         seq = ''.join(s.strip() for s in group.next())
         yield Sequence(_id, comment, seq)
         
#using exanple:
#f = fasta_iter('seq.fasta')
#f.next()
#or
# for seq in fasta_iter('seq.fasta'):
#   do something with seq
#The problem with this implementation is
# something goes wrong in do something with seq
# but we don't quit the program (we catch the exception for instance)
# the fasta file is still open
# it's better to put the fasta file opening out the fasta reader see fasta filter 

if __name__ == '__main__':
    import sys
    import os.path
    
    if len(sys.argv) != 2:
        sys.exit("usage multiple_fasta fasta_path")
    fasta_path = sys.argv[1]
    if not os.path.exists(fasta_path):
        sys.exit("No such file: {}".format(fasta_path))
        
    with open(fasta_path, 'r') as fasta_input:    
        for sequence in fasta_iter(fasta_input):
            print "----------------"
            print sequence
            

fasta_iterator.py .

With the first version, we have to load all sequences before to treat them. if the file is huge (>G0) it can be a problem.

The third version allow to red sequences one by one. To do that we have to open the file outside the reader function The fasta format is very convenient for human but not for parser. The end of a sequence is indicated by the end of file or the beginning of a new one. So with this version we have play with the cursor to place the cursor backward when we encounter a new sequence. Then the cursor is placed at the right place for the next sequence.

The third version is an iterator and use generator. Generators are functions which keep a state between to calls. Generators do not use return to return a value but the keyword yield. Thus this implementation return sequence by sequence without to play with the cursor. You can call this function and put in in a loop or call next. Work with the sequence and pass to the next sequence on so on. For instance which is a very convenient way to use it:

for seq in fasta_iter('my_fast_file.fasta'):
   print seq