
A Coding Implementation to Build a Unified Tool Orchestration Framework from Documentation to Automated Pipelines

In this tutorial, we build a compact, efficient framework that demonstrates how to convert tool documentation into standardized, callable interfaces, register those tools in a central system, and execute them as part of an automated pipeline. As we move through each stage, we create a simple converter, design mock bioinformatics tools, organize them into a registry, and benchmark both individual and multi-step pipeline executions. Through this process, we explore how structured tool interfaces and automation can streamline and modularize data workflows. Check out the FULL CODES here.

import re, json, time, random
from dataclasses import dataclass
from typing import Callable, Dict, Any, List, Tuple


@dataclass
class ToolSpec:
    name: str
    description: str
    inputs: Dict[str, str]
    outputs: Dict[str, str]


def parse_doc_to_spec(name: str, doc: str) -> ToolSpec:
    desc = doc.strip().splitlines()[0].strip() if doc.strip() else name
    arg_block = "\n".join([l for l in doc.splitlines() if "--" in l or ":" in l])
    inputs = {}
    for line in arg_block.splitlines():
        m = re.findall(r"(--?\w[\w-]*|\b\w+\b)\s*[:=]?\s*(\w+)?", line)
        for key, typ in m:
            k = key.lstrip("-")
            if k and k not in inputs and k not in ["Returns", "Output", "Outputs"]:
                inputs[k] = (typ or "str")
    if not inputs: inputs = {"in": "str"}
    return ToolSpec(name=name, description=desc, inputs=inputs, outputs={"out": "json"})

We begin by defining the structure for our tools and writing a simple parser that converts plain documentation into a standardized tool specification. This helps us automatically extract parameters and outputs from textual descriptions. Check out the FULL CODES here.
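As a quick sanity check, the flag-extraction idea can be exercised on a hypothetical docstring (the tool name and parameters below are invented for illustration). This sketch uses a narrower regex than the parser above, matching only explicit `--flag` tokens:

```python
import re

# Hypothetical docstring in the same "--flag: type" style the parser expects.
doc = """Trimmer-like read trimmer
--reads: str  --min_qual: int  Outputs: json"""

inputs = {}
for line in doc.splitlines():
    # Require a double hyphen so words like "Trimmer-like" are not picked up.
    for key, typ in re.findall(r"(--\w[\w-]*)\s*[:=]?\s*(\w+)?", line):
        k = key.lstrip("-")
        if k and k not in inputs:
            inputs[k] = typ or "str"

print(inputs)  # {'reads': 'str', 'min_qual': 'int'}
```

Requiring the leading `--` is the simplest way to avoid false positives from hyphenated words in the description line; the tutorial's parser instead filters by a blocklist of words like `Outputs`.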

def tool_fastqc(seq_fasta: str, min_len: int = 30) -> Dict[str, Any]:
    seqs = [s for s in re.split(r">[^\n]*\n", seq_fasta)[1:]]
    lens = [len(re.sub(r"\s+", "", s)) for s in seqs]
    q30 = sum(l >= min_len for l in lens) / max(1, len(lens))
    gc = sum(c in "GCgc" for s in seqs for c in s) / max(1, sum(lens))
    return {"n_seqs": len(lens), "len_mean": (sum(lens) / max(1, len(lens))), "pct_q30": q30, "gc": gc}


def tool_bowtie2_like(ref: str, reads: str, mode: str = "end-to-end") -> Dict[str, Any]:
    def revcomp(s):
        t = str.maketrans("ACGTacgt", "TGCAtgca"); return s.translate(t)[::-1]
    reads_list = [r for r in re.split(r">[^\n]*\n", reads)[1:]]
    ref_seq = "".join(ref.splitlines()[1:])
    hits = []
    for i, r in enumerate(reads_list):
        rseq = "".join(r.split())
        aligned = (rseq in ref_seq) or (revcomp(rseq) in ref_seq)
        hits.append({"read_id": i, "aligned": bool(aligned), "pos": ref_seq.find(rseq)})
    return {"n": len(hits), "aligned": sum(h["aligned"] for h in hits), "mode": mode, "hits": hits}


def tool_bcftools_like(ref: str, alt: str, win: int = 15) -> Dict[str, Any]:
    ref_seq = "".join(ref.splitlines()[1:]); alt_seq = "".join(alt.splitlines()[1:])
    n = min(len(ref_seq), len(alt_seq)); vars = []
    for i in range(n):
        if ref_seq[i] != alt_seq[i]: vars.append({"pos": i, "ref": ref_seq[i], "alt": alt_seq[i]})
    return {"n_sites": n, "n_var": len(vars), "variants": vars[:win]}


FASTQC_DOC = """FastQC-like quality control for FASTA
--seq_fasta: str  --min_len: int   Outputs: json"""
BOWTIE_DOC = """Bowtie2-like aligner
--ref: str  --reads: str  --mode: str  Outputs: json"""
BCF_DOC = """bcftools-like variant caller
--ref: str  --alt: str  --win: int  Outputs: json"""

We create mock implementations of bioinformatics tools such as FastQC, Bowtie2, and bcftools. We define their expected inputs and outputs so they can be executed consistently through a unified interface. Check out the FULL CODES here.
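The core trick in the aligner mock is reverse-complement matching: a read counts as aligned if either the read itself or its reverse complement occurs in the reference. A self-contained sketch of that check, using made-up sequences:

```python
def revcomp(s: str) -> str:
    # Complement each base via a translation table, then reverse the string.
    return s.translate(str.maketrans("ACGTacgt", "TGCAtgca"))[::-1]

ref = "AACCGGTTACGT"

# A forward-strand read is found by plain substring search...
assert "CCGGT" in ref
# ...while a reverse-strand read is only found via its reverse complement.
assert "TAACC" not in ref
assert revcomp("TAACC") in ref
print(revcomp("TAACC"))  # GGTTA
```

Substring containment is of course a stand-in for real alignment: it allows no mismatches or gaps, which is exactly the simplification the mock tool makes.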

@dataclass
class MCPTool:
    spec: ToolSpec
    fn: Callable[..., Dict[str, Any]]


class MCPServer:
    def __init__(self): self.tools: Dict[str, MCPTool] = {}
    def register(self, name: str, doc: str, fn: Callable[..., Dict[str, Any]]):
        spec = parse_doc_to_spec(name, doc); self.tools[name] = MCPTool(spec, fn)
    def list_tools(self) -> List[Dict[str, Any]]:
        return [dict(name=t.spec.name, description=t.spec.description, inputs=t.spec.inputs, outputs=t.spec.outputs) for t in self.tools.values()]
    def call_tool(self, name: str, args: Dict[str, Any]) -> Dict[str, Any]:
        if name not in self.tools: raise KeyError(f"tool {name} not found")
        spec = self.tools[name].spec
        kwargs = {k: args.get(k) for k in spec.inputs.keys()}
        return self.tools[name].fn(**kwargs)


server = MCPServer()
server.register("fastqc", FASTQC_DOC, tool_fastqc)
server.register("bowtie2", BOWTIE_DOC, tool_bowtie2_like)
server.register("bcftools", BCF_DOC, tool_bcftools_like)


Task = Tuple[str, Dict[str, Any]]
PIPELINES = {
    "rnaseq_qc_align_call": [
        ("fastqc", {"seq_fasta": "{reads}", "min_len": 30}),
        ("bowtie2", {"ref": "{ref}", "reads": "{reads}", "mode": "end-to-end"}),
        ("bcftools", {"ref": "{ref}", "alt": "{alt}", "win": 15}),
    ]
}


def compile_pipeline(nl_request: str) -> List[Task]:
    # Only one pipeline is defined in this demo, so both branches select it.
    key = "rnaseq_qc_align_call" if re.search(r"rna|qc|align|variant|call", nl_request, re.I) else "rnaseq_qc_align_call"
    return PIPELINES[key]

We build a lightweight server that registers tools, lists their specs, and allows us to call them programmatically. We also define a basic pipeline structure that outlines the sequence in which tools should run. Check out the FULL CODES here.
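Stripped of the spec machinery, the register-then-dispatch pattern the server relies on is just a dict mapping names to callables. A minimal sketch of that pattern (the `gc` tool below is invented for illustration):

```python
from typing import Any, Callable, Dict

class Registry:
    """Bare-bones name -> callable dispatch, mirroring MCPServer.register/call_tool."""
    def __init__(self):
        self.fns: Dict[str, Callable[..., Any]] = {}

    def register(self, name: str, fn: Callable[..., Any]) -> None:
        self.fns[name] = fn

    def call(self, name: str, args: Dict[str, Any]) -> Any:
        if name not in self.fns:
            raise KeyError(f"tool {name} not found")
        return self.fns[name](**args)  # unpack the arg dict as keyword arguments

reg = Registry()
reg.register("gc", lambda seq: sum(c in "GCgc" for c in seq) / max(1, len(seq)))
print(reg.call("gc", {"seq": "GATTACA"}))  # 2/7, roughly 0.2857
```

Because every tool is called through the same `call(name, args)` entry point with keyword-argument unpacking, a pipeline runner never needs to know individual function signatures, only the names in each tool's spec.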

def mk_fasta(header: str, seq: str) -> str: return f">{header}\n{seq}\n"
random.seed(0)
REF_SEQ = "".join(random.choice("ACGT") for _ in range(300))
REF = mk_fasta("ref", REF_SEQ)
READS = mk_fasta("r1", REF_SEQ[50:130]) + mk_fasta("r2", "ACGT" * 15) + mk_fasta("r3", REF_SEQ[180:240])
ALT = mk_fasta("alt", REF_SEQ[:150] + "T" + REF_SEQ[151:])


def run_pipeline(nl: str, ctx: Dict[str, str]) -> Dict[str, Any]:
    plan = compile_pipeline(nl); results = []; t0 = time.time()
    for name, arg_tpl in plan:
        args = {k: (v.format(**ctx) if isinstance(v, str) else v) for k, v in arg_tpl.items()}
        out = server.call_tool(name, args)
        results.append({"tool": name, "args": args, "output": out})
    return {"request": nl, "elapsed_s": round(time.time() - t0, 4), "results": results}

We prepare small synthetic FASTA data for testing and implement a function that runs the whole pipeline. Here, we dynamically pass tool parameters and execute each step in sequence. Check out the FULL CODES here.
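The parameter passing hinges on one idiom: string values in a step template are treated as `str.format` templates and filled in from a context dict, while non-string values pass through untouched. A small sketch of just that substitution, with toy placeholder values:

```python
# Toy context and step template, in the same shape run_pipeline uses.
ctx = {"ref": ">ref\nACGT\n", "reads": ">r1\nACG\n"}
arg_tpl = {"ref": "{ref}", "reads": "{reads}", "mode": "end-to-end", "min_len": 30}

# Format string values against the context; leave non-strings (e.g. ints) alone.
args = {k: (v.format(**ctx) if isinstance(v, str) else v) for k, v in arg_tpl.items()}

print(args["mode"], args["min_len"])  # end-to-end 30
```

Note that plain strings without braces, like `"end-to-end"`, survive `.format(**ctx)` unchanged, which is why the template can mix placeholders and literal settings freely.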

def bench_individual() -> List[Dict[str, Any]]:
    cases = [
        ("fastqc", {"seq_fasta": READS, "min_len": 25}),
        ("bowtie2", {"ref": REF, "reads": READS, "mode": "end-to-end"}),
        ("bcftools", {"ref": REF, "alt": ALT, "win": 10}),
    ]
    rows = []
    for name, args in cases:
        t0 = time.time(); ok = True; err = None; out = None
        try: out = server.call_tool(name, args)
        except Exception as e: ok = False; err = str(e)
        rows.append({"tool": name, "ok": ok, "ms": int((time.time() - t0) * 1000), "out_keys": list(out.keys()) if ok else [], "err": err})
    return rows


def bench_pipeline() -> Dict[str, Any]:
    t0 = time.time()
    res = run_pipeline("Run RNA-seq QC, align, and variant call.", {"ref": REF, "reads": READS, "alt": ALT})
    ok = all(step["output"] for step in res["results"])
    return {"pipeline": "rnaseq_qc_align_call", "ok": ok, "ms": int((time.time() - t0) * 1000), "n_steps": len(res["results"])}


print("== TOOLS =="); print(json.dumps(server.list_tools(), indent=2))
print("\n== INDIVIDUAL BENCH =="); print(json.dumps(bench_individual(), indent=2))
print("\n== PIPELINE BENCH =="); print(json.dumps(bench_pipeline(), indent=2))
print("\n== PIPELINE RUN =="); print(json.dumps(run_pipeline("Run RNA-seq QC, align, and variant call.", {"ref": REF, "reads": READS, "alt": ALT}), indent=2))

We benchmark both individual tools and the full pipeline, capturing their outputs and performance metrics. Finally, we print the results to verify that every stage of the workflow runs successfully and integrates smoothly.
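The per-tool benchmark boils down to a timed, exception-guarded call. That pattern can be sketched in isolation (the `bench` helper and toy workloads here are illustrative, not part of the tutorial code):

```python
import time

def bench(fn, **kwargs):
    # Time one call, trapping failures so a bad tool yields a row instead of a crash.
    t0 = time.time()
    try:
        out, ok, err = fn(**kwargs), True, None
    except Exception as e:
        out, ok, err = None, False, str(e)
    return {"ok": ok, "ms": int((time.time() - t0) * 1000), "err": err, "out": out}

good = bench(lambda seq: seq.upper(), seq="acgt")
bad = bench(lambda seq: 1 / 0, seq="acgt")
print(good["ok"], bad["ok"])  # True False
```

Catching exceptions inside the timing loop is what lets the benchmark report a partial table when one tool fails, rather than aborting the whole run.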

In conclusion, we develop a clear understanding of how lightweight tool conversion, registration, and orchestration can work together in a single setting. We observe how a unified interface allows us to connect multiple tools seamlessly, run them in sequence, and measure their performance. This hands-on exercise helps us appreciate how simple design principles, standardization, automation, and modularity can enhance the reproducibility and efficiency of computational workflows in any domain.



The post A Coding Implementation to Build a Unified Tool Orchestration Framework from Documentation to Automated Pipelines appeared first on MarkTechPost.
