# HoanMy_DetectTansuat/demo.py

import re
import unicodedata

import numpy as np
import pandas as pd
import torch
from pyvi import ViTokenizer
from sklearn.metrics import classification_report
from transformers import AutoTokenizer, AutoModel

device = "cuda" if torch.cuda.is_available() else "cpu"
# Load the HF model once at import time and share it across all calls
hf_tokenizer = AutoTokenizer.from_pretrained("dangvantuan/vietnamese-embedding")
hf_model = AutoModel.from_pretrained("dangvantuan/vietnamese-embedding").to(device)
def embed_text(text, tokenizer=hf_tokenizer, model=hf_model, device=device):
    """
    Embed one sentence into a vector with dangvantuan/vietnamese-embedding.
    Uses mean pooling weighted by the attention mask.
    """
    encoded = tokenizer(
        text,
        padding=True,
        truncation=True,
        max_length=128,
        return_tensors="pt"
    ).to(device)
    with torch.no_grad():
        output = model(**encoded)
    token_embeddings = output.last_hidden_state        # (1, L, H)
    attention_mask = encoded["attention_mask"]         # (1, L)
    mask = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
    sum_embeds = (token_embeddings * mask).sum(dim=1)  # (1, H)
    lengths = mask.sum(dim=1)                          # (1, H)
    mean_pooled = sum_embeds / lengths
    return mean_pooled.cpu()                           # (1, H)

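# Minimal usage sketch for embed_text (hypothetical sentence, not from the
# dataset); assumes the HF model above downloaded successfully. The hidden
# size H depends on the checkpoint (commonly 768 for this model family).
def _demo_embed_text():
    vec = embed_text("lau kính cửa sổ tầng 1")
    print(vec.shape)  # e.g. torch.Size([1, 768])
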
def cosine_sim(a, b):
    """
    Cosine similarity between:
    - a: tensor of shape (1, H)
    - b: tensor of shape (N, H)
    Returns: (1, N)
    """
    a = a / a.norm(dim=-1, keepdim=True)
    b = b / b.norm(dim=-1, keepdim=True)
    return torch.mm(a, b.t())

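# Sanity-check sketch for cosine_sim with hypothetical 2-D vectors: identical
# directions score 1.0, orthogonal directions score 0.0.
def _demo_cosine_sim():
    a = torch.tensor([[1.0, 0.0]])
    b = torch.tensor([[1.0, 0.0], [0.0, 1.0]])
    print(cosine_sim(a, b))  # tensor([[1., 0.]])
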
# ============================================================
# 2. LOCATION AND NUMERIC-FEATURE RULES
# ============================================================
def location_similarity(q_row, cand_row):
    """
    Location similarity based on location_clean (Jaccard over tokens).
    """
    q_tokens = set(str(q_row["location_clean"]).split())
    c_tokens = set(str(cand_row["location_clean"]).split())
    if not q_tokens or not c_tokens:
        return 0.0
    inter = len(q_tokens & c_tokens)
    union = len(q_tokens | c_tokens)
    return inter / union

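# Worked example with hypothetical rows: "hành lang tầng 3" and
# "hành lang tầng 1" share 3 of 5 distinct tokens, so Jaccard = 3/5 = 0.6.
def _demo_location_similarity():
    q = pd.Series({"location_clean": "hành lang tầng 3"})
    c = pd.Series({"location_clean": "hành lang tầng 1"})
    print(location_similarity(q, c))  # 0.6
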
def numeric_closeness(q_row, cand_row, alpha_out=0.7, alpha_in=0.7):
    """
    Closeness on:
    - luuluong (traffic level)
    - outside / inside area (log1p + exp(-alpha * dist))
    Returns a value in roughly (0, 1]; larger means closer.
    """
    # closeness on luuluong
    if "luuluong" in q_row and "luuluong" in cand_row:
        if q_row["luuluong"] == cand_row["luuluong"]:
            c_luu = 1.0
        elif abs(q_row["luuluong"] - cand_row["luuluong"]) == 1:
            c_luu = 0.6
        else:
            c_luu = 0.3
    else:
        c_luu = 0.5

    # closeness on areas, with safe handling of missing values
    def safe_val(row, col):
        return float(row[col]) if col in row and not pd.isna(row[col]) else 0.0

    q_out = safe_val(q_row, "Dientichngoai")
    q_in = safe_val(q_row, "Dientichtrong")
    c_out = safe_val(cand_row, "Dientichngoai")
    c_in = safe_val(cand_row, "Dientichtrong")

    d_out = abs(np.log1p(q_out) - np.log1p(c_out))
    d_in = abs(np.log1p(q_in) - np.log1p(c_in))
    c_out = np.exp(-alpha_out * d_out)
    c_in = np.exp(-alpha_in * d_in)

    return 0.5 * c_luu + 0.25 * c_out + 0.25 * c_in

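# Worked example with hypothetical rows: equal luuluong (c_luu = 1.0) and
# identical areas (d_out = d_in = 0, hence c_out = c_in = 1.0) give the
# maximum score 0.5*1.0 + 0.25*1.0 + 0.25*1.0 = 1.0.
def _demo_numeric_closeness():
    q = pd.Series({"luuluong": 2, "Dientichngoai": 0.0, "Dientichtrong": 100.0})
    c = pd.Series({"luuluong": 2, "Dientichngoai": 0.0, "Dientichtrong": 100.0})
    print(numeric_closeness(q, c))  # 1.0
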
# ============================================================
# 3. TRAIN EMBEDDINGS FROM train_df
# ============================================================
def build_train_embeddings(train_df):
    """
    Takes a train_df with a 'job_segmented' column.
    Returns an embedding tensor of shape (N_train, H).
    """
    train_texts = train_df["job_segmented"].tolist()
    embs = []
    for txt in train_texts:
        vec = embed_text(txt)
        embs.append(vec.squeeze(0))
    return torch.stack(embs)

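# Optional sketch, not part of the original pipeline: build_train_embeddings
# runs one forward pass per sentence. A hypothetical batched variant like the
# one below computes the same mean-pooled vectors with far fewer passes,
# which is noticeably faster on GPU.
def _embed_texts_batched(texts, batch_size=32):
    out = []
    for i in range(0, len(texts), batch_size):
        enc = hf_tokenizer(texts[i:i + batch_size], padding=True,
                           truncation=True, max_length=128,
                           return_tensors="pt").to(device)
        with torch.no_grad():
            hidden = hf_model(**enc).last_hidden_state   # (B, L, H)
        mask = enc["attention_mask"].unsqueeze(-1).float()  # (B, L, 1)
        out.append(((hidden * mask).sum(dim=1) / mask.sum(dim=1)).cpu())
    return torch.cat(out)  # (N, H)
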
# ============================================================
# 4. PREDICTION RULE FOR ONE ROW
# ============================================================
def predict_label_for_row(q_row,
                          train_df,
                          train_embeddings,
                          tokenizer=hf_tokenizer,
                          model=hf_model,
                          top_k=10,
                          w_numeric=0.4,
                          w_loc=0.6):
    """
    Step 1: use embedding(job_segmented) to pick the top_k semantically
            closest candidates.
    Step 2: within that top_k, do NOT use the text score any further; use only:
        - loc_sim : similarity on location_clean in [0, 1] (most important)
        - num_c   : numeric_closeness (luuluong + areas), roughly (0, 1]
    final_score = w_loc * loc_sim + w_numeric * num_c
    """
    # 1) embed the query
    q_vec = embed_text(q_row["job_segmented"], tokenizer, model)  # (1, H)

    # 2) cosine similarity against the whole train set, ONLY to pick candidates
    sims = cosine_sim(q_vec, train_embeddings)[0]  # (N,)

    # 3) take the top-k closest jobs by embedding
    top_k = min(top_k, len(train_df))
    top_scores, top_idx = torch.topk(sims, k=top_k)

    label_scores = {}
    for _, idx in zip(top_scores, top_idx):
        cand_row = train_df.iloc[int(idx)]
        loc_sim = location_similarity(q_row, cand_row)
        num_c = numeric_closeness(q_row, cand_row)
        final_score = w_loc * loc_sim + w_numeric * num_c
        lbl = str(cand_row["label"])
        label_scores[lbl] = label_scores.get(lbl, 0.0) + final_score

    # fallback when there are no candidates
    if not label_scores:
        majority_label = str(train_df["label"].value_counts().idxmax())
        return majority_label, {}

    best_label = max(label_scores, key=label_scores.get)
    return best_label, label_scores

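# Illustration of the voting step above with hypothetical scores: three
# neighbours labelled "2" (0.8 + 0.7 + 0.5 = 2.0) outvote a single "1"
# neighbour at 0.9, even though that one neighbour scores highest alone.
def _demo_label_voting():
    label_scores = {}
    for lbl, score in [("2", 0.8), ("1", 0.9), ("2", 0.7), ("2", 0.5)]:
        label_scores[lbl] = label_scores.get(lbl, 0.0) + score
    print(max(label_scores, key=label_scores.get), label_scores)
    # -> 2 {'2': 2.0, '1': 0.9}
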
# ============================================================
# 5. PREDICT FOR A WHOLE DATAFRAME (FOR TEST/HOLDOUT)
# ============================================================
def predict_on_df(df_in: pd.DataFrame,
                  train_df: pd.DataFrame,
                  train_embeddings,
                  name: str,
                  top_k=5,
                  w_numeric=0.4,
                  w_loc=0.6):
    """Predict every row of df_in and print a classification report."""
    df = df_in.copy()
    preds = []
    scores = []
    for _, row in df.iterrows():
        pred, sc = predict_label_for_row(
            row,
            train_df=train_df,
            train_embeddings=train_embeddings,
            tokenizer=hf_tokenizer,
            model=hf_model,
            top_k=top_k,
            w_numeric=w_numeric,
            w_loc=w_loc
        )
        preds.append(pred)
        scores.append(sc)
    df["pred_label"] = preds
    df["score_details"] = scores

    print(f"\n========== RESULTS ON {name} ==========")
    print(classification_report(
        df["label"].astype(str),
        df["pred_label"].astype(str),
        digits=3
    ))
    return df

def normalize_text_keep_words(s: str) -> str:
    """NFC-normalize, lowercase, keep only digits, letters (incl. Vietnamese) and spaces."""
    s = str(s)
    s = unicodedata.normalize('NFC', s).lower()
    s = re.sub(r"[^0-9a-zà-ỹ\s]", " ", s)
    s = re.sub(r"\s+", " ", s).strip()
    return s

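# Example with a hypothetical input: case is folded, punctuation is replaced
# by spaces and whitespace collapsed, while Vietnamese diacritics survive.
def _demo_normalize_text():
    print(normalize_text_keep_words("Lau Kính,  Tầng 3!!"))  # "lau kính tầng 3"
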
def segment_and_remove_stopwords(text):
    """Word-segment with ViTokenizer, then drop tokens found in the module-level vietnamese_stopwords set (loaded below)."""
    if not isinstance(text, str):
        return ""
    segmented = ViTokenizer.tokenize(text)
    tokens = segmented.split()
    filtered = [tok for tok in tokens if tok not in vietnamese_stopwords]
    return " ".join(filtered)

def preprocess_pipeline(df_raw: pd.DataFrame) -> pd.DataFrame:
    """
    - Does NOT concatenate Location + Job
    - Cleans text
    - Word-segments Job + removes stopwords
    """
    df = df_raw.dropna(subset=["label"]).copy()
    df["label"] = df["label"].astype(str).str.strip()
    df = df[df["label"].isin(["1", "2", "3", "4"])].copy()
    df["Location"] = df["Location"].fillna("Unknown").astype(str)
    df["Job"] = df["Job"].astype(str)

    # Clean text
    df["location_clean"] = df["Location"].apply(normalize_text_keep_words)
    df["job_clean"] = df["Job"].apply(normalize_text_keep_words)

    # Word segmentation + stopword removal
    df["job_segmented"] = df["job_clean"].apply(segment_and_remove_stopwords)

    # Keep the numeric columns when present
    keep_cols = [
        "Location", "Job", "label",
        "location_clean", "job_clean", "job_segmented"
    ]
    for c in ["Name", "luuluong", "Dientichngoai", "Dientichtrong"]:
        if c in df.columns:
            keep_cols.append(c)
    return df[keep_cols].reset_index(drop=True)

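# Sketch of the expected raw input, using a hypothetical two-row frame; a
# missing Location becomes "Unknown", and numeric columns such as luuluong
# are optional and kept only when present.
def _demo_preprocess():
    raw = pd.DataFrame({
        "Location": ["Hành lang tầng 3", None],
        "Job": ["Quét dọn hành lang", "Lau kính cửa sổ"],
        "label": ["1", "2"],
    })
    print(preprocess_pipeline(raw)[["location_clean", "job_segmented", "label"]])
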
# ============================================================
# 6. DEMO PREDICTION FOR ONE NEW SAMPLE
# ============================================================
def predict_single_sample(job_segmented: str,
                          location_clean: str,
                          name: str,
                          luuluong: int,
                          dientichngoai: float,
                          dientichtrong: float,
                          train_df: pd.DataFrame,
                          train_embeddings,
                          top_k=5,
                          w_numeric=0.4,
                          w_loc=0.6):
    """
    Demo prediction for one new data point.
    - job_segmented: string already word-segmented and stopword-filtered
      (same format as train_df['job_segmented'])
    - location_clean: location string already normalized
      (same format as train_df['location_clean'])
    - name, luuluong, dientichngoai, dientichtrong: building/job parameters
    """
    # Build a Series that mimics one row of df_out
    q_dict = {
        "job_segmented": job_segmented,
        "location_clean": location_clean,
        "Name": name,
        "luuluong": luuluong,
        "Dientichngoai": dientichngoai,
        "Dientichtrong": dientichtrong,
    }
    q_row = pd.Series(q_dict)
    pred_label, label_scores = predict_label_for_row(
        q_row,
        train_df=train_df,
        train_embeddings=train_embeddings,
        tokenizer=hf_tokenizer,
        model=hf_model,
        top_k=top_k,
        w_numeric=w_numeric,
        w_loc=w_loc
    )
    return pred_label, label_scores

# ==== 1. Load stopwords ====
def load_stopwords(path):
    with open(path, "r", encoding="utf-8") as f:
        sw = [line.strip() for line in f if line.strip()]
    return set(sw)

stopwords_path = "vietnamese-stopwords-dash.txt"  # change if your file is named differently
vietnamese_stopwords = load_stopwords(stopwords_path)

def chia_train_test(df_out: pd.DataFrame):
    """Building-level split: keep labels 1-3 only and hold out the 'CMC' building as the test set."""
    df_out = df_out[df_out["label"].astype(str).isin(["1", "2", "3"])].reset_index(drop=True)
    test_buildings = ["CMC"]  # main test set
    test_df = df_out[df_out["Name"].isin(test_buildings)].reset_index(drop=True)
    train_df = df_out[
        ~df_out["Name"].isin(test_buildings)
    ].reset_index(drop=True)
    return train_df, test_df

if __name__ == "__main__":
    df_raw = pd.read_excel("hoanmy_detect_task.xlsx")
    df = preprocess_pipeline(df_raw)
    train_df, test_df = chia_train_test(df)
    train_embeddings = build_train_embeddings(train_df)

    # 4) Demo: predict one row from test_df (here, the first row)
    example_row = test_df.iloc[0]
    pred, scores = predict_label_for_row(
        example_row,
        train_df=train_df,
        train_embeddings=train_embeddings,
        top_k=5,
        w_numeric=0.4,
        w_loc=0.6
    )
    print("\n=== DEMO: PREDICT ONE ROW ===")
    print("Job       :", example_row["Job"])
    print("Segm job  :", example_row["job_segmented"])
    print("Location  :", example_row["Location"])
    print("True label:", example_row["label"])
    print("Pred label:", pred)
    print("Label scores:", scores)
    # 5) Or demo with a hand-made input
    # job_seg_demo = "quét_dọn hành_lang lau_chùi vệ_sinh"
    # loc_clean_demo = "hành lang tầng 3"
    # pred2, scores2 = predict_single_sample(
    #     job_segmented=job_seg_demo,
    #     location_clean=loc_clean_demo,
    #     name="CMC",
    #     luuluong=2,
    #     dientichngoai=0,
    #     dientichtrong=1500,
    #     train_df=train_df,
    #     train_embeddings=train_embeddings,
    #     top_k=5,
    #     w_numeric=0.4,
    #     w_loc=0.6
    # )
    # print("\n=== DEMO: PREDICT A HAND-MADE INPUT ===")
    # print("Pred label:", pred2)
    # print("Label scores:", scores2)