"""
|
|
Text Feature Extraction Pipeline for Staff Prediction Model
|
|
=============================================================
|
|
|
|
This script extracts TF-IDF and SVD features from Vietnamese task descriptions.
|
|
Can be used for both training (fit_transform) and inference (transform).
|
|
|
|
Usage:
|
|
------
|
|
Training mode:
|
|
python extract_text_features.py --mode train --input data.xlsx --output features.csv
|
|
|
|
Inference mode:
|
|
python extract_text_features.py --mode predict --input new_data.xlsx --output predictions.csv
|
|
|
|
As a module:
|
|
from extract_text_features import TextFeatureExtractor
|
|
extractor = TextFeatureExtractor()
|
|
features = extractor.fit_transform(texts)
|
|
|
|
Author: ML Team
|
|
Date: 2026-01-06
|
|
"""
|
|
|
|
import argparse
import os
import pickle
import re
from typing import List, Optional, Tuple, Union

import numpy as np
import pandas as pd
from sklearn.decomposition import TruncatedSVD
from sklearn.feature_extraction.text import TfidfVectorizer


class TextFeatureExtractor:
    """
    Extract TF-IDF and SVD features from Vietnamese text.

    Attributes:
        max_features (int): Maximum number of TF-IDF features
        n_components (int): Number of SVD components
        ngram_range (tuple): N-gram range for TF-IDF
        min_df (int): Minimum document frequency
        max_df (float): Maximum document frequency
        tfidf (TfidfVectorizer): Fitted TF-IDF vectorizer
        svd (TruncatedSVD): Fitted SVD model
        is_fitted (bool): Whether the extractor has been fitted
    """

    def __init__(
        self,
        max_features: int = 200,
        n_components: int = 50,
        ngram_range: Tuple[int, int] = (1, 2),
        min_df: int = 2,
        max_df: float = 0.95,
        random_state: int = 42
    ):
        """
        Initialize the TextFeatureExtractor.

        Args:
            max_features: Maximum number of TF-IDF features (default: 200)
            n_components: Number of SVD components (default: 50)
            ngram_range: N-gram range for TF-IDF (default: (1, 2))
            min_df: Minimum document frequency (default: 2)
            max_df: Maximum document frequency (default: 0.95)
            random_state: Random seed for reproducibility (default: 42)
        """
        self.max_features = max_features
        self.n_components = n_components
        self.ngram_range = ngram_range
        self.min_df = min_df
        self.max_df = max_df
        self.random_state = random_state

        # Initialize models
        self.tfidf = TfidfVectorizer(
            max_features=max_features,
            ngram_range=ngram_range,
            min_df=min_df,
            max_df=max_df,
            sublinear_tf=True,   # damp very frequent terms: tf -> 1 + log(tf)
            strip_accents=None   # keep Vietnamese diacritics intact
        )

        self.svd = TruncatedSVD(
            n_components=n_components,
            random_state=random_state
        )

        self.is_fitted = False

    @staticmethod
    def preprocess_text(text: Union[str, float, None]) -> str:
        """
        Preprocess Vietnamese text.

        Args:
            text: Input text (can be str, float, or None)

        Returns:
            Cleaned text string
        """
        # pd.isna() already covers None, so a separate None check is redundant.
        if pd.isna(text) or str(text).strip() == '':
            return ''

        text = str(text).lower()

        # Keep Vietnamese characters, digits, and whitespace
        text = re.sub(
            r'[^a-zàáạảãâầấậẩẫăằắặẳẵèéẹẻẽêềếệểễìíịỉĩòóọỏõôồốộổỗơờớợởỡùúụủũưừứựửữỳýỵỷỹđ0-9\s]',
            ' ',
            text
        )

        # Collapse runs of whitespace into single spaces
        text = re.sub(r'\s+', ' ', text).strip()

        return text

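    # Illustrative example (hypothetical input): the cleaner lowercases,
    # keeps Vietnamese diacritics and digits, and maps everything else to
    # whitespace before collapsing it:
    #
    #   TextFeatureExtractor.preprocess_text("Kiểm tra MÁY BƠM #2!")
    #   -> 'kiểm tra máy bơm 2'
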
    def preprocess_texts(self, texts: List[str]) -> List[str]:
        """
        Preprocess a list of texts.

        Args:
            texts: List of text strings

        Returns:
            List of cleaned text strings
        """
        return [self.preprocess_text(text) for text in texts]

    def combine_task_columns(
        self,
        task_normal: pd.Series,
        task_dinhky: pd.Series
    ) -> List[str]:
        """
        Combine two task columns into one.

        Args:
            task_normal: Series of normal task descriptions
            task_dinhky: Series of scheduled task descriptions

        Returns:
            List of combined and cleaned texts
        """
        # Preprocess both columns
        normal_clean = task_normal.apply(self.preprocess_text)
        dinhky_clean = task_dinhky.apply(self.preprocess_text)

        # Combine
        combined = (normal_clean + ' ' + dinhky_clean).str.strip()

        return combined.tolist()

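    # Illustrative example (hypothetical values): each series is cleaned
    # independently, then the two are joined with a single space; empty or
    # missing entries simply drop out after the final strip:
    #
    #   normal = pd.Series(["Sửa điều hòa", None])
    #   dinhky = pd.Series(["Bảo trì thang máy", "Vệ sinh kính"])
    #   extractor.combine_task_columns(normal, dinhky)
    #   -> ['sửa điều hòa bảo trì thang máy', 'vệ sinh kính']
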
    def fit(self, texts: List[str]) -> 'TextFeatureExtractor':
        """
        Fit the TF-IDF and SVD models on training texts.

        Args:
            texts: List of text strings

        Returns:
            self
        """
        print(f"Fitting TF-IDF on {len(texts)} documents...")

        # Preprocess
        texts_clean = self.preprocess_texts(texts)

        # Fit TF-IDF
        tfidf_matrix = self.tfidf.fit_transform(texts_clean)

        print(f"  TF-IDF shape: {tfidf_matrix.shape}")
        print(f"  Sparsity: {(1.0 - tfidf_matrix.nnz / (tfidf_matrix.shape[0] * tfidf_matrix.shape[1])) * 100:.2f}%")

        # Fit SVD
        print(f"\nFitting SVD ({self.n_components} components)...")
        self.svd.fit(tfidf_matrix)

        print(f"  Explained variance: {self.svd.explained_variance_ratio_.sum() * 100:.2f}%")

        self.is_fitted = True
        print("\n✅ Fitting complete!")

        return self

    def transform(self, texts: List[str]) -> np.ndarray:
        """
        Transform texts to SVD features.

        Args:
            texts: List of text strings

        Returns:
            Array of SVD features (n_samples, n_components)
        """
        if not self.is_fitted:
            raise ValueError("Extractor must be fitted before transform. Call fit() first.")

        # Preprocess
        texts_clean = self.preprocess_texts(texts)

        # TF-IDF transform
        tfidf_matrix = self.tfidf.transform(texts_clean)

        # SVD transform
        svd_features = self.svd.transform(tfidf_matrix)

        return svd_features

    def fit_transform(self, texts: List[str]) -> np.ndarray:
        """
        Fit and transform in one step.

        Args:
            texts: List of text strings

        Returns:
            Array of SVD features (n_samples, n_components)
        """
        self.fit(texts)
        return self.transform(texts)

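    # Illustrative sketch of the pipeline shapes with the defaults
    # (max_features=200, n_components=50) on a hypothetical 1000-document
    # corpus; min_df/max_df pruning can leave fewer than 200 terms, and
    # TruncatedSVD requires n_components to be smaller than that vocabulary:
    #
    #   texts (1000,) --TF-IDF--> sparse (1000, <=200) --SVD--> dense (1000, 50)
    #
    #   extractor = TextFeatureExtractor()
    #   X = extractor.fit_transform(corpus)  # corpus: list of 1000 strings
    #   X.shape                              # (1000, 50)
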
    def get_feature_names(self) -> List[str]:
        """
        Get feature names for the SVD components.

        Returns:
            List of feature names
        """
        return [f'text_svd_{i + 1}' for i in range(self.n_components)]

    def get_top_tfidf_features(self, top_n: int = 20) -> pd.DataFrame:
        """
        Get the most widespread TF-IDF features, ranked by inverse document
        frequency (lower IDF means the term appears in more documents).

        Args:
            top_n: Number of top features to return

        Returns:
            DataFrame with feature names and their IDF weights
        """
        if not self.is_fitted:
            raise ValueError("Extractor must be fitted first.")

        feature_names = self.tfidf.get_feature_names_out()

        # IDF decreases as document frequency grows, so ascending IDF order
        # lists the most common terms first. (The previous version transformed
        # the vocabulary strings themselves as if they were documents, which
        # does not measure document frequency at all.)
        top_features = pd.DataFrame({
            'feature': feature_names,
            'idf': self.tfidf.idf_
        }).sort_values('idf', ascending=True).head(top_n)

        return top_features

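    # Note on the ranking above: with scikit-learn's default smoothed IDF,
    #
    #   idf(t) = ln((1 + n) / (1 + df(t))) + 1,
    #
    # which is strictly decreasing in the document frequency df(t), so sorting
    # by ascending IDF is equivalent to sorting by descending document
    # frequency. Illustrative call, assuming a fitted extractor:
    #
    #   extractor.get_top_tfidf_features(top_n=5)
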
    def save(self, filepath: str):
        """
        Save the fitted extractor to disk.

        Args:
            filepath: Path to save the extractor (should end with .pkl)
        """
        if not self.is_fitted:
            raise ValueError("Extractor must be fitted before saving.")

        with open(filepath, 'wb') as f:
            pickle.dump(self, f)

        print(f"✅ Saved extractor to: {filepath}")

    @staticmethod
    def load(filepath: str) -> 'TextFeatureExtractor':
        """
        Load a fitted extractor from disk.

        Args:
            filepath: Path to the saved extractor

        Returns:
            Loaded TextFeatureExtractor
        """
        with open(filepath, 'rb') as f:
            extractor = pickle.load(f)

        print(f"✅ Loaded extractor from: {filepath}")
        return extractor

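    # Illustrative round trip (sketch): pickling stores the fitted TF-IDF
    # vocabulary and SVD components together, so inference reuses the exact
    # training-time transform. Note that unpickling requires this class to be
    # importable under the same module path as when it was saved.
    #
    #   extractor.save('text_feature_extractor.pkl')
    #   same = TextFeatureExtractor.load('text_feature_extractor.pkl')
    #   np.allclose(extractor.transform(texts), same.transform(texts))  # True
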
    def get_summary(self) -> dict:
        """
        Get summary statistics of the extractor.

        Returns:
            Dictionary with summary information
        """
        if not self.is_fitted:
            return {'status': 'not_fitted'}

        return {
            'status': 'fitted',
            'max_features': self.max_features,
            'n_components': self.n_components,
            'ngram_range': self.ngram_range,
            'min_df': self.min_df,
            'max_df': self.max_df,
            'actual_tfidf_features': len(self.tfidf.get_feature_names_out()),
            'explained_variance': float(self.svd.explained_variance_ratio_.sum()),
            'random_state': self.random_state
        }


def extract_features_from_dataframe(
    df: pd.DataFrame,
    text_columns: Optional[List[str]] = None,
    extractor: Optional[TextFeatureExtractor] = None,
    fit: bool = True
) -> Tuple[pd.DataFrame, TextFeatureExtractor]:
    """
    Extract text features from a DataFrame.

    Args:
        df: Input DataFrame
        text_columns: List of text column names to combine
            (default: ['all_task_normal', 'all_task_dinhky'])
        extractor: Pre-fitted extractor (optional, for inference)
        fit: Whether to fit the extractor (True for training, False for inference)

    Returns:
        Tuple of (features_df, extractor)
    """
    # Avoid a mutable default argument for the column list.
    if text_columns is None:
        text_columns = ['all_task_normal', 'all_task_dinhky']

    print("=" * 80)
    print("TEXT FEATURE EXTRACTION")
    print("=" * 80)

    # Combine text columns
    if len(text_columns) == 1:
        texts = df[text_columns[0]].tolist()
    else:
        print(f"\nCombining {len(text_columns)} text columns...")
        # Vectorized row-wise join: fill missing values, cast to str,
        # then concatenate with single spaces (replaces a slow iterrows loop).
        texts = (
            df[text_columns]
            .fillna('')
            .astype(str)
            .agg(' '.join, axis=1)
            .tolist()
        )

    print(f"Total documents: {len(texts)}")

    # Initialize or use existing extractor
    if extractor is None:
        print("\nInitializing new TextFeatureExtractor...")
        extractor = TextFeatureExtractor()

    # Extract features
    if fit:
        print("\nMode: TRAINING (fit_transform)")
        features = extractor.fit_transform(texts)
    else:
        print("\nMode: INFERENCE (transform)")
        features = extractor.transform(texts)

    # Create DataFrame
    feature_names = extractor.get_feature_names()
    features_df = pd.DataFrame(features, columns=feature_names)

    print("\n✅ Extraction complete!")
    print(f"  Output shape: {features_df.shape}")
    print(f"  Feature names: {feature_names[:5]}... (showing first 5)")

    # Summary
    summary = extractor.get_summary()
    print("\n📊 Extractor Summary:")
    for key, value in summary.items():
        print(f"  {key}: {value}")

    return features_df, extractor


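# Illustrative usage (hypothetical data): fit on a training frame, then reuse
# the fitted extractor on new rows so both land in the same feature space.
#
#   train_df = pd.DataFrame({'all_task_normal': [...], 'all_task_dinhky': [...]})
#   feats, ext = extract_features_from_dataframe(train_df, fit=True)
#   new_feats, _ = extract_features_from_dataframe(new_df, extractor=ext, fit=False)

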
def main():
    """Command-line interface for text feature extraction."""
    parser = argparse.ArgumentParser(
        description='Extract TF-IDF and SVD features from Vietnamese task descriptions'
    )

    parser.add_argument(
        '--mode',
        type=str,
        choices=['train', 'predict'],
        required=True,
        help='Mode: train (fit and save) or predict (load and transform)'
    )

    parser.add_argument(
        '--input',
        type=str,
        required=True,
        help='Input file path (Excel or CSV)'
    )

    parser.add_argument(
        '--output',
        type=str,
        required=True,
        help='Output file path for features (CSV)'
    )

    parser.add_argument(
        '--text-columns',
        type=str,
        nargs='+',
        default=['all_task_normal', 'all_task_dinhky'],
        help='Text column names to combine (default: all_task_normal all_task_dinhky)'
    )

    parser.add_argument(
        '--extractor-path',
        type=str,
        default='text_feature_extractor.pkl',
        help='Path to save/load the extractor (default: text_feature_extractor.pkl)'
    )

    parser.add_argument(
        '--max-features',
        type=int,
        default=200,
        help='Maximum TF-IDF features (default: 200)'
    )

    parser.add_argument(
        '--n-components',
        type=int,
        default=50,
        help='Number of SVD components (default: 50)'
    )

    args = parser.parse_args()

    # Load data
    print(f"\n📂 Loading data from: {args.input}")
    if args.input.endswith('.xlsx'):
        df = pd.read_excel(args.input)
    elif args.input.endswith('.csv'):
        df = pd.read_csv(args.input)
    else:
        raise ValueError("Input file must be .xlsx or .csv")

    print(f"  Shape: {df.shape}")

    # Check columns
    missing_cols = [col for col in args.text_columns if col not in df.columns]
    if missing_cols:
        raise ValueError(f"Missing columns in input data: {missing_cols}")

    # Extract features
    if args.mode == 'train':
        # Training mode: fit and save
        features_df, extractor = extract_features_from_dataframe(
            df,
            text_columns=args.text_columns,
            extractor=TextFeatureExtractor(
                max_features=args.max_features,
                n_components=args.n_components
            ),
            fit=True
        )

        # Save extractor
        extractor.save(args.extractor_path)

    else:  # predict mode
        # Inference mode: load and transform
        if not os.path.exists(args.extractor_path):
            raise FileNotFoundError(f"Extractor not found: {args.extractor_path}")

        extractor = TextFeatureExtractor.load(args.extractor_path)

        features_df, _ = extract_features_from_dataframe(
            df,
            text_columns=args.text_columns,
            extractor=extractor,
            fit=False
        )

    # Save features
    features_df.to_csv(args.output, index=False)
    print(f"\n✅ Saved features to: {args.output}")

    # Show top TF-IDF features (training mode only)
    if args.mode == 'train':
        print("\n📋 Top 20 TF-IDF features:")
        top_features = extractor.get_top_tfidf_features(top_n=20)
        print(top_features.to_string(index=False))

    print("\n" + "=" * 80)
    print("✅ COMPLETE!")
    print("=" * 80)


if __name__ == '__main__':
    main()