From afed80ca7f39fa41a3c2d17db1e3657b886838c0 Mon Sep 17 00:00:00 2001 From: Jonathan Herrewijnen Date: Sun, 26 Nov 2023 21:29:22 +0100 Subject: [PATCH] Adding sequence alignment to herrewebpy --- herrewebpy/bioinformatics/__init__.py | 1 + .../bioinformatics/sequence_alignment.py | 225 ++++++++++++++++++ 2 files changed, 226 insertions(+) create mode 100644 herrewebpy/bioinformatics/__init__.py create mode 100644 herrewebpy/bioinformatics/sequence_alignment.py diff --git a/herrewebpy/bioinformatics/__init__.py b/herrewebpy/bioinformatics/__init__.py new file mode 100644 index 0000000..b974282 --- /dev/null +++ b/herrewebpy/bioinformatics/__init__.py @@ -0,0 +1 @@ +from . import * \ No newline at end of file diff --git a/herrewebpy/bioinformatics/sequence_alignment.py b/herrewebpy/bioinformatics/sequence_alignment.py new file mode 100644 index 0000000..db5573f --- /dev/null +++ b/herrewebpy/bioinformatics/sequence_alignment.py @@ -0,0 +1,225 @@ +from Bio import SeqIO +from Bio.SeqRecord import SeqRecord +from Bio.Seq import Seq +import pandas as pd +import numpy as np +from concurrent.futures import ThreadPoolExecutor +from herrewebpy import logger +import tqdm + +""" +Multiple sequence alignment script using Needleman-Wunsch. Writes to fasta and stockholm, as well as to a plain text file. + +Method derived from RP1 and RP2 internships. Used as a data validation technique in Alstom. + +Author: Jonathan Herrewijnen +Email: jonathan.herrewijnen@gmail.com +Date: 25 November +Herreweb +""" + + +def parse_sequence(sequence): + """ + Parse a sequence, either as a pandas DataFrame or a string, and return the result. + + Parameters: + sequence (pd.DataFrame or str): The input sequence data to be parsed. + """ + if sequence is None: + return None + elif isinstance(sequence, pd.Series): + logger.info("Parsing data as dataframe, so converting") + sequence = sequence.str.cat(sep='|') + logger.info(f'Assuming type is string, returning list') + identifiers = [item.strip('|') for item in sequence.split('|') if item.strip('|')] + return identifiers + + +def write_stockholm_alignment_with_metadata(aligned_identifiers1, aligned_identifiers2, aligned_metadata1, aligned_metadata2, score, output_filename): + """ + Write an alignment in Stockholm format with metadata as annotations. + + Parameters: + aligned_identifiers1 (list): List of aligned identifiers for the first sequence. + aligned_identifiers2 (list): List of aligned identifiers for the second sequence. + aligned_metadata1 (list): List of metadata corresponding to aligned_identifiers1. + aligned_metadata2 (list): List of metadata corresponding to aligned_identifiers2. + score (int): Alignment score. + output_filename (str): Name of the output Stockholm format file. + + Description: + This function writes an alignment in the Stockholm format with custom metadata as annotations. + It takes two lists of aligned identifiers (aligned_identifiers1 and aligned_identifiers2), + two lists of corresponding metadata (aligned_metadata1 and aligned_metadata2), an alignment score, + and the desired output filename. + + The function creates a Stockholm file where each sequence in the alignment is represented by its identifier. + It includes the metadata as custom annotations (#=GC METADATA1 and #=GC METADATA2) in the Stockholm file. + + The Stockholm format is commonly used for representing sequence alignments in bioinformatics. + + Example: + aligned_identifiers1 = ['COM12018', 'COM17003'] + aligned_identifiers2 = ['COM12018', 'COM17003'] + aligned_metadata1 = ['some_metadata', 'some_data'] + aligned_metadata2 = ['some_other_metadata', 'some_more_metadata'] + score = 42 + output_filename = 'alignment.stockholm' + + write_stockholm_alignment_with_metadata(aligned_identifiers1, aligned_identifiers2, aligned_metadata1, aligned_metadata2, score, output_filename) + """ + with open(output_filename, 'w') as stockholm_file: + stockholm_file.write("# STOCKHOLM 1.0\n") + for id1, id2, metadata1, metadata2 in zip(aligned_identifiers1, aligned_identifiers2, aligned_metadata1, aligned_metadata2): + stockholm_file.write(f"{id1}\n") + stockholm_file.write(f"{id2}\n") + stockholm_file.write(f"#=GC METADATA1 {metadata1}\n") + stockholm_file.write(f"#=GC METADATA2 {metadata2}\n") + stockholm_file.write(f"#=GF SCORE: {score}\n") + + +def write_text_format(aligned_identifiers1, aligned_identifiers2, score, output_filename, aligned_metadata1=None, + aligned_metadata2=None): + """ + Write aligned identifiers in two seperate text files (for visual comparison) + """ + with open(f'1-{output_filename}', 'w') as file: [file.write(f"Score: {score}\n")] + with open(f'2-{output_filename}', 'w') as file: [file.write(f"Score: {score}\n")] + + if aligned_metadata1 is not None and aligned_metadata2 is not None: + with open(f'1-{output_filename}', 'w') as file: [file.write(f"{id1} {metadata1}\n") for id1, metadata1 in zip(aligned_identifiers1, aligned_metadata1)] + with open(f'2-{output_filename}', 'w') as file: [file.write(f"{id2} {metadata2}\n") for id2, metadata2 in zip(aligned_identifiers2, aligned_metadata2)] + else: + with open(f'1-{output_filename}', 'w') as file: [file.write(f"{id1}\n") for id1 in aligned_identifiers1] + with open(f'2-{output_filename}', 'w') as file: [file.write(f"{id2}\n") for id2 in aligned_identifiers2] + + +def global_alignment_np(sequence1, sequence2, metadata1=None, metadata2=None, gap_penalty=-1, + match_score=1, mismatch_penalty=-10, fasta_name="alignment", threads=None): + """ + Perform global sequence alignment using dynamic programming (Needleman-Wunsch). + + Parameters: + sequence1 (str): The first sequence to align. + sequence2 (str): The second sequence to align. + gap_penalty (int, optional): Penalty for introducing a gap. Default is -1. + match_score (int, optional): Score for a match. Default is 1. + mismatch_penalty (int, optional): Penalty for a mismatch. Default is -1. + + Returns: + tuple: A tuple containing the aligned longer sequence, aligned shorter sequence, and alignment score. + + Description: + This function performs global sequence alignment between two input sequences, `sequence1` and `sequence2`, + using the Needleman-Wunsch algorithm. It aligns the sequences based on the specified scoring parameters + for gap penalties, match scores, and mismatch penalties. + + The function returns a tuple containing the following elements: + - The aligned longer sequence (string). + - The aligned shorter sequence (string). + - The alignment score (int). + + The aligned sequences are represented as strings where gaps are indicated by '-' characters. + + Additionally, the function saves the alignment as a FASTA file named 'alignment.fasta' and prints a + human-readable alignment using Biopython's format_alignment function for visualization. + """ + identifiers1, metadata1 = parse_sequence(sequence1), parse_sequence(metadata1) + identifiers2, metadata2 = parse_sequence(sequence2), parse_sequence(metadata2) + m, n = len(identifiers1), len(identifiers2) + dp_matrix = np.zeros((m + 1, n + 1)) + + progress_bar = tqdm.tqdm(total=(m + 1) * (n + 1)) + with ThreadPoolExecutor(threads) as executor: + for i in range(1, m + 1): + for j in range(1, n + 1): + if identifiers1[i - 1] == identifiers2[j - 1]: + dp_matrix[i][j] = dp_matrix[i - 1][j - 1] + match_score + else: + dp_matrix[i][j] = max(dp_matrix[i - 1][j] + gap_penalty, + dp_matrix[i][j - 1] + gap_penalty, + dp_matrix[i - 1][j - 1] + mismatch_penalty) + progress_bar.update(1) + progress_bar.close() + + aligned_identifiers1, aligned_metadata1 = [], [] + aligned_identifiers2, aligned_metadata2 = [], [] + + i, j = m, n + score = dp_matrix[m][n] + logger.info(f'Calculated score is: {score}') + + while i > 0 and j > 0: + if identifiers1[i - 1] == identifiers2[j - 1]: + aligned_identifiers1.append(identifiers1[i - 1]) + aligned_identifiers2.append(identifiers2[j - 1]) + if metadata1 is not None and metadata2 is not None: + aligned_metadata1.append(metadata1[i - 1]) + aligned_metadata2.append(metadata2[j - 1]) + i -= 1 + j -= 1 + elif dp_matrix[i][j] == dp_matrix[i - 1][j - 1] + mismatch_penalty: + aligned_identifiers1.append(identifiers1[i - 1]) + aligned_identifiers2.append(identifiers2[j - 1]) + if metadata1 is not None and metadata2 is not None: + aligned_metadata1.append(metadata1[i - 1]) + aligned_metadata2.append(metadata2[j - 1]) + i -= 1 + j -= 1 + elif dp_matrix[i][j] == dp_matrix[i - 1][j] + gap_penalty: + aligned_identifiers1.append(identifiers1[i - 1]) + aligned_identifiers2.append('') + if metadata1 is not None and metadata2 is not None: + aligned_metadata1.append(metadata1[i - 1]) + aligned_metadata2.append(metadata2[j - 1]) + i -= 1 + else: + aligned_identifiers1.append('') + aligned_identifiers2.append(identifiers2[j - 1]) + if metadata1 is not None and metadata2 is not None: + aligned_metadata1.append(metadata1[i - 1]) + aligned_metadata2.append(metadata2[j - 1]) + j -= 1 + + aligned_identifiers1.reverse() + aligned_identifiers2.reverse() + if metadata1 is not None and metadata2 is not None: + aligned_metadata1.reverse() + aligned_metadata2.reverse() + + padded_sequences1, padded_sequences2 = [], [] + + for seq1, seq2 in zip(aligned_identifiers1, aligned_identifiers2): + if seq1 == '': + padded_seq1 = '-' * len(seq2) + padded_sequences1.append(padded_seq1) + padded_sequences2.append(seq2) + elif seq2 == '': + padded_seq2 = '-' * len(seq1) + padded_sequences1.append(seq1) + padded_sequences2.append(padded_seq2) + else: + if len(seq1) < len(seq2): + padded_seq1 = seq1 + '-' * (len(seq2) - len(seq1)) + padded_sequences1.append(padded_seq1) + padded_sequences2.append(seq2) + else: + padded_seq2 = seq2 + '-' * (len(seq1) - len(seq2)) + padded_sequences1.append(seq1) + padded_sequences2.append(padded_seq2) + + if metadata1 is not None and metadata2 is not None: + write_stockholm_alignment_with_metadata(padded_sequences1, padded_sequences2, aligned_metadata1, + aligned_metadata2, score, f'{fasta_name}.sto') + write_text_format(padded_sequences1, padded_sequences2, score, f'{fasta_name}-text.txt', + aligned_metadata1, aligned_metadata2) + else: + write_text_format(padded_sequences1, padded_sequences2, score, f'{fasta_name}-text.txt') + + record1 = SeqRecord(Seq("|".join(padded_sequences1)), id="sequence1") + record2 = SeqRecord(Seq("|".join(padded_sequences2)), id="sequence2") + + SeqIO.write([record1, record2], f'{fasta_name}.fasta', "fasta") + + return '|'.join(aligned_identifiers1), '|'.join(aligned_identifiers2), score