Source code for dnachisel.biotools.bowtie

import subprocess
import tempfile
import os

def run_process(name, parameters):
    process = subprocess.run(
        parameters,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE
    )
    if process.returncode:
        error = process.stderr.decode()
        raise OSError("%s failed:\n\n%s" % (name, error))
    return process.stdout

def create_bowtie_index_from_sequences(sequences, path):
    fasta_path = os.path.join(path, 'sequences.fa')
    bowtie_path = os.path.join(path, 'bowtie')
    with open(fasta_path, 'w') as f:
        f.write("\n".join([
            ">%d\n%s" % (i, sequence)
            for i, sequence in enumerate(sequences)
        ]))
    run_process("build-bowtie", [
        "bowtie-build", "-f", fasta_path, bowtie_path, "--quiet"
    ])
    return bowtie_path

[docs]def find_all_bowtie_matches( sequence, bowtie_index_path, match_length, max_mismatches=0 ): """Return (short) matches between a sequence and a Bowtie index. The result is of the form [(start, end), n_mismatches)] where (start, end) indicates the position of the match in the sequence, and n_mismatches is the number of mismatches with the closest homology in the index. """ # CREATE THE PARAMETERS parameters = ["bowtie"] parameters += ["--best", "-k", "1"] # only return the best alignments parameters += ["-v", str(max_mismatches)] # only allow that N mismatches parameters += [bowtie_index_path] parameters += ["--quiet", "--suppress", "2,3,4,5,6,7"] # small output k = match_length kmers = [sequence[i : i + k] for i in range(len(sequence) - k + 1)] if k * len(kmers) < 10000: # Input the sequences directly tmp_fasta_path = None parameters += ["-c", ",".join(kmers)] else: # Write sequences to a file if too many. tmp_fasta_path = tempfile.mktemp(".fa") with open(tmp_fasta_path, "w") as f: entries = [">%d\n%s" % (i, s) for i, s in enumerate(kmers)] f.write("\n\n".join(entries)) parameters += ["-f", tmp_fasta_path] # RUN THE PROCESS try: output = run_process("BOWTIE", parameters) except Exception as err: raise err finally: if tmp_fasta_path is not None: os.remove(tmp_fasta_path) output_records = [ line.split("\t") for line in output.decode().split("\n") if len(line) ] return [ ((int(index), int(index) + k), edits.count(":")) for index, edits in output_records ]