CUP (Common Useful Python): Building Reliable Python Workflows with Baidu’s Utility Toolkit

In this tutorial, we explore CUP, Baidu’s Common Useful Python library, as a practical utility toolkit for building more robust Python workflows. We start by setting up the library in a Colab-friendly environment and then move through its main subsystems step by step, including logging, decorators, nested configuration, caching, ID generation, thread pools, interruptible threads, delayed execution, time utilities, Linux resource monitoring, file locking, networking helpers, object storage interfaces, type maps, and built-in testing assertions. As we progress, we don’t just call functions at random; we observe how each module fits into real-world development tasks such as monitoring, automation, concurrency, configuration management, and reliability checks.

CUP Setup and Logging

import os
import sys
import time
import threading
import tempfile
import datetime
import subprocess


def banner(title):
    """Print a section title between two separator lines."""
    line = "=" * 70
    print("\n" + line + "\n" + title + "\n" + line)


def skip(exc):
    """Report a gracefully-skipped section without aborting the notebook."""
    print("   [skipped - {}: {}]".format(type(exc).__name__, exc))


banner("0. SETUP  (install + cup.platforms + cup.version)")
# Install the packages from inside the notebook; check=False means a failed
# install does not raise here.
subprocess.run(
    [sys.executable, "-m", "pip", "install", "-q", "cup", "pytz"],
    check=False,
)
import cup

# Look for a version attribute on the package first, then in cup.version.
ver = getattr(cup, "__version__", None)
if ver is None:
    try:
        from cup import version as _v
        ver = getattr(_v, "VERSION", None) or getattr(_v, "__version__", "unknown")
    except Exception:
        ver = "unknown"
print("CUP version :", ver)
print("Python      :", sys.version.split()[0])
try:
    from cup import platforms
    print("is_linux    :", platforms.is_linux())
    print("is_mac      :", platforms.is_mac())
    print("is_windows  :", platforms.is_windows())
    print("is_py3      :", platforms.is_py3())
except Exception as e:
    skip(e)
banner("1. LOGGING  (cup.log)")
LOGFILE = os.path.join(tempfile.gettempdir(), "cup_tutorial.log")
try:
    from cup import log
    log.init_comlog(
        "cup_tutorial",
        log.INFO,
        LOGFILE,
        log.ROTATION,
        10 * 1024 * 1024,
        True,
        False,
    )
    log.info("hello from cup.log - written to file AND stdout")
    log.warning("a warning line")
    log.info_if(2 > 1, "info_if(True)  -> emitted")
    log.info_if(1 > 2, "info_if(False) -> you'll NOT see this")
    log.setloglevel(log.DEBUG)
    log.debug("debug visible after setloglevel(DEBUG)")
    try:
        # Read back the last non-empty log line and parse it into fields.
        with open(LOGFILE) as fh:
            last = [ln for ln in fh.read().splitlines() if ln.strip()][-1]
        parsed = log.parse(last)
        print("parsed last log line ->")
        for k in ("loglevel", "date", "time", "pid", "srcline", "msg"):
            if isinstance(parsed, dict) and k in parsed:
                print("   {:8}: {}".format(k, parsed[k]))
    except Exception as e:
        skip(e)
except Exception as e:
    skip(e)

We begin by setting up the CUP tutorial environment and installing the required packages directly from Python. We define helper functions that keep the notebook readable and allow failed sections to be skipped safely. We then explore CUP version details, platform checks, and structured logging to understand the library’s foundation.
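To make the conditional-logging idea concrete without depending on CUP, the following is a minimal sketch built only on the standard library's logging module. It is not CUP's implementation: build_logger, info_if, and the log format are hypothetical names and choices made for illustration, loosely mirroring the size-rotated file and the info_if call used above.

```python
import logging
import os
import sys
import tempfile
from logging.handlers import RotatingFileHandler


def build_logger(name, path, max_bytes=10 * 1024 * 1024, to_stdout=False):
    """Return a logger writing to a size-rotated file, optionally to stdout too."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    logger.propagate = False  # keep records out of the root logger
    fmt = logging.Formatter(
        "%(levelname)s %(asctime)s pid=%(process)d "
        "%(filename)s:%(lineno)d %(message)s"
    )
    handlers = [RotatingFileHandler(path, maxBytes=max_bytes, backupCount=3)]
    if to_stdout:
        handlers.append(logging.StreamHandler(sys.stdout))
    for handler in handlers:
        handler.setFormatter(fmt)
        logger.addHandler(handler)
    return logger


def info_if(logger, condition, msg):
    """Emit msg at INFO level only when condition is truthy (hypothetical helper)."""
    if condition:
        logger.info(msg)


# Small demo: only the first message should reach the file.
demo_path = os.path.join(tempfile.mkdtemp(), "sketch.log")
demo_logger = build_logger("cup_sketch", demo_path)
info_if(demo_logger, 2 > 1, "emitted")
info_if(demo_logger, 1 > 2, "hidden")
with open(demo_path) as fh:
    demo_text = fh.read()
```

Keeping the condition outside the message formatting, as here, means the guarded message is never written when the check fails, which is the behavior the info_if lines in the tutorial rely on.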

Decorators and Nested Config

banner("2. DECORATORS  (cup.decorators)")
try:
    from cup import decorators

    # Singleton: every call to AppConfig() should return the same instance.
    @decorators.Singleton
    class AppConfig(object):
        def __init__(self):
            self.created_at = time.time()

    a, b = AppConfig(), AppConfig()
    print("Singleton: a is b ->", a is b, "(same created_at:",
          a.created_at == b.created_at, ")")

    # TraceUsedTime: report enter/leave messages around the wrapped call.
    @decorators.TraceUsedTime(
        b_print_stdout=True,
        enter_msg="event_id=0xABCDE enter",
        leave_msg="event_id=0xABCDE leave",
    )
    def heavy_compute():
        time.sleep(0.2)
        return sum(range(200000))

    print("heavy_compute() =", heavy_compute())

    # needlinux: platform guard for Linux-only functions.
    @decorators.needlinux
    def linux_only():
        return "this body is allowed to run on Linux"

    print("needlinux ->", linux_only())
except Exception as e:
    skip(e)
banner("3. RICH NESTED CONFIG  (cup.util.conf)")
CONF_PATH = os.path.join(tempfile.gettempdir(), "cup_demo.conf")
CONF_TEXT = """
# ---- global scalars (layer 0) ----
host: abc.com
port: 12345
debug: false
[monitor]
enabled: true
interval: 60
regex: sshd
[.thresholds]
cpu_max: 90
mem_max: 80
[..actions]
on_breach: alert
[storage]
@path: /data/disk1
@path: /data/disk2
@path: /data/disk3
"""
try:
    from cup.util import conf
    with open(CONF_PATH, "w") as fh:
        fh.write(CONF_TEXT)
    cfg = conf.Configure2Dict(CONF_PATH, separator=":").get_dict()
    print("host                          :", cfg["host"])
    print("port                          :", cfg["port"])
    print("monitor.enabled               :", cfg["monitor"]["enabled"])
    print("monitor.regex                 :", cfg["monitor"]["regex"])
    print("monitor.thresholds.cpu_max    :", cfg["monitor"]["thresholds"]["cpu_max"])
    print("monitor.thresholds.actions    :",
          cfg["monitor"]["thresholds"]["actions"]["on_breach"])
    print("storage.path (repeated @ -> list):", list(cfg["storage"]["path"]))
    # Modify two values, write the config back, and re-read it to verify.
    cfg["port"] = "10085"
    cfg["monitor"]["thresholds"]["actions"]["on_breach"] = "restart"
    NEW_PATH = CONF_PATH + ".new"
    conf.Dict2Configure(cfg, separator=":").write_conf(NEW_PATH)
    re_read = conf.Configure2Dict(NEW_PATH, separator=":").get_dict()
    print("round-trip port               :", re_read["port"], "(was 12345)")
    print("round-trip on_breach          :",
          re_read["monitor"]["thresholds"]["actions"]["on_breach"], "(was alert)")
except Exception as e:
    skip(e)

We move on to CUP decorators and see how they help us create single-instance classes, track execution time, and guard Linux-only functions. We then work with CUP’s rich configuration system and load a nested configuration file with sections, child sections, and repeated values. We also update the configuration and write it back to disk to confirm that the read-modify-write flow works correctly.
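The nesting rules used in the demo file (a leading dot per level in a section name, and a repeated @key collecting into a list) can be illustrated with a small standalone parser. This is a sketch under assumptions, not CUP's parser: parse_nested_conf is a hypothetical function, it keeps every value as a string, and it only handles the syntax that appears in the demo configuration.

```python
def parse_nested_conf(text, separator=":"):
    """Parse dot-nested sections and @-repeated keys into nested dicts."""
    root = {}
    stack = [root]  # stack[d] is the dict currently open at depth d
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue  # skip blanks and comments
        if line.startswith("[") and line.endswith("]"):
            name = line[1:-1]
            depth = len(name) - len(name.lstrip("."))  # count leading dots
            name = name.lstrip(".")
            del stack[depth + 1:]  # close sections deeper than the parent
            child = {}
            stack[depth][name] = child
            stack.append(child)
            continue
        key, _, value = line.partition(separator)
        key, value = key.strip(), value.strip()
        if key.startswith("@"):
            # Repeated @key lines accumulate into one list.
            stack[-1].setdefault(key[1:], []).append(value)
        else:
            stack[-1][key] = value
    return root


SAMPLE = """
host: abc.com
port: 12345
[monitor]
enabled: true
[.thresholds]
cpu_max: 90
[..actions]
on_breach: alert
[storage]
@path: /data/disk1
@path: /data/disk2
"""

parsed_conf = parse_nested_conf(SAMPLE)
```

The stack makes the depth rule explicit: a section with n leading dots attaches to whatever section is currently open at depth n, and a new top-level section closes everything beneath it.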

Caching, IDs, Thread Pools

banner("4. IN-MEMORY KV CACHE  (cup.cache)")
try:
    from cup import cache
    kv = cache.KVCache(name="demo")
    # Two keys with a 2-second TTL and one key that never expires.
    kv.set({"user:1": "alice", "user:2": "bob"}, expire_sec=2)
    kv.set({"config:flag": "on"}, expire_sec=None)
    print("size after sets         :", kv.size())
    print("get user:1              :", kv.get("user:1"))
    print("get missing key         :", kv.get("nope"))
    print("sleeping 2.2s to let the 2s-TTL keys expire ...")
    time.sleep(2.2)
    print("get user:1 (expired)    :", kv.get("user:1"))
    print("get config:flag (permanent):", kv.get("config:flag"))
    reclaimed = kv.pop_n_expired(0)
    print("pop_n_expired reclaimed :", list(reclaimed.keys()) if reclaimed else [])
except Exception as e:
    skip(e)
banner("5. UNIQUE ID GENERATION  (cup.services.generator)")
try:
    from cup.services import generator
    gman = generator.CGeneratorMan()
    print("uniqname                :", gman.get_uniqname())
    print("next_uniq_num           :", gman.get_next_uniq_num())
    print("next_uniq_num (again)   :", gman.get_next_uniq_num(), "(monotonic)")
    # These helpers may not exist in every release, so probe before calling.
    if hasattr(gman, "get_uuid"):
        try:
            print("get_uuid                :", gman.get_uuid())
        except Exception as e:
            skip(e)
    if hasattr(gman, "get_random_str"):
        try:
            print("get_random_str(16)      :", gman.get_random_str(16))
        except Exception as e:
            skip(e)
    print("singleton check         :", generator.CGeneratorMan() is gman)
    try:
        cyc = generator.CycleIDGenerator("127.0.0.1", 8080)
        i1, i2 = cyc.next_id(), cyc.next_id()
        print("CycleIDGenerator id #1  :", i1)
        print("CycleIDGenerator id #2  :", i2, "(incremented)")
        print("id #1 as hex            :", generator.CycleIDGenerator.id2_hexstring(i1))
    except Exception as e:
        skip(e)
except Exception as e:
    skip(e)
banner("6. THREAD POOL  (cup.services.threadpool)")
try:
    from cup.services import threadpool
    pool = threadpool.ThreadPool(minthreads=2, maxthreads=4, name="demo-pool")
    pool.start()
    results, rlock = [], threading.Lock()

    def square(n):
        time.sleep(0.03)
        with rlock:  # protect the shared list across worker threads
            results.append(n * n)
        return n * n

    for i in range(8):
        pool.add_1job(square, i)
    callback_log = []

    def on_done(ok, result):
        callback_log.append((ok, result))

    pool.add_1job_with_callback(on_done, square, 100)

    def will_fail():
        raise RuntimeError("boom inside worker")

    pool.add_1job_with_callback(on_done, will_fail)
    time.sleep(0.5)  # give the workers time to drain the queue
    print("live stats              :", pool.get_stats())
    pool.stop()
    print("squares collected       :", sorted(results))
    print("callback results        :", callback_log)
except Exception as e:
    skip(e)

We use CUP’s in-memory cache to store key-value pairs with temporary and permanent lifetimes. We then generate unique names, counters, UUID-style values, random strings, and cycling IDs for distributed-style identifiers. We also create a thread pool, submit jobs, collect results, and observe callback behavior for both successful and failed tasks.
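The temporary-versus-permanent lifetime behavior described above can be sketched with a small dictionary-backed cache. This is an illustration only and makes no claim about how cup.cache.KVCache works internally: TTLCache and its injectable clock argument are hypothetical, and the fake clock is there so the expiry can be shown without sleeping.

```python
import time


class TTLCache:
    """Tiny key-value cache with optional per-entry expiry (illustrative only)."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._items = {}  # key -> (value, deadline or None)

    def set(self, mapping, expire_sec=None):
        """Store every pair in mapping; expire_sec=None means never expire."""
        deadline = None if expire_sec is None else self._clock() + expire_sec
        for key, value in mapping.items():
            self._items[key] = (value, deadline)

    def get(self, key):
        """Return the value, or None if the key is missing or expired."""
        entry = self._items.get(key)
        if entry is None:
            return None
        value, deadline = entry
        if deadline is not None and self._clock() >= deadline:
            del self._items[key]  # drop the expired entry on access
            return None
        return value

    def size(self):
        return len(self._items)


# Demo with a fake clock so no real waiting is needed.
fake_now = [0.0]
ttl_cache = TTLCache(clock=lambda: fake_now[0])
ttl_cache.set({"user:1": "alice", "user:2": "bob"}, expire_sec=2)
ttl_cache.set({"config:flag": "on"})
before_expiry = ttl_cache.get("user:1")
fake_now[0] = 2.2  # advance past the 2-second TTL
after_expiry = ttl_cache.get("user:1")
permanent_value = ttl_cache.get("config:flag")
```

Expired entries here are removed lazily, on the next get for that key; a cache that also needs to bound memory would add a periodic sweep, which this sketch leaves out.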

Threads, Scheduling, Time Utilities

banner("7. INTERRUPTIBLE THREADS + RW LOCK  (cup.thread)")
try:
    from cup import thread as cupthread
    rw = cupthread.RWLock()
    rw.acquire_readlock()
    rw.acquire_readlock()
    print("acquired 2 read locks concurrently")
    rw.release_readlock()
    rw.release_readlock()
    rw.acquire_writelock()
    print("acquired exclusive write lock")
    rw.release_writelock()
    stop_flag = {"running": True}

    def busy_loop():
        while stop_flag["running"]:
            time.sleep(0.05)

    t = cupthread.CupThread(target=busy_loop)
    t.daemon = True
    t.start()
    print("worker python tid       :", t.get_my_tid())
    stop_flag["running"] = False  # ask the loop to exit before terminating
    ok = t.terminate()
    print("CupThread.terminate ->  :", ok)
except Exception as e:
    skip(e)
banner("8. DELAY / CRON EXECUTOR  (cup.services.executor)")
try:
    from cup.services import executor
    URGENCY = getattr(executor, "URGENCY_NORMAL", 0)
    svc = executor.ExecutionService(
        delay_exe_thdnum=2, queue_exec_thdnum=2, name="exec-demo"
    )
    svc.start()
    fired = []

    def job(tag):
        fired.append((tag, round(time.time(), 3)))

    svc.delay_exec(0.3, job, URGENCY, "delayed-0.3s")
    svc.queue_exec(job, URGENCY, "queued-now")
    time.sleep(0.8)  # long enough for the 0.3s delayed job to fire
    svc.stop()
    print("fired (tag, ts)         :", fired)
    try:
        import pytz
        import hashlib
        tz = pytz.timezone("Asia/Shanghai")
        # Fire at minute 0 and 30 of every hour; None leaves a field unrestricted.
        timer = {"minute": [0, 30], "hour": None,
                 "weekday": None, "monthday": None, "month": None}
        task_uuid = hashlib.md5(b"demo-cron").hexdigest()
        ctask = executor.CronTask("demo-cron", tz, timer, task_uuid, job, "cron-arg")
        print("cron next_schedtime     :", ctask.next_schedtime())
        print("cron 32-byte taskid     :", ctask.taskid())
    except Exception as e:
        skip(e)
except Exception as e:
    skip(e)
banner("9. TIME HELPERS  (cup.timeplus)")
try:
    from cup import timeplus
    import pytz
    print("get_str_now (default)   :", timeplus.get_str_now())
    print("get_str_now (custom fmt):", timeplus.get_str_now("%Y/%m/%d %H:%M:%S"))
    tp = timeplus.TimePlus(pytz.timezone("Asia/Shanghai"))
    now_utc = datetime.datetime.utcnow()
    print("naive utc now           :", now_utc)
    print("-> Shanghai local       :", tp.utc2local(now_utc))
except Exception as e:
    skip(e)

We explore CUP’s thread utilities by using a reader-writer lock and an interruptible thread. We then run delayed and queued tasks through the execution service to understand simple scheduling behavior. We also create a cron-style task and inspect its next scheduled run time without waiting for the real clock to tick.
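Computing a cron-style "next run" without waiting for the clock comes down to stepping forward from a reference time until the schedule matches. The sketch below shows that idea for the minute field only, matching the [0, 30] timer used above. It is a simplified illustration, not CUP's CronTask logic: next_fire_time is a hypothetical function, it works on naive datetimes, and it ignores the hour, weekday, monthday, and month fields as well as time zones.

```python
import datetime


def next_fire_time(now, minutes):
    """Return the first whole minute strictly after now whose minute is allowed."""
    allowed = set(minutes)
    if not allowed or not allowed <= set(range(60)):
        raise ValueError("minutes must be a non-empty subset of 0..59")
    # Start at the next whole minute, then step until the minute matches.
    candidate = now.replace(second=0, microsecond=0) + datetime.timedelta(minutes=1)
    while candidate.minute not in allowed:
        candidate += datetime.timedelta(minutes=1)
    return candidate


start = datetime.datetime(2024, 1, 1, 10, 17, 45)
first_run = next_fire_time(start, [0, 30])       # half past the same hour
second_run = next_fire_time(first_run, [0, 30])  # top of the next hour
midnight_run = next_fire_time(datetime.datetime(2024, 1, 1, 23, 45), [0, 30])
```

Because the search starts one minute after the reference time, feeding a run time back in yields the following run rather than the same one, and the datetime arithmetic handles the rollover into the next day.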

System Resources and Storage

banner("10. SYSTEM RESOURCES  (cup.res.linux)")
try:
    from cup.res import linux as reslinux
    # Each probe is wrapped separately so one failure does not hide the rest.
    try:
        print("cpu cores               :", reslinux.get_cpu_nums())
    except Exception as e:
        skip(e)
    try:
        cpu = reslinux.get_cpu_usage(intvl_in_sec=1)
        print("cpu usage sample        :", cpu, "| usr=", getattr(cpu, "usr", "?"))
    except Exception as e:
        skip(e)
    try:
        mem = reslinux.get_meminfo()
        gb = 1024.0 ** 3
        print("mem total / available   : {:.2f} GB / {:.2f} GB".format(
            mem.total / gb, mem.available / gb))
    except Exception as e:
        skip(e)
    try:
        print("kernel version          :", reslinux.get_kernel_version())
    except Exception as e:
        skip(e)
    try:
        all_pids = reslinux.pids()
        print("live process count      :", len(all_pids))
    except Exception as e:
        skip(e)
    try:
        me = reslinux.Process(os.getpid())
        name = me.get_process_name() if hasattr(me, "get_process_name") else "?"
        status = me.get_process_status() if hasattr(me, "get_process_status") else "?"
        print("this process name/status:", name, "/", status)
    except Exception as e:
        skip(e)
except Exception as e:
    skip(e)
banner("11. FILE UTILITIES  (cup.exfile)")
try:
    from cup import exfile
    lock_path = os.path.join(tempfile.gettempdir(), "cup_demo.lock")
    flock = exfile.LockFile(lock_path)
    got = flock.lock(blocking=True)
    print("acquired file lock      :", got)
    print("lock file path          :", flock.filepath())
    flock.unlock()
    print("released file lock      : True")
except Exception as e:
    skip(e)
banner("12. NETWORKING HELPERS  (cup.net)")
try:
    from cup import net as cupnet
    try:
        print("local hostname          :", cupnet.get_local_hostname())
    except Exception as e:
        skip(e)
    try:
        print("host ip                 :", cupnet.get_hostip())
    except Exception as e:
        skip(e)
    try:
        print("local port 9999 free?   :", cupnet.localport_free(9999))
    except Exception as e:
        skip(e)
except Exception as e:
    skip(e)
banner("13. UNIFIED OBJECT STORAGE  (cup.storage.obj)")
try:
    from cup.storage import obj as cupobj
    print("available backends      :",
          [n for n in dir(cupobj) if n.endswith("ObjectSystem")])
    los = None
    # Try a few constructor shapes, since the signature differs across releases.
    for ctor in (lambda: cupobj.LocalObjectSystem(tempfile.gettempdir()),
                 lambda: cupobj.LocalObjectSystem({}),
                 lambda: cupobj.LocalObjectSystem()):
        try:
            los = ctor()
            break
        except Exception:
            continue
    if los is None:
        print("   LocalObjectSystem ctor varies by version; see docs for "
              "your release's exact signature.")
    else:
        print("LocalObjectSystem ready :", type(los).__name__,
              "(put/get/head/delete available)")
except Exception as e:
    skip(e)

We inspect Linux system resources, including CPU cores, CPU usage, memory, kernel version, process count, and current process status. We then use CUP’s file utility module to acquire and release an advisory lock for safe single-instance execution. We also test networking helpers and probe the object storage interface to see which storage backends are available.
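Advisory locking for single-instance execution can be sketched with the standard library's fcntl.flock on POSIX systems. This is a minimal illustration of the technique, not the implementation behind cup.exfile.LockFile: SingleInstanceLock and its method names are hypothetical, and the fcntl module is not available on Windows.

```python
import fcntl
import os
import tempfile


class SingleInstanceLock:
    """Non-blocking exclusive advisory lock on a file (POSIX only, illustrative)."""

    def __init__(self, path):
        self.path = path
        self._fh = None

    def acquire(self):
        """Return True if the lock was taken, False if someone else holds it."""
        fh = open(self.path, "w")
        try:
            fcntl.flock(fh, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except OSError:
            fh.close()
            return False
        self._fh = fh  # keep the handle open for as long as the lock is held
        return True

    def release(self):
        """Drop the lock if it is currently held."""
        if self._fh is not None:
            fcntl.flock(self._fh, fcntl.LOCK_UN)
            self._fh.close()
            self._fh = None


# Demo: two holders contend for the same lock file.
demo_lock_path = os.path.join(tempfile.mkdtemp(), "demo.lock")
holder_a = SingleInstanceLock(demo_lock_path)
holder_b = SingleInstanceLock(demo_lock_path)
a_first = holder_a.acquire()
b_blocked = holder_b.acquire()
holder_a.release()
b_second = holder_b.acquire()
holder_b.release()
```

The lock is advisory: it only coordinates programs that also try to take it, and it is released automatically if the holding process exits, which is what makes it convenient for "only one copy of this job at a time" checks.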

Type Maps and Assertions

banner("14. BIDIRECTIONAL TYPE MAPS  (cup.flag)")
try:
    from cup import flag
    tman = flag.TypeMan()
    tman.register_types({"LOGIN": 1, "LOGOUT": 2, "HEARTBEAT": 3, "DATA": 4})
    print("LOGIN     -> number     :", tman.getnumber_bykey("LOGIN"))
    print("number 2  -> key        :", tman.getkey_bynumber(2))
    print("all registered keys     :", list(tman.get_key_list()))
except Exception as e:
    skip(e)
banner("15. BUILT-IN TEST ASSERTIONS  (cup.unittest)")
try:
    from cup import unittest as cup_unittest
    cup_unittest.assert_eq(2 + 2, 4)
    cup_unittest.assert_ne(1, 2)
    cup_unittest.assert_true(isinstance([], list))
    cup_unittest.assert_gt(5, 3)
    cup_unittest.assert_eq_one("b", ["a", "b", "c"])

    def raises_value_error():
        raise ValueError("expected")

    cup_unittest.expect_raise(raises_value_error, ValueError)
    print("all passing assertions   : OK")
    # Trigger one failure on purpose to show it can be caught.
    try:
        cup_unittest.assert_eq(1, 2, "intentional failure demo")
    except Exception as e:
        print("caught intended failure  :", type(e).__name__)
except Exception as e:
    skip(e)
banner("DONE: you exercised 15 CUP subsystems")
print("""
What you just used, by module:
 cup.log .................. structured, rotating, parseable logging
 cup.decorators ........... Singleton, TraceUsedTime, platform guards
 cup.util.conf ............ nested config with sections + @arrays (round-trip)
 cup.cache ................ TTL KV cache (get() extends lifetime)
 cup.services.generator ... host-unique names, counters, cycling 128-bit IDs
 cup.services.threadpool .. pool with result/failure callbacks + live stats
 cup.thread ............... interruptible CupThread + reader/writer RWLock
 cup.services.executor .... delay_exec / queue_exec + crontab-style CronTask
 cup.timeplus ............. now-formatting + safe tz<->UTC conversion
 cup.res.linux ............ /proc cpu, memory, kernel, process introspection
 cup.exfile ............... advisory single-instance file locking
 cup.net .................. hostname / ip / free-port helpers
 cup.storage.obj .......... one API over Local / FTP / S3 / AFS backends
 cup.flag ................. bidirectional key<->number type tables
 cup.unittest ............. assertion toolkit that plays nicely with try/except
Next steps: API reference at https://cup.iobusy.com  •  source at
https://github.com/baidu/CUP (browse src/cup/ and the cup_test/ suite).
""")

We use CUP’s flag utilities to create bidirectional mappings between message names and numeric codes. We then run CUP’s built-in assertion helpers to validate equality, inequality, truth checks, comparisons, membership, and expected exceptions. We finish the tutorial by summarizing every CUP subsystem we exercise and connecting each to a practical development use case.
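A bidirectional mapping between message names and numeric codes is essentially two dictionaries kept in sync. The following sketch shows one way to do that in plain Python. It is not the code behind cup.flag.TypeMan: TwoWayMap and its method names are hypothetical, and rejecting duplicates is a design choice made here for illustration.

```python
class TwoWayMap:
    """Keep name -> number and number -> name lookups in sync (illustrative)."""

    def __init__(self):
        self._number_by_key = {}
        self._key_by_number = {}

    def register(self, mapping):
        """Add every (key, number) pair; refuse duplicates on either side."""
        for key, number in mapping.items():
            if key in self._number_by_key or number in self._key_by_number:
                raise ValueError(
                    "duplicate key or number: {!r} -> {!r}".format(key, number)
                )
            self._number_by_key[key] = number
            self._key_by_number[number] = key

    def number_of(self, key):
        return self._number_by_key[key]

    def key_of(self, number):
        return self._key_by_number[number]

    def keys(self):
        return list(self._number_by_key)


events = TwoWayMap()
events.register({"LOGIN": 1, "LOGOUT": 2, "HEARTBEAT": 3, "DATA": 4})
login_code = events.number_of("LOGIN")
name_for_two = events.key_of(2)
```

Refusing duplicates at registration time keeps the reverse lookup unambiguous, which matters when the numbers travel over the wire and must decode back to exactly one message name.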

Conclusion

In conclusion, we completed a broad hands-on tour of CUP and now understand how its utilities can support production-style Python applications. We saw how CUP helps us structure logs, manage runtime configuration, cache temporary data, generate unique identifiers, coordinate threads, inspect system resources, protect files with locks, and validate behavior through assertions. This gives us a compact but useful foundation for using CUP in scripts, backend services, automation jobs, and system-level Python projects where reliability, observability, and practical tooling matter.



The post CUP (Common Useful Python): Building Reliable Python Workflows with Baidu’s Utility Toolkit appeared first on MarkTechPost.
