
A Coding Guide to Build a Production-Grade Background Task Processing System Using Huey with SQLite, Scheduling, Retries, Pipelines, and Concurrency Control

In this tutorial, we explore how to build a fully functional background task processing system using Huey, without relying on Redis. We configure a SQLite-backed Huey instance, start a real consumer inside the notebook, and implement advanced task patterns, including retries, priorities, scheduling, pipelines, locking, and monitoring via signals. Step by step, we demonstrate how to simulate production-grade asynchronous task handling while keeping everything self-contained and easy to run in a cloud notebook environment.

!pip -q install -U huey


import os
import time
import json
import random
import threading
from datetime import datetime


from huey import SqliteHuey, crontab
from huey.constants import WORKER_THREAD


DB_PATH = "/content/huey_demo.db"
if os.path.exists(DB_PATH):
    os.remove(DB_PATH)


huey = SqliteHuey(
    name="colab-huey",
    filename=DB_PATH,
    results=True,
    store_none=False,
    utc=True,
)


print("Huey backend:", type(huey).__name__)
print("SQLite DB at:", DB_PATH)

We install Huey and configure a SQLite-backed instance. We initialize the database file and ensure a clean environment before starting execution. By doing this, we establish a lightweight yet production-style task queue setup without external dependencies.

EVENT_LOG = []


@huey.signal()
def _log_all_signals(signal, task, exc=None):
    EVENT_LOG.append({
        "ts": datetime.utcnow().isoformat() + "Z",
        "signal": str(signal),
        "task": getattr(task, "name", None),
        "id": getattr(task, "id", None),
        "args": getattr(task, "args", None),
        "kwargs": getattr(task, "kwargs", None),
        "exc": repr(exc) if exc else None,
    })


def print_latest_events(n=10):
    print("\n--- Latest Huey events ---")
    for row in EVENT_LOG[-n:]:
        print(json.dumps(row, indent=2))

We implement a signal handler to capture and store task lifecycle events in a structured log. We track execution details, including task IDs, arguments, and exceptions, to improve observability. Through this mechanism, we build real-time monitoring into our asynchronous system.

@huey.task(priority=50)
def quick_add(a, b):
    return a + b


@huey.task(priority=10)
def slow_io(seconds=1.0):
    time.sleep(seconds)
    return f"slept={seconds}"


@huey.task(retries=3, retry_delay=1, priority=100)
def flaky_network_call(p_fail=0.6):
    if random.random() < p_fail:
        raise RuntimeError("Transient failure (simulated)")
    return "OK"


@huey.task(context=True, priority=60)
def cpu_pi_estimate(samples=200_000, task=None):
    inside = 0
    rnd = random.random
    for _ in range(samples):
        x, y = rnd(), rnd()
        if x*x + y*y <= 1.0:
            inside += 1
    est = 4.0 * inside / samples
    return {"task_id": task.id if task else None, "pi_estimate": est, "samples": samples}

We define several tasks with priorities, retry configurations, and contextual awareness. We simulate different workloads, including simple arithmetic, I/O delay, transient failures, and CPU-bound computation. By doing this, we demonstrate how Huey handles reliability, execution order, and task metadata.
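The CPU-bound task is ordinary Python, so its core logic can be sanity-checked outside Huey entirely. A small standalone sketch of the same Monte Carlo scheme as `cpu_pi_estimate`, seeded for repeatability (the function name and seed are illustrative):

```python
import random

def estimate_pi(samples=200_000, seed=42):
    # Draw (x, y) pairs in the unit square; the fraction landing inside
    # the quarter circle approximates pi/4.
    rnd = random.Random(seed).random
    inside = sum(1 for _ in range(samples) if rnd() ** 2 + rnd() ** 2 <= 1.0)
    return 4.0 * inside / samples

est = estimate_pi()
print(est)
assert abs(est - 3.14159) < 0.05   # loose tolerance for 200k samples
```

Keeping task bodies testable as plain functions like this is a useful habit: the Huey decorator only adds queueing semantics on top.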

@huey.lock_task("demo:daily-sync")
@huey.task()
def locked_sync_job(tag="sync"):
    time.sleep(1.0)
    return f"locked-job-done:{tag}:{datetime.utcnow().isoformat()}Z"


@huey.task()
def fetch_number(seed=7):
    random.seed(seed)
    return random.randint(1, 100)


@huey.task()
def transform_number(x, scale=3):
    return x * scale


@huey.task()
def store_result(x):
    return {"stored_value": x, "stored_at": datetime.utcnow().isoformat() + "Z"}

We introduce locking to prevent concurrent execution of critical jobs. We also define tasks that can later be chained together using pipelines to form structured workflows. Through this design, we model practical background processing patterns that require sequencing and concurrency control.

TICK = {"count": 0}


@huey.task()
def heartbeat():
    TICK["count"] += 1
    print(f"[heartbeat] tick={TICK['count']} utc={datetime.utcnow().isoformat()}Z")


@huey.periodic_task(crontab(minute="*"))
def heartbeat_minutely():
    heartbeat()


_TIMER_STATE = {"running": False, "timer": None}


def start_seconds_heartbeat(interval_sec=15):
    _TIMER_STATE["running"] = True
    def _tick():
        if not _TIMER_STATE["running"]:
            return
        huey.enqueue(heartbeat.s())
        t = threading.Timer(interval_sec, _tick)
        _TIMER_STATE["timer"] = t
        t.start()
    _tick()


def stop_seconds_heartbeat():
    _TIMER_STATE["running"] = False
    t = _TIMER_STATE.get("timer")
    if t is not None:
        try:
            t.cancel()
        except Exception:
            pass
    _TIMER_STATE["timer"] = None

We define heartbeat behavior and configure minute-level periodic execution using Huey's crontab scheduling. We also implement a timer-based mechanism to simulate sub-minute execution intervals for demonstration purposes. With this setup, we create visible recurring background activity within the notebook.

consumer = huey.create_consumer(
    workers=4,
    worker_type=WORKER_THREAD,
    periodic=True,
    initial_delay=0.1,
    backoff=1.15,
    max_delay=2.0,
    scheduler_interval=1,
    check_worker_health=True,
    health_check_interval=10,
    flush_locks=False,
)


consumer_thread = threading.Thread(target=consumer.run, daemon=True)
consumer_thread.start()
print("Consumer started (threaded).")


print("\nEnqueue basics...")
r1 = quick_add(10, 32)
r2 = slow_io(0.75)
print("quick_add result:", r1(blocking=True, timeout=5))
print("slow_io result:", r2(blocking=True, timeout=5))


print("\nRetries + priority demo (flaky task)...")
rf = flaky_network_call(p_fail=0.7)
try:
    print("flaky_network_call result:", rf(blocking=True, timeout=10))
except Exception as e:
    print("flaky_network_call failed even after retries:", repr(e))


print("\nContext task (task id inside payload)...")
rp = cpu_pi_estimate(samples=150_000)
print("pi payload:", rp(blocking=True, timeout=20))


print("\nLocks demo: enqueue several locked jobs quickly (should serialize)...")
locked_results = [locked_sync_job(tag=f"run{i}") for i in range(3)]
print([res(blocking=True, timeout=10) for res in locked_results])


print("\nScheduling demo: run slow_io in ~3 seconds...")
rs = slow_io.schedule(args=(0.25,), delay=3)
print("scheduled handle:", rs)
print("scheduled slow_io result:", rs(blocking=True, timeout=10))


print("\nRevoke demo: schedule a task in 5s then revoke it before it runs...")
rv = slow_io.schedule(args=(0.1,), delay=5)
rv.revoke()
time.sleep(6)
try:
    out = rv(blocking=False)
    print("revoked task output:", out)
except Exception as e:
    print("revoked task did not produce a result (expected):", type(e).__name__, str(e)[:120])


print("\nPipeline demo...")
pipeline = (
    fetch_number.s(123)
    .then(transform_number, 5)
    .then(store_result)
)
pipe_res = huey.enqueue(pipeline)
print("pipeline final result:", pipe_res(blocking=True, timeout=10))


print("\nStarting 15-second heartbeat demo for ~40 seconds...")
start_seconds_heartbeat(interval_sec=15)
time.sleep(40)
stop_seconds_heartbeat()
print("Stopped 15-second heartbeat demo.")


print_latest_events(12)


print("\nStopping consumer gracefully...")
consumer.stop(graceful=True)
consumer_thread.join(timeout=5)
print("Consumer stopped.")

We start a threaded consumer inside the notebook to process tasks asynchronously. We enqueue tasks, test retries, demonstrate scheduling and revocation, execute pipelines, and observe logged signals. Finally, we gracefully shut down the consumer to ensure clean resource management and controlled system termination.

In conclusion, we designed and executed an advanced asynchronous task system using Huey with a SQLite backend and an in-notebook consumer. We implemented retries, task prioritization, future scheduling, revocation, locking mechanisms, task chaining through pipelines, and periodic behavior simulation, all within a Colab-friendly setup. Through this approach, we gained a clear understanding of how to use Huey to manage background workloads efficiently and how to extend this architecture to real-world production deployments.


Check out the Full Coding Notebook/Implementation here.


The post A Coding Guide to Build a Production-Grade Background Task Processing System Using Huey with SQLite, Scheduling, Retries, Pipelines, and Concurrency Control appeared first on MarkTechPost.
