""" ALL_PREDICT MODULE End-to-end inference pipeline: JSON input -> preprocess -> model -> prediction Created: January 2026 """ from __future__ import annotations import json import joblib import numpy as np import pandas as pd from typing import Dict, Any, List # ========================================================= # GLOBAL CONFIG (TRAINING CONTRACT) # ========================================================= # Các cột KHÔNG BAO GIỜ đưa vào model DEFAULT_DROP_COLS = { "ma_dia_diem", "all_task_normal", "all_task_dinhky", "is_tasks_text_missing", "so_luong", # target nếu lỡ có } TASK_NORMAL_COL = "all_task_normal" TASK_DINHKY_COL = "all_task_dinhky" # ========================================================= # IMPORT KEYWORD FEATURE EXTRACTOR # ========================================================= from predict import extract_keyword_features_reduced_from_two_texts # ========================================================= # 1) KEYWORD FEATURES # ========================================================= def add_keyword_features_to_df(df: pd.DataFrame) -> pd.DataFrame: """ Add keyword-based features from all_task_normal + all_task_dinhky """ if TASK_NORMAL_COL not in df.columns: df[TASK_NORMAL_COL] = None if TASK_DINHKY_COL not in df.columns: df[TASK_DINHKY_COL] = None feats = df.apply( lambda r: extract_keyword_features_reduced_from_two_texts( r.get(TASK_NORMAL_COL), r.get(TASK_DINHKY_COL), ), axis=1, ) feats_df = pd.DataFrame(list(feats)) return pd.concat([df.reset_index(drop=True), feats_df.reset_index(drop=True)], axis=1) # ========================================================= # 2) PREPROCESSING (MATCH TRAINING) # ========================================================= def time_to_hour(x) -> float: if pd.isna(x): return np.nan if hasattr(x, "hour"): return float(x.hour) + float(getattr(x, "minute", 0)) / 60.0 s = str(x).strip() if " " in s and ":" in s: s = s.split(" ", 1)[1].strip() if ":" in s: try: h, m = s.split(":")[0:2] return float(h) + float(m) / 60.0 except Exception: return np.nan try: return float(s) except Exception: return np.nan def build_X_from_raw_df(df_raw: pd.DataFrame) -> pd.DataFrame: """ Feature selection: tự động drop các cột không dùng cho model """ df = df_raw.copy() drop_cols = [c for c in df.columns if c in DEFAULT_DROP_COLS] if drop_cols: df = df.drop(columns=drop_cols) return df def preprocess_like_training(X: pd.DataFrame) -> pd.DataFrame: """ Same logic as training CELL 3 """ X_proc = X.copy() if "bat_dau" in X_proc.columns: X_proc["hour_start"] = X_proc["bat_dau"].apply(time_to_hour) if "ket_thuc" in X_proc.columns: X_proc["hour_end"] = X_proc["ket_thuc"].apply(time_to_hour) if "hour_start" in X_proc.columns and "hour_end" in X_proc.columns: end_adj = X_proc["hour_end"].copy() cross = ( X_proc["hour_start"].notna() & X_proc["hour_end"].notna() & (X_proc["hour_end"] < X_proc["hour_start"]) ) end_adj.loc[cross] += 24 X_proc["shift_length"] = (end_adj - X_proc["hour_start"]).clip(lower=0) X_proc["is_cross_day"] = cross.astype(int) for c in ["bat_dau", "ket_thuc"]: if c in X_proc.columns: X_proc = X_proc.drop(columns=[c]) cat_cols = [c for c in X_proc.columns if X_proc[c].dtype == "object"] if cat_cols: X_proc = pd.get_dummies(X_proc, columns=cat_cols, dummy_na=True) X_proc = X_proc.replace([np.inf, -np.inf], np.nan).fillna(0) return X_proc # ========================================================= # 3) ALIGN TO TRAINING SCHEMA # ========================================================= def load_schema_columns(columns_joblib_path: str) -> 
# =========================================================
# 3) ALIGN TO TRAINING SCHEMA
# =========================================================
def load_schema_columns(columns_joblib_path: str) -> List[str]:
    cols = joblib.load(columns_joblib_path)
    if not isinstance(cols, list):
        raise ValueError("Invalid schema columns file")
    return cols


def align_to_schema(X_proc: pd.DataFrame, schema_columns: List[str]) -> pd.DataFrame:
    """Add missing training columns as 0, drop extras, and restore the training column order."""
    X = X_proc.copy()
    for c in schema_columns:
        if c not in X.columns:
            X[c] = 0
    extra_cols = [c for c in X.columns if c not in schema_columns]
    if extra_cols:
        X = X.drop(columns=extra_cols)
    X = X[schema_columns]
    X = X.apply(pd.to_numeric, errors="coerce").fillna(0)
    return X

# =========================================================
# 4) FULL PIPELINE: RECORD -> MODEL INPUT
# =========================================================
def record_to_model_input_df(
    record: Dict[str, Any],
    schema_columns: List[str],
) -> pd.DataFrame:
    df_raw = pd.DataFrame([record])
    df_kw = add_keyword_features_to_df(df_raw)
    X_raw = build_X_from_raw_df(df_kw)
    X_proc = preprocess_like_training(X_raw)
    X_aligned = align_to_schema(X_proc, schema_columns)
    return X_aligned

# =========================================================
# 5) PREDICT
# =========================================================
def predict_from_record(
    record: Dict[str, Any],
    model_joblib_path: str,
    columns_joblib_path: str,
) -> Dict[str, Any]:
    schema_columns = load_schema_columns(columns_joblib_path)
    X = record_to_model_input_df(record, schema_columns)
    model = joblib.load(model_joblib_path)

    # Predictions are on a log1p scale; invert with expm1 and clamp at 0
    pred_log = model.predict(X.values)[0]
    pred_raw = float(np.maximum(0.0, np.expm1(pred_log)))

    return {
        "so_luong_du_doan_raw": round(pred_raw, 2),
        "so_luong_du_doan_round": int(np.rint(pred_raw)),
    }


def predict_from_json_file(
    json_path: str,
    model_joblib_path: str,
    columns_joblib_path: str,
) -> str:
    with open(json_path, "r", encoding="utf-8") as f:
        record = json.load(f)
    result = predict_from_record(
        record,
        model_joblib_path,
        columns_joblib_path,
    )
    return json.dumps(result, ensure_ascii=False, indent=2)

# =========================================================
# 6) MAIN
# =========================================================
if __name__ == "__main__":
    MODEL_JOBLIB = "./artifacts/extratrees_staff_model.joblib"
    COLUMNS_JOBLIB = "./artifacts/X_proc_columns.joblib"
    INPUT_JSON = "./input.json"

    print(
        predict_from_json_file(
            json_path=INPUT_JSON,
            model_joblib_path=MODEL_JOBLIB,
            columns_joblib_path=COLUMNS_JOBLIB,
        )
    )
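# ---------------------------------------------------------
# Hypothetical input sketch (not taken from real data): only
# field names that this module actually reads are shown, and
# every value is a placeholder. Real records may carry extra
# columns, which flow through the one-hot encoding step; the
# target "so_luong" is dropped via DEFAULT_DROP_COLS if present.
# ---------------------------------------------------------
def _demo_record() -> Dict[str, Any]:
    """Build an illustrative record shaped like the expected input.json."""
    return {
        "ma_dia_diem": "DD001",    # location id; dropped before modelling
        "bat_dau": "22:00",        # shift start (HH:MM)
        "ket_thuc": "06:00",       # shift end; crosses midnight in this example
        "all_task_normal": "...",  # free-text daily tasks (keyword source)
        "all_task_dinhky": "...",  # free-text periodic tasks (keyword source)
    }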