"""
usage_logging.py
----------------

Purpose:
This module implements privacy-preserving telemetry for the
AI Recruiting Agent Hugging Face Space.

Its sole purpose is to measure anonymous usage and adoption metrics in order to:
- Understand how the tool is being used
- Improve reliability and performance
- Gauge real-world adoption
- Support research and evaluation of responsible AI practices

Privacy Principles:
This module is explicitly designed to minimize data collection and avoid storing
any personally identifiable information (PII).

It DOES NOT collect or store:
- Raw IP addresses
- User names or Hugging Face account IDs
- Resume contents or job descriptions
- Emails, phone numbers, or file names
- Full user-agent strings or device fingerprints
- Any demographic attributes about users

It ONLY records:
- Approximate country and city (derived from IP, not stored)
- UTC timestamp of the event
- Space URL
- High-level event type (e.g., "app_open")
- Non-identifying, aggregate metadata (e.g., counts, booleans, latencies)

All usage logs are:
- Anonymized
- Append-only
- Persisted in a public Hugging Face Dataset repository
  (https://huggingface.co/datasets/19arjun89/ai_recruiting_agent_usage)
- Versioned via immutable commit history for auditability

Ethical Safeguards:
- Logging failures never break application functionality
- No raw identifiers are persisted at any time
- All telemetry is optional and best-effort
- The system is intended for transparency and improvement, not for surveillance
  or profiling

Transparency:
A public-facing usage reporting Space will be provided to allow independent
verification of aggregate adoption metrics.

Author: Arjun Singh Last Updated: 2026-01-27 """ import os import json from datetime import datetime import requests import gradio as gr from huggingface_hub import HfApi, list_repo_files, hf_hub_download import ipaddress import pycountry from io import BytesIO import uuid import time SPACE_URL = "https://huggingface.co/spaces/19arjun89/AI_Recruiting_Agent" USAGE_DATASET_REPO = "19arjun89/ai_recruiting_agent_usage" USAGE_EVENTS_DIR = "usage/events" LEGACY_JSONL_PATH = "usage/visits_legacy.jsonl" ROLLUP_PATH = "usage/visits.jsonl" def _hf_api(): token = os.environ.get("HF_TOKEN") if not token: return None return HfApi(token=token) def _is_public_ip(ip: str) -> bool: try: obj = ipaddress.ip_address(ip) return not (obj.is_private or obj.is_loopback or obj.is_reserved or obj.is_multicast or obj.is_link_local) except Exception: return False def _get_client_ip(request: gr.Request) -> str: if request: xff = request.headers.get("x-forwarded-for") if xff: for part in xff.split(","): ip = part.strip() if _is_public_ip(ip): return ip if request.client: host = request.client.host return host if _is_public_ip(host) else "" return "" def _country_lookup(ip: str) -> tuple[str, str]: token = os.environ.get("IPINFO_TOKEN") if not token: return ("", "") try: url = f"https://ipinfo.io/{ip}/json?token={token}" r = requests.get(url, timeout=4) if r.status_code != 200: return ("", "") data = r.json() # Some plans: country="US" # Some plans: country_code="US" and country="United States" cc = (data.get("country_code") or data.get("country") or "").strip().upper() name = (data.get("country") or "").strip() # If name is actually a code like "US", expand it if len(name) == 2 and name.upper() == cc: name = _expand_country_code(cc) # If name is missing but cc exists, expand if not name and cc: name = _expand_country_code(cc) return (cc, name) except Exception: return ("", "") def append_visit_to_dataset( country: str, city: str, session_id: str = "", event_type: str = "session_start", 
country_source: str = "unknown", country_code: str = "", **extra_fields ): api = _hf_api() if not api: return event = { "ts_utc": datetime.utcnow().isoformat() + "Z", "space_url": SPACE_URL, "session_id": session_id, "event": event_type, "country": country or "Unknown", "country_code": (country_code or "").strip().upper(), "country_source": country_source or "unknown", "city": city or "", } if extra_fields: # Prevent JSON nulls event.update({k: v for k, v in extra_fields.items() if v is not None}) # Unique file path per event (prevents collisions) ts = datetime.utcnow().strftime("%Y%m%dT%H%M%S%f") uid = uuid.uuid4().hex[:8] path_in_repo = f"{USAGE_EVENTS_DIR}/{ts}_{uid}.json" try: api.upload_file( repo_id=USAGE_DATASET_REPO, repo_type="dataset", path_in_repo=path_in_repo, path_or_fileobj=BytesIO(json.dumps(event).encode("utf-8")), commit_message=f"log {event_type}", ) except Exception as e: print("telemetry upload failed:", repr(e)) def record_visit(request: gr.Request | None, session_id: str = "", event_type: str = "session_start"): if request is None: append_visit_to_dataset( country="NA", city="", session_id=session_id, event_type=event_type, country_source="click_event", country_code="", ) return # 1) Header hint country_hint = _country_from_headers(request) if _is_valid_country_code(country_hint): append_visit_to_dataset( country=_expand_country_code(country_hint), city="", session_id=session_id, event_type=event_type, country_source="header", country_code=country_hint.strip().upper(), ) return # 2) IP-based lookup ip = _get_client_ip(request) if ip: cc, name = _country_lookup(ip) if _is_valid_country_code(cc): append_visit_to_dataset( country=name or _expand_country_code(cc), city="", session_id=session_id, event_type=event_type, country_source="ipinfo", country_code=cc, ) else: append_visit_to_dataset( country="Unknown", city="", session_id=session_id, event_type=event_type, country_source="ipinfo_unknown", country_code="", ) return # 3) Nothing usable 
append_visit_to_dataset( country="Unknown", city="", session_id=session_id, event_type=event_type, country_source="none", country_code="", ) def _country_from_headers(request: gr.Request) -> str: if not request: return "" return ( request.headers.get("cf-ipcountry") or request.headers.get("x-country") or request.headers.get("x-geo-country") or "" ).strip().upper() def _is_valid_country_code(code: str) -> bool: if not code: return False code = code.strip().upper() # Common "unknown" markers from CDNs / proxies if code in {"XX", "ZZ", "UNKNOWN", "NA", "N/A", "NONE", "-"}: return False # ISO2 should be exactly 2 letters return len(code) == 2 and code.isalpha() def _expand_country_code(code: str) -> str: if not code or len(code) != 2: return "Unknown" try: country = pycountry.countries.get(alpha_2=code.upper()) return country.name if country else "Unknown" except Exception: return "Unknown" def _is_meaningful_country(val: str) -> bool: v = (val or "").strip().lower() if not v: return False if v in {"unknown", "na", "n/a", "none", "null", "undefined"}: return False return True def rebuild_visits_rollup_from_event_files() -> str: """ Rebuilds usage/visits.jsonl from immutable per-event JSON files in usage/events/. ALSO writes an enriched rollup usage/visits_enriched.jsonl where: - legacy rows (no session_id) keep their original country as final_country - new click rows (with session_id) get final_country from the session's session_start row when available """ api = _hf_api() if not api: return "HF_TOKEN not available. Rollup requires write access." 
ENRICHED_ROLLUP_PATH = "usage/visits_enriched.jsonl" # 1) List files try: files = list_repo_files(repo_id=USAGE_DATASET_REPO, repo_type="dataset") except Exception as e: return f"Could not list repo files: {repr(e)}" event_files = [ f for f in files if f.startswith(f"{USAGE_EVENTS_DIR}/") and f.endswith(".json") ] if not event_files: return f"No event files found under {USAGE_EVENTS_DIR}/" events = [] bad = 0 # 2) Download & parse each event for path in event_files: try: local_path = hf_hub_download( repo_id=USAGE_DATASET_REPO, repo_type="dataset", filename=path, ) with open(local_path, "r", encoding="utf-8") as f: events.append(json.load(f)) except Exception: bad += 1 if not events: return f"Found {len(event_files)} event files, but 0 were parseable (bad={bad})." # 3) Sort by ts_utc events.sort(key=lambda e: (e.get("ts_utc") or "")) # 4) Build session → geo map from session_start events (new schema) session_geo = {} for e in events: if e.get("event") == "session_start": sid = (e.get("session_id") or "").strip() if not sid: continue # legacy usage_start rows won’t have session_id; ignore for mapping if sid not in session_geo: session_geo[sid] = { "country_session": e.get("country") or "Unknown", "country_code_session": (e.get("country_code") or "").strip().upper(), "country_source_session": e.get("country_source") or "unknown", } # 5) Write RAW JSONL (same behavior as today) buf_raw = BytesIO() for evt in events: buf_raw.write((json.dumps(evt, ensure_ascii=False) + "\n").encode("utf-8")) buf_raw.seek(0) # 6) Write ENRICHED JSONL # Rules: # - Legacy rows (no session_id): final_country = evt.country (already correct) # - New rows with session_id: # - if evt.country is Unknown/blank, use country from session usage_start # - keep original evt.country as well (don’t overwrite) buf_enriched = BytesIO() for evt in events: enriched = dict(evt) sid = (evt.get("session_id") or "").strip() geo = session_geo.get(sid, {}) if sid else {} # Keep a copy of session geo (useful for 
debugging/audit) if geo: enriched.update(geo) evt_country = (evt.get("country") or "").strip() evt_cc = (evt.get("country_code") or "").strip().upper() # Determine final_country / final_country_code if sid: if _is_meaningful_country(evt_country): enriched["final_country"] = evt_country enriched["final_country_code"] = evt_cc enriched["final_country_source"] = evt.get("country_source") or "unknown" else: enriched["final_country"] = geo.get("country_session", "Unknown") enriched["final_country_code"] = geo.get("country_code_session", "") enriched["final_country_source"] = geo.get("country_source_session", "unknown") else: # Legacy row: preserve the original country fields enriched["final_country"] = evt_country or "Unknown" enriched["final_country_code"] = evt_cc enriched["final_country_source"] = evt.get("country_source") or "unknown" buf_enriched.write((json.dumps(enriched, ensure_ascii=False) + "\n").encode("utf-8")) buf_enriched.seek(0) # 7) Upload both rollups try: api.upload_file( repo_id=USAGE_DATASET_REPO, repo_type="dataset", path_in_repo=ROLLUP_PATH, # your existing usage/visits.jsonl path_or_fileobj=buf_raw, commit_message=f"rebuild {ROLLUP_PATH} from {USAGE_EVENTS_DIR}", ) api.upload_file( repo_id=USAGE_DATASET_REPO, repo_type="dataset", path_in_repo=ENRICHED_ROLLUP_PATH, path_or_fileobj=buf_enriched, commit_message=f"rebuild {ENRICHED_ROLLUP_PATH} from {USAGE_EVENTS_DIR}", ) except Exception as e: return f"Rollup upload failed: {repr(e)}" return ( f"Rollups rebuilt:\n" f"- RAW: {ROLLUP_PATH} rows={len(events)} (bad_files={bad})\n" f"- ENRICHED: {ENRICHED_ROLLUP_PATH} rows={len(events)} (bad_files={bad})" )