A Coding Implementation on Loguru for Designing Robust, Structured, Concurrent, and Production-Ready Python Logging Pipelines
In this tutorial, we implement a practical use case with Loguru, a powerful, flexible, and production-ready logging library for Python. We begin by building a clean, idempotent logging setup that can be safely rerun without duplicating handlers or producing messy output. From there, we move step by step through structured logging, contextual logging, custom log levels, global patching, callable formatters, and in-memory sinks. We also cover real-world logging needs such as rich exception traces, JSON log files, custom rotation, compression, retention, async logging, threaded execution, multiprocessing-safe logging, and interception of the standard logging module. By keeping everything in a Colab-ready workflow, we make it easy to test, inspect, and understand how Loguru can support debugging, monitoring, and observability in serious Python applications.
!pip install -q loguru nest_asyncio
import os, sys, time, json, glob, gzip, shutil, asyncio, logging, itertools, multiprocessing
from collections import deque
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from loguru import logger
# Allow asyncio.run() inside notebooks that already run an event loop.
try:
    import nest_asyncio; nest_asyncio.apply()
except Exception as e:
    print("nest_asyncio not applied:", e)

# Use /content on Colab, /tmp elsewhere, and start from an empty directory.
WORKDIR = "/content/loguru_demo" if os.path.isdir("/content") else "/tmp/loguru_demo"
os.makedirs(WORKDIR, exist_ok=True); os.chdir(WORKDIR)
for f in glob.glob("*"):
    try: os.remove(f)
    except OSError: pass
print(f"Working directory: {WORKDIR}\n")
RESULTS = []
def test(name, condition, detail=""):
    # Record one PASS/FAIL result and print it immediately.
    ok = bool(condition); RESULTS.append((name, ok))
    print(f"  [{'PASS' if ok else 'FAIL'}] {name}" + (f" - {detail}" if detail else ""))

def banner(t): print(f"\n{'='*64}\n  {t}\n{'='*64}")
_seq = itertools.count(1)
def global_patcher(record):
    # Runs for every record: tag the environment and add a sequence number.
    record["extra"].setdefault("env", "colab")
    record["extra"]["seq"] = next(_seq)

_NOISE = {"env", "seq", "app"}
def console_formatter(record):
    # Callable formatter: append {extra} only when it holds user-supplied keys.
    fmt = ("<green>{time:HH:mm:ss.SSS}</green> | <level>{level: <8}</level> | "
           "<cyan>{name}:{function}:{line}</cyan> - <level>{message}</level>")
    if any(k not in _NOISE for k in record["extra"]):
        fmt += " | <yellow>{extra}</yellow>"
    return fmt + "\n{exception}"
We install Loguru and its supporting dependencies, import all required libraries, and prepare a clean working directory for the tutorial. We also create a small verification helper to test each feature as the tutorial runs. We then define a global patcher and a console formatter so that every log record carries useful metadata and appears in a readable format.
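To see what the patcher and the formatter's extra-check do in isolation, here is a minimal sketch that needs no Loguru installation. Plain dicts stand in for Loguru records, and `has_user_context` is a hypothetical helper that isolates the condition used inside `console_formatter`; both are assumptions made for illustration.

```python
import itertools

_seq = itertools.count(1)
_NOISE = {"env", "seq", "app"}

def global_patcher(record):
    """Stamp a record with an environment tag and a running sequence number."""
    record["extra"].setdefault("env", "colab")
    record["extra"]["seq"] = next(_seq)

def has_user_context(record):
    """True when extra holds keys beyond the bookkeeping ones in _NOISE."""
    return any(k not in _NOISE for k in record["extra"])

# Plain dicts stand in for Loguru records (assumption for illustration only).
plain = {"extra": {"app": "loguru-advanced"}}
bound = {"extra": {"app": "loguru-advanced", "user_id": 42}}
for rec in (plain, bound):
    global_patcher(rec)

print(plain["extra"])
print(has_user_context(plain), has_user_context(bound))
```

Only the second record carries a key outside `_NOISE`, so only it would get the `{extra}` suffix on the console.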
class MemorySink:
    # File-like sink that keeps the most recent records in a bounded deque.
    def __init__(self, capacity=2000): self.buffer = deque(maxlen=capacity)
    def write(self, message): self.buffer.append(message.record)
    def flush(self): pass
    def has_level(self, name): return any(r["level"].name == name for r in self.buffer)
    def discover(self, pred): return [r for r in self.buffer if pred(r)]

MAX_BYTES = 1500
def size_rotation(message, file):
    # Rotate when writing this message would push the file past MAX_BYTES.
    return file.tell() + len(message) > MAX_BYTES

def gzip_compression(filepath):
    # Compress the rotated file to <name>.gz and delete the original.
    with open(filepath, "rb") as fi, gzip.open(filepath + ".gz", "wb") as fo:
        shutil.copyfileobj(fi, fo)
    os.remove(filepath)

def keep_latest_retention(files):
    # Keep the three most recently modified files and delete the rest.
    for old in sorted(files, key=os.path.getmtime, reverse=True)[3:]:
        try: os.remove(old)
        except OSError: pass

class InterceptHandler(logging.Handler):
    def emit(self, record):
        # Map the stdlib level name to a Loguru level, falling back to the number.
        try: level = logger.level(record.levelname).name
        except ValueError: level = record.levelno
        # Walk out of the logging module's frames so the real caller is reported.
        frame, depth = logging.currentframe(), 2
        while frame and frame.f_code.co_filename == logging.__file__:
            frame, depth = frame.f_back, depth + 1
        (logger.opt(depth=depth, exception=record.exc_info)
               .bind(stdlib_logger=record.name)
               .log(level, record.getMessage()))

def mp_worker(n):
    logger.bind(child=os.getpid()).info("hello from child item {}", n)
    return os.getpid()
We create reusable logging components that make the tutorial more practical and production-like. We define an in-memory sink, along with custom file rotation, compression, and retention functions to control how logs are stored. We also build a standard logging interceptor and a multiprocessing worker to connect Loguru to external libraries and child processes.
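Because the rotation, compression, and retention hooks are ordinary functions, they can be exercised without a logger at all. The following sketch repeats the three helpers and runs them against temporary files; the file names, sizes, and fixed modification times are assumptions chosen to make the outcome deterministic.

```python
import gzip, os, shutil, tempfile

MAX_BYTES = 1500

def size_rotation(message, file):
    # Rotate when appending this message would push the file past MAX_BYTES.
    return file.tell() + len(message) > MAX_BYTES

def gzip_compression(filepath):
    with open(filepath, "rb") as fi, gzip.open(filepath + ".gz", "wb") as fo:
        shutil.copyfileobj(fi, fo)
    os.remove(filepath)

def keep_latest_retention(files):
    for old in sorted(files, key=os.path.getmtime, reverse=True)[3:]:
        try: os.remove(old)
        except OSError: pass

workdir = tempfile.mkdtemp()

# Rotation: a file holding 1400 bytes accepts 50 more bytes but not 200 more.
probe = os.path.join(workdir, "probe.log")
with open(probe, "wb+") as fh:
    fh.write(b"x" * 1400)
    small_fits = not size_rotation("y" * 50, fh)
    large_rotates = size_rotation("y" * 200, fh)

# Compression: the original disappears and the .gz holds the same bytes.
gzip_compression(probe)
with gzip.open(probe + ".gz", "rb") as fh:
    round_trip = fh.read()

# Retention: five files with increasing mtimes, only the newest three survive.
paths = []
for i in range(5):
    path = os.path.join(workdir, f"demo_{i}.log")
    open(path, "w").close()
    os.utime(path, (1_000_000 + i, 1_000_000 + i))
    paths.append(path)
keep_latest_retention(paths)
survivors = sorted(os.path.basename(p) for p in paths if os.path.exists(p))

print(small_fits, large_rotates, survivors)
```

Testing the hooks this way keeps the rotation threshold and the keep-three policy easy to verify before they are wired into a file sink.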
banner("1) logger.configure(): handlers + custom level + extra + patcher")
mem = MemorySink()
logger.configure(
    handlers=[
        # Colorized console output using the callable formatter.
        {"sink": sys.stderr, "format": console_formatter, "level": "DEBUG",
         "colorize": True, "backtrace": True, "diagnose": True},
        # In-memory capture used by the self-tests.
        {"sink": mem, "level": "DEBUG", "format": "{message}"},
        # One JSON object per line.
        {"sink": "structured.jsonl", "serialize": True, "level": "DEBUG",
         "enqueue": True},
        # Errors only, without variable values in tracebacks.
        {"sink": "errors.log", "level": "ERROR", "enqueue": True,
         "backtrace": True, "diagnose": False,
         "format": "{time:YYYY-MM-DD HH:mm:ss} | {level} | "
                   "{name}:{function}:{line} | {message}"},
    ],
    levels=[{"name": "NOTICE", "no": 22, "color": "<blue><bold>", "icon": "!"}],
    extra={"app": "loguru-advanced"},
    patcher=global_patcher,
)
logger.debug("debug"); logger.info("info"); logger.success("SUCCESS level ships built-in")
logger.warning("warning"); logger.log("NOTICE", "custom level between INFO and SUCCESS")

banner("2) bind() / contextualize() / patch()")
logger.bind(user_id=42, request_id="abc-123").info("bound context")
with logger.contextualize(job="batch-job", run=7):
    logger.info("inside contextualized block")
logger.patch(lambda r: r["extra"].update(epoch=round(time.time()))).info("per-call patched record")

banner("3) @logger.catch + context-manager style")
def inner(d): return d["a"] / d["b"]
def outer(d): return inner(d)

@logger.catch(reraise=False)
def compute(d): return outer(d)

compute({"a": 1, "b": 0})  # ZeroDivisionError is logged, not raised
with logger.catch(message="handled inside a with-block"):
    raise ValueError("boom in block")

banner("4) opt(lazy=True), inline colors, record access")
logger.opt(lazy=True).debug("lazy sum = {}", lambda: sum(i*i for i in range(1_000_000)))
logger.opt(colors=True).info("inline <red>colors</red> <green>work</green>")
logger.opt(record=True).info("emitted from source line {record[line]}")
We configure Loguru with multiple handlers, including console output, memory capture, JSON logging, and error logging. We then demonstrate structured logging with bound context, contextual blocks, patched records, and a custom log level. We also explore exception handling and useful opt() features such as lazy evaluation, inline colors, and record access.
banner("5) custom rotation/compression/retention (forces real rotation)")
ev_id = logger.add("events_{time:HHmmss_SSS}.log",
                   rotation=size_rotation, compression=gzip_compression,
                   retention=keep_latest_retention, enqueue=True, level="DEBUG",
                   format="{time:HH:mm:ss.SSS} | {level: <8} | {message}")
# 80 lines are enough to exceed MAX_BYTES several times.
for i in range(80):
    logger.bind(idx=i).debug("rotating event line number {}", i)
logger.complete(); logger.remove(ev_id)
print(f"  archives created: {sorted(glob.glob('events_*.gz'))}")

banner("6a) ThreadPoolExecutor with per-thread contextualize()")
thread_caps = []
tid = logger.add(thread_caps.append, level="DEBUG", format="{message}",
                 filter=lambda r: "worker_id" in r["extra"])
def worker(n):
    with logger.contextualize(worker_id=n):
        logger.info("thread work item {}", n)
    return n * n
with ThreadPoolExecutor(max_workers=8) as ex:
    sq = list(ex.map(worker, range(8)))
logger.complete(); logger.remove(tid)
worker_ids = {m.record["extra"]["worker_id"] for m in thread_caps}

banner("6b) async coroutine sink + await logger.complete()")
async def run_async_demo():
    sunk = []
    async def async_sink(message):
        await asyncio.sleep(0); sunk.append(message.record["message"])
    sid = logger.add(async_sink, level="DEBUG", catch=True)
    async def job(n):
        with logger.contextualize(coro=n):
            logger.info("async job {} start", n)
            await asyncio.sleep(0.01)
            logger.success("async job {} done", n)
    await asyncio.gather(*(job(i) for i in range(5)))
    await logger.complete()  # wait until the coroutine sink has drained
    logger.remove(sid)
    return sunk
try:
    async_msgs = asyncio.run(run_async_demo())
except RuntimeError:
    # Fallback for environments where an event loop is already running.
    async_msgs = asyncio.get_event_loop().run_until_complete(run_async_demo())
print(f"  async sink received {len(async_msgs)} messages")
We demonstrate custom file management by automatically rotating, compressing, and retaining log files. We then test thread-safe logging by running multiple workers, each with its own contextual metadata. We also add an asynchronous coroutine sink to see how Loguru handles async tasks and correctly drains pending logs.
banner("7) intercept stdlib `logging` and filter a chatty library")
logging.basicConfig(handlers=[InterceptHandler()], level=0, force=True)
lib_caps = []
def lib_filter(record):
    # Only WARNING and above pass for the "chatty" logger; everything else passes.
    if record["extra"].get("stdlib_logger") == "chatty":
        return record["level"].no >= logger.level("WARNING").no
    return True
lid = logger.add(lib_caps.append, level="DEBUG", format="{message}",
                 filter=lambda r: ("stdlib_logger" in r["extra"]) and lib_filter(r))
logging.getLogger("chatty").info("noisy info (should be filtered out)")
logging.getLogger("chatty").warning("noisy warning (kept)")
logging.getLogger("important").debug("important debug (kept)")
logger.complete(); logger.remove(lid)

banner("8) SELF-TESTS")
logger.complete(); time.sleep(0.2)  # let enqueued sinks flush to disk
try:
    rec = json.loads(open("structured.jsonl").read().splitlines()[-1])
    test("JSON sink serializes records", {"text", "record"} <= set(rec))
except Exception as e:
    test("JSON sink serializes records", False, str(e))
try:
    err_txt = open("errors.log").read()
    test("errors.log captured ZeroDivisionError", "ZeroDivisionError" in err_txt)
except Exception as e:
    test("errors.log captured ZeroDivisionError", False, str(e))
test("custom rotation produced .gz archives", len(glob.glob("events_*.gz")) >= 1,
     f"{len(glob.glob('events_*.gz'))} archive(s)")
test("custom NOTICE level recorded", mem.has_level("NOTICE"))
test("SUCCESS level recorded", mem.has_level("SUCCESS"))
test("bound user_id=42 present", bool(mem.discover(lambda r: r["extra"].get("user_id") == 42)))
test("contextualize job present", bool(mem.discover(lambda r: r["extra"].get("job") == "batch-job")))
test("global patcher stamped env", bool(mem.discover(lambda r: r["extra"].get("env") == "colab")))
test("exception captured in a record", bool(mem.discover(lambda r: r["exception"] is not None)))
test("threads logged all 8 workers", worker_ids == set(range(8)), str(sorted(worker_ids)))
test("async sink received 10 messages", len(async_msgs) == 10, f"{len(async_msgs)} msgs")
kept = {m.record["extra"]["stdlib_logger"] + ":" + m.record["level"].name for m in lib_caps}
test("library INFO filtered, rest kept",
     ("chatty:INFO" not in kept) and ("chatty:WARNING" in kept) and ("important:DEBUG" in kept),
     str(sorted(kept)))
We intercept Python’s built-in logging module and route standard library logs into Loguru. We apply source-aware filtering so that noisy logs from one library can be suppressed while important messages are still kept. We then run self-tests to verify JSON logging, error capture, archive creation, context propagation, exception records, threaded logs, async logs, and filtering behavior.
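The source-aware filter is a plain predicate over a record, so its decisions can be checked without any logging taking place. In this sketch a namedtuple stands in for Loguru's level object and `fake_record` is a hypothetical helper that builds only the fields the filter reads; the severity numbers 10, 20, and 30 are Loguru's defaults for DEBUG, INFO, and WARNING.

```python
from collections import namedtuple

Level = namedtuple("Level", ["name", "no"])  # stand-in for Loguru's level object
WARNING_NO = 30  # Loguru's default severity number for WARNING

def lib_filter(record):
    """Drop anything below WARNING from the 'chatty' logger; keep the rest."""
    if record["extra"].get("stdlib_logger") == "chatty":
        return record["level"].no >= WARNING_NO
    return True

def fake_record(source, level_name, level_no):
    # Hypothetical helper: builds only the fields the filter reads.
    return {"extra": {"stdlib_logger": source}, "level": Level(level_name, level_no)}

records = [
    fake_record("chatty", "INFO", 20),
    fake_record("chatty", "WARNING", 30),
    fake_record("important", "DEBUG", 10),
]
kept = [f"{r['extra']['stdlib_logger']}:{r['level'].name}" for r in records if lib_filter(r)]
print(kept)  # ['chatty:WARNING', 'important:DEBUG']
```

The threshold applies per source: a DEBUG message from another logger passes while an INFO message from the chatty one does not.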
banner("9) throughput: enqueue=False vs enqueue=True")
def bench(enqueue, n=15000):
    # Time n log calls against a no-op sink, including the final drain.
    logger.remove()
    sid = logger.add(lambda m: None, level="DEBUG", format="{message}", enqueue=enqueue)
    t0 = time.perf_counter()
    for i in range(n): logger.bind(i=i).debug("benchmark {}", i)
    logger.complete(); dt = time.perf_counter() - t0
    logger.remove(sid)
    return n / dt
try:
    sync_tput = bench(False); async_tput = bench(True)
    print(f"  direct  : {sync_tput:,.0f} msg/s")
    print(f"  enqueue : {async_tput:,.0f} msg/s (non-blocking, process/thread-safe)")
    test("benchmark completed", sync_tput > 0 and async_tput > 0)
except Exception as e:
    test("benchmark completed", False, str(e))

banner("10) multiprocessing with enqueue=True (fork)")
try:
    logger.remove()
    mp_id = logger.add("mp.log", enqueue=True, level="DEBUG",
                       format="{extra[child]} | {message}")
    # Forked children inherit the handler and its queue.
    ctx = multiprocessing.get_context("fork")
    with ProcessPoolExecutor(max_workers=4, mp_context=ctx) as ex:
        pids = list(ex.map(mp_worker, range(4)))
    logger.complete(); logger.remove(mp_id); time.sleep(0.1)
    lines = open("mp.log").read().splitlines()
    test("multiprocessing logged from children", len(lines) >= 4,
         f"{len(lines)} lines from {len(set(pids))} PIDs")
except Exception as e:
    test("multiprocessing logged from children", False, f"unsupported here: {e}")

# Restore a simple console handler for the final summary.
logger.remove()
logger.add(sys.stderr, level="INFO", colorize=True,
           format="<green>{time:HH:mm:ss}</green> | <level>{level}</level> | {message}")
banner("RESULTS")
passed = sum(ok for _, ok in RESULTS)
for name, ok in RESULTS:
    print(f"  {'PASS' if ok else 'FAIL'} {name}")
print(f"\n  {passed}/{len(RESULTS)} checks passed")
print(f"  files: {sorted(glob.glob('*'))}")
(logger.success if passed == len(RESULTS) else logger.warning)(
    "Loguru tutorial complete!" if passed == len(RESULTS)
    else f"Finished with {len(RESULTS)-passed} failed check(s)"
)
We benchmark Loguru throughput by comparing direct logging with enqueue-based logging. We then test multiprocessing-safe logging by writing messages from child processes into a shared log file. Finally, we clean up the logger, print all test results, list the generated files, and show whether the tutorial completed successfully.
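The non-blocking behavior measured in the benchmark comes from a general pattern: callers put messages on a queue and a single background worker does the writing. The following stdlib-only sketch shows that pattern under stated assumptions; `QueuedSink` and its `complete()` method are hypothetical names, and this is not Loguru's actual implementation of `enqueue=True`.

```python
import queue
import threading

class QueuedSink:
    """Hypothetical helper: callers enqueue, one background thread writes."""

    def __init__(self, write):
        self._write = write
        self._queue = queue.Queue()
        self._thread = threading.Thread(target=self._drain, daemon=True)
        self._thread.start()

    def __call__(self, message):
        self._queue.put(message)      # returns immediately, no I/O on this path

    def _drain(self):
        while True:
            message = self._queue.get()
            if message is None:       # sentinel: stop the worker
                break
            self._write(message)

    def complete(self):
        # Flush everything queued so far, then stop the worker thread.
        self._queue.put(None)
        self._thread.join()

written = []
sink = QueuedSink(written.append)

def producer(n):
    for i in range(100):
        sink(f"t{n}-{i}")

threads = [threading.Thread(target=producer, args=(n,)) for n in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
sink.complete()

print(len(written))  # 400
```

Because only the worker thread touches the destination, writes never interleave, and each producer's messages keep their relative order. The trade-off is that messages are delivered slightly later, which is why the tutorial calls logger.complete() before reading the files.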
In conclusion, we built a complete and robust logging system using Loguru that goes far beyond basic print-style debugging. We learned how to configure multiple sinks, capture structured JSON records, add contextual metadata, preserve useful exception information, manage rotating log files, filter noisy third-party libraries, and handle concurrent workloads across threads, async tasks, and processes. We also included self-verification checks and a small benchmark to confirm that the logging pipeline works correctly and to assess its performance behavior.
The post A Coding Implementation on Loguru for Designing Robust, Structured, Concurrent, and Production-Ready Python Logging Pipelines appeared first on MarkTechPost.
