Source code for dnaweaver.DnaSupplier.builtin_suppliers.DnaAssemblyStation.DnaAssemblyStation

import numpy as np
from ....DnaQuote import DnaQuote
from ....SegmentSelector import TmSegmentSelector, FixedSizeSegmentSelector
from ....DnaAssemblyMethod import (
    OligoAssemblyMethod,
    GibsonAssemblyMethod,
    GoldenGateAssemblyMethod,
)
from ...builtin_constraints import SequenceLengthConstraint
from ...DnaSupplier import DnaSupplier
from ..DnaSuppliersComparator import DnaSuppliersComparator
from .SequenceDecomposer import SequenceDecomposer, NoSolutionFoundError


[docs]class DnaAssemblyStation(DnaSupplier): """DNA Assembly stations assemble together DNA fragments using a specific assembly method. Parameters ---------- name Name of the station (appears on reports). assembly_method A DnaAssemblyMethod object specifying how the fragments are assembled, what sequences can be assembled, what fragments can be used, etc. supplier """ class_description = "DNA assembly station" operation_type = "assembly" report_fa_symbol = u"" report_fa_symbol_plain = "flask" report_color = "#eeeeff" def __init__( self, name, assembly_method, supplier, memoize=False, decomposer_class=None, a_star_auto_multiplier=2, **solver_kwargs ): self.name = name self.assembly_method = assembly_method self.set_suppliers(supplier) self.extra_time = assembly_method.duration self.extra_cost = assembly_method.cost self.sequence_constraints = assembly_method.sequence_constraints self.cuts_set_constraints = assembly_method.cuts_set_constraints self.memoize = memoize if decomposer_class is None: self.decomposer_class = SequenceDecomposer else: self.decomposer_class = decomposer_class self.memoize_dict = {} self.min_basepair_price = self.supplier.min_basepair_price if solver_kwargs.get("a_star_factor", None) == "auto": solver_kwargs["a_star_factor"] = ( a_star_auto_multiplier * self.min_basepair_price ) self.solver_kwargs = solver_kwargs
[docs] def get_quote_for_sequence_segment( self, sequence, segment, max_lead_time=None, **kwargs ): """Return the cost of the segment. Is used as the "cost" function for a segment during decomposition optimization. """ fragment_to_order = self.assembly_method.compute_fragment_for_sequence_segment( sequence=sequence, segment=segment, **kwargs ) return self.supplier.get_quote(fragment_to_order, max_lead_time=max_lead_time)
[docs] def get_assembly_plan_from_cuts(self, sequence, cuts, max_lead_time=None): """Return a plan {segment: quote, ...} based on the cut positions. Where "segment" is of the form (start, end). """ cuts = sorted(cuts) return { segment: self.get_quote_for_sequence_segment( sequence, segment, max_lead_time=max_lead_time, segment_position=i, ) for i, segment in enumerate(zip(cuts, cuts[1:])) }
def new_sequence_decomposer( self, sequence, max_lead_time=None, coarse_grain=1, fine_grain=1, cut_spread_radius=0, a_star_factor=0, logger=None, ): def segment_score(segment): quote = self.get_quote_for_sequence_segment( sequence, segment, max_lead_time=max_lead_time, ) if not quote.accepted: return -1 else: return quote.price assembly = self.assembly_method return self.decomposer_class( sequence_length=len(sequence), segment_score_function=segment_score, cut_location_constraints=[ cs(sequence) for cs in assembly.cut_location_constraints ], segment_constraints=[cs(sequence) for cs in assembly.segment_constraints], min_segment_length=assembly.min_segment_length, max_segment_length=assembly.max_segment_length, forced_cuts=assembly.force_cuts(sequence), suggested_cuts=self.compute_suggested_cuts(sequence), coarse_grain=coarse_grain, fine_grain=fine_grain, logger=logger, bar_prefix="[%s] " % self.name, a_star_factor=a_star_factor, path_size_limit=assembly.max_fragments, cuts_set_constraints=[cs(sequence) for cs in assembly.cuts_set_constraints], cut_spread_radius=cut_spread_radius, ) def compute_suggested_cuts(self, sequence): all_supplier_cuts = [] for supplier in self.suppliers: if hasattr(supplier, "suggest_cuts"): supplier_cuts = supplier.suggest_cuts(sequence) all_supplier_cuts.extend(list(supplier_cuts)) assembly_method_cuts = self.assembly_method.suggest_cuts(sequence) suggested = list(assembly_method_cuts) + list(all_supplier_cuts) return sorted(set(suggested)) def suggest_cuts(self, sequence): return self.compute_suggested_cuts(sequence=sequence)
[docs] def get_assembly_plan_for_sequence( self, sequence, max_lead_time=None, coarse_grain=1, fine_grain=1, cut_spread_radius=0, a_star_factor=0, logger=None, ): """Return the plan {(seg, ment): quote, ...} of the optimal strategy for the sequence's decomposition.""" # extended_sequence = self.assembly_method.extend_sequence(sequence) decomposer = self.new_sequence_decomposer( sequence=sequence, max_lead_time=max_lead_time, coarse_grain=coarse_grain, fine_grain=fine_grain, a_star_factor=a_star_factor, cut_spread_radius=cut_spread_radius, logger=logger, ) self._lattest_decomposer = decomposer # for debugging best_cuts = decomposer.compute_optimal_cuts() return self.get_assembly_plan_from_cuts( sequence, best_cuts, max_lead_time=max_lead_time, )
[docs] def get_best_price( self, sequence, max_lead_time=None, with_assembly_plan=False, ): """Returns a price-optimal DnaQuote for the given sequence. Parameters ---------- sequence (str) The sequence submitted to the Dna Source for a quote. max_lead_time (float) If provided, the quote returned is the best quote (price-wise) whose lead time is less or equal to max_lead_time. with_assembly_plan If True, the assembly plan is added to the quote. """ if max_lead_time is not None: max_lead_time = max_lead_time - self.extra_time if max_lead_time < 0: return DnaQuote( self, sequence, accepted=False, message="Lead time limit too short", ) try: assembly_plan = self.get_assembly_plan_for_sequence( sequence, max_lead_time=max_lead_time, **self.solver_kwargs ) except NoSolutionFoundError as err: message = "No solution found! %s" % err return DnaQuote(self, sequence, accepted=False, message=message) # A solution has been found ! Now compute overall time and lead time. if any([q.price is None for q in assembly_plan.values()]): return DnaQuote( self, sequence, accepted=False, message="No solution found !" ) quote = DnaQuote(self, sequence, assembly_plan=assembly_plan) quote.price = quote.children_total_price() + self.extra_cost children_lead_time = quote.children_overall_lead_time() if (children_lead_time is None) or (self.extra_time is None): quote.lead_time = None else: quote.lead_time = children_lead_time + self.extra_time if not with_assembly_plan: self.assembly_plan = None return quote
def additional_dict_description(self): result = { "dna source": self.supplier.name, "solver parameters": self.solver_kwargs, } result.update( { ("assembly method %s" % k): v for k, v in self.assembly_method.dict_description().items() } ) return result @staticmethod def from_dict(data): sequence_constraints = [] if data["use_size_range"]: mini, maxi = data["size_range"] sequence_constraints.append(SequenceLengthConstraint(mini, maxi)) min_length, max_length = data["fragments_size_range"] if data["method"] == "type_iis": gc_range = data.get("overhang_gc_range", [0, 1]) method = GoldenGateAssemblyMethod( duration=data["duration"], cost=data["cost"], min_overhangs_gc=gc_range[0], max_overhangs_gc=gc_range[1], enzyme=data["enzyme"], max_segment_length=max_length, min_segment_length=min_length, max_fragments=data["max_fragments"], sequence_constraints=sequence_constraints, ) elif data["method"] in [ "gibson_assembly", "yeast_recombination", "oligo_assembly", ]: if data["overhang_type"] == "tm": min_oh_size, max_oh_size = data["overhang_size_range"] min_tm, max_tm = data["tm_range"] overhang_selector = TmSegmentSelector( min_size=min_oh_size, max_size=max_oh_size, min_tm=min_tm, max_tm=max_tm, ) else: overhang_selector = FixedSizeSegmentSelector( segment_size=data["overlap"] ) if data["method"] == "oligo_assembly": method_class = OligoAssemblyMethod else: method_class = GibsonAssemblyMethod method = method_class( duration=data["duration"], cost=data["cost"], overhang_selector=overhang_selector, max_segment_length=max_length, min_segment_length=min_length, max_fragments=data["max_fragments"], sequence_constraints=sequence_constraints, ) if data["use_astar"]: a_star_factor = data.get("astar_factor", "auto") else: a_star_factor = 0 max_construct_length = max_length * data["max_fragments"] if data["grain_type"] == "auto": data["coarse_grain"] = int(max_construct_length / 15) data["fine_grain"] = int(np.sqrt(data["coarse_grain"])) return DnaAssemblyStation( name=data["name"], supplier=data["suppliers"], assembly_method=method, coarse_grain=data["coarse_grain"], fine_grain=data["fine_grain"], a_star_factor=a_star_factor, memoize=data.get("memoize", False), logger=data.get("logger", None), ) def set_suppliers(self, suppliers): if hasattr(suppliers, "__iter__"): if len(suppliers) > 1: self.supplier = DnaSuppliersComparator( name=self.name + " comparator", suppliers=suppliers ) self.supplier.is_ghost_source = True else: self.supplier = suppliers[0] else: self.supplier = suppliers suppliers = [suppliers] self.suppliers = suppliers