NhansuperCa/predict_staff.py

347 lines
12 KiB
Python

"""
Hệ thống dự đoán số lượng nhân sự theo ca làm việc và đặc điểm tòa nhà
Author: ML Expert
Date: December 29, 2025
"""
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.linear_model import Ridge
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error
import re
import warnings
warnings.filterwarnings('ignore')
class StaffPredictor:
"""Class dự đoán số lượng nhân sự"""
def __init__(self):
self.model = None
self.scaler = StandardScaler()
self.building_data = None
self.shift_data = None
self.feature_columns = None
def load_data(self, building_file='namoi.csv', shift_file='calamviec.xlsx'):
"""Đọc dữ liệu từ file"""
print("📂 Đang đọc dữ liệu...")
# Đọc dữ liệu tòa nhà
self.building_data = pd.read_csv(building_file, encoding='utf-8')
print(f"✅ Đã đọc {len(self.building_data)} tòa nhà")
# Đọc dữ liệu ca làm việc
self.shift_data = pd.read_excel(shift_file)
print(f"✅ Đã đọc {len(self.shift_data)} records ca làm việc")
return self.building_data, self.shift_data
def extract_shift_features(self, shift_str):
"""Trích xuất thông tin từ ca làm việc"""
# Chuẩn hóa format giờ
shift_str = str(shift_str).strip()
# Các pattern có thể có
patterns = [
r'(\d+)[h:](\d+)[-:](\d+)[h:](\d+)', # 14h00:22h00 hoặc 14:00-22:00
r'(\d+)[h:](\d+)\s*-\s*(\d+)[h:](\d+)', # 6h30-14h30
r'(\d+):(\d+)\s*-\s*(\d+):(\d+)', # 6:30-14:30
]
for pattern in patterns:
match = re.search(pattern, shift_str)
if match:
start_hour = int(match.group(1))
start_min = int(match.group(2))
end_hour = int(match.group(3))
end_min = int(match.group(4))
# Tính độ dài ca (giờ)
start_time = start_hour + start_min/60
end_time = end_hour + end_min/60
# Xử lý trường hợp qua đêm
if end_time < start_time:
end_time += 24
duration = end_time - start_time
return {
'start_hour': start_hour,
'start_minute': start_min,
'end_hour': end_hour,
'end_minute': end_min,
'shift_duration': duration,
'is_morning': 1 if 6 <= start_hour < 12 else 0,
'is_afternoon': 1 if 12 <= start_hour < 18 else 0,
'is_evening': 1 if 18 <= start_hour < 24 else 0,
}
# Nếu không match được, return giá trị mặc định
return {
'start_hour': 8,
'start_minute': 0,
'end_hour': 17,
'end_minute': 0,
'shift_duration': 8,
'is_morning': 1,
'is_afternoon': 0,
'is_evening': 0,
}
def prepare_features(self):
"""Chuẩn bị features để train model"""
print("\n🔧 Chuẩn bị features...")
# Merge dữ liệu
df = pd.merge(self.shift_data, self.building_data,
left_on='Mã địa điểm', right_on='Mã địa điểm', how='left')
# Trích xuất features từ ca làm việc
shift_features = df['Ca'].apply(self.extract_shift_features)
shift_features_df = pd.DataFrame(shift_features.tolist())
# Tính tổng số ca của mỗi tòa nhà
shifts_per_building = self.shift_data.groupby('Mã địa điểm')['Ca'].nunique().to_dict()
df['total_shifts_per_building'] = df['Mã địa điểm'].map(shifts_per_building)
# Kết hợp tất cả features
df = pd.concat([df, shift_features_df], axis=1)
# Chọn features quan trọng
feature_cols = [
# Features từ tòa nhà
'Tổng Giờ hoạt động của khách hàng mỗi tuần',
'Lưu lượng KH hoạt động ngày tại Tòa tháp',
'Diện tích ngoại cảnh Tòa tháp (m2)',
'Số tòa tháp',
'Số tầng nổi',
'Số tầng hầm',
'Tầng hầm (m2)',
'Sàn Sảnh (m2)',
'Sàn Hành lang (m2)',
'Sàn WC (m2)',
'Sàn Phòng (m2) ',
'Thảm (m2)',
'Kính (m2)',
'Thang máy',
'Thang bộ',
'total_shifts_per_building',
# Features từ ca làm việc
'start_hour',
'shift_duration',
'is_morning',
'is_afternoon',
'is_evening',
]
X = df[feature_cols].fillna(0)
y = df['Number']
self.feature_columns = feature_cols
print(f"✅ Đã chuẩn bị {len(feature_cols)} features")
print(f"📊 Dataset shape: {X.shape}")
return X, y, df
def train_model(self, X, y, model_type='random_forest'):
"""Train model"""
print(f"\n🤖 Training model: {model_type}...")
# Split data
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
# Standardize features
X_train_scaled = self.scaler.fit_transform(X_train)
X_test_scaled = self.scaler.transform(X_test)
# Chọn model
if model_type == 'random_forest':
self.model = RandomForestRegressor(
n_estimators=100,
max_depth=10,
min_samples_split=2,
min_samples_leaf=1,
random_state=42
)
elif model_type == 'gradient_boosting':
self.model = GradientBoostingRegressor(
n_estimators=100,
learning_rate=0.1,
max_depth=5,
random_state=42
)
else: # ridge
self.model = Ridge(alpha=1.0)
# Train
self.model.fit(X_train_scaled, y_train)
# Evaluate
y_pred_train = self.model.predict(X_train_scaled)
y_pred_test = self.model.predict(X_test_scaled)
train_rmse = np.sqrt(mean_squared_error(y_train, y_pred_train))
test_rmse = np.sqrt(mean_squared_error(y_test, y_pred_test))
train_mae = mean_absolute_error(y_train, y_pred_train)
test_mae = mean_absolute_error(y_test, y_pred_test)
train_r2 = r2_score(y_train, y_pred_train)
test_r2 = r2_score(y_test, y_pred_test)
print(f"\n📈 Kết quả đánh giá:")
print(f"Train RMSE: {train_rmse:.2f}, MAE: {train_mae:.2f}, R²: {train_r2:.3f}")
print(f"Test RMSE: {test_rmse:.2f}, MAE: {test_mae:.2f}, R²: {test_r2:.3f}")
# Cross-validation (với dataset nhỏ, dùng 3-fold)
cv_scores = cross_val_score(self.model, X_train_scaled, y_train,
cv=min(3, len(X_train)),
scoring='neg_mean_squared_error')
cv_rmse = np.sqrt(-cv_scores.mean())
print(f"Cross-validation RMSE: {cv_rmse:.2f} (+/- {np.sqrt(-cv_scores).std():.2f})")
return {
'train_rmse': train_rmse,
'test_rmse': test_rmse,
'train_mae': train_mae,
'test_mae': test_mae,
'train_r2': train_r2,
'test_r2': test_r2,
'cv_rmse': cv_rmse
}
def predict_staff(self, building_code, shift_time):
"""
Dự đoán số lượng nhân sự cho một tòa nhà và ca làm việc cụ thể
Parameters:
-----------
building_code : str
Mã địa điểm (vd: '559-1')
shift_time : str
Ca làm việc (vd: '6h00:14h00')
Returns:
--------
int : Số lượng nhân sự dự đoán
"""
# Lấy thông tin tòa nhà
building_info = self.building_data[
self.building_data['Mã địa điểm'] == building_code
]
if len(building_info) == 0:
raise ValueError(f"Không tìm thấy tòa nhà với mã: {building_code}")
# Trích xuất features từ ca làm việc
shift_features = self.extract_shift_features(shift_time)
# Tính tổng số ca của tòa nhà này
total_shifts = self.shift_data[self.shift_data['Mã địa điểm'] == building_code]['Ca'].nunique()
# Tạo input features
input_data = {}
# Features từ tòa nhà
for col in self.feature_columns:
if col == 'total_shifts_per_building':
input_data[col] = total_shifts
elif col in building_info.columns:
input_data[col] = building_info[col].values[0]
elif col in shift_features:
input_data[col] = shift_features[col]
else:
input_data[col] = 0
# Convert to DataFrame
X_input = pd.DataFrame([input_data])
# Standardize
X_input_scaled = self.scaler.transform(X_input)
# Predict
prediction = self.model.predict(X_input_scaled)[0]
# Làm tròn và đảm bảo >= 1
prediction = max(1, round(prediction))
return prediction
def show_feature_importance(self):
"""Hiển thị độ quan trọng của features"""
if hasattr(self.model, 'feature_importances_'):
print("\n📊 Feature Importance (Top 10):")
importances = pd.DataFrame({
'feature': self.feature_columns,
'importance': self.model.feature_importances_
}).sort_values('importance', ascending=False)
for idx, row in importances.head(10).iterrows():
print(f" {row['feature']}: {row['importance']:.4f}")
else:
print("\n⚠️ Model không hỗ trợ feature importance")
def main():
"""Hàm chính để chạy chương trình"""
print("="*60)
print("🏢 HỆ THỐNG DỰ ĐOÁN SỐ LƯỢNG NHÂN SỰ THEO CA LÀM VIỆC")
print("="*60)
# Khởi tạo predictor
predictor = StaffPredictor()
# Load dữ liệu
building_df, shift_df = predictor.load_data()
print("\n📋 Thông tin dữ liệu:")
print(f" - Số tòa nhà: {len(building_df)}")
print(f" - Số records ca làm việc: {len(shift_df)}")
print(f" - Các tòa nhà: {', '.join(building_df['Mã địa điểm'].unique())}")
# Chuẩn bị features
X, y, df_full = predictor.prepare_features()
# Train model
results = predictor.train_model(X, y, model_type='random_forest')
# Hiển thị feature importance
predictor.show_feature_importance()
# Demo dự đoán
print("\n" + "="*60)
print("🎯 DEMO DỰ ĐOÁN")
print("="*60)
# Dự đoán cho một số trường hợp
test_cases = [
('559-1', '6h00:14h00'),
('618-1', '14h00:22h00'),
('283-1', '6h00:14h00'),
]
for building_code, shift_time in test_cases:
predicted_staff = predictor.predict_staff(building_code, shift_time)
print(f"\n🏢 Tòa nhà: {building_code}")
print(f"⏰ Ca làm việc: {shift_time}")
print(f"👥 Số nhân sự dự đoán: {predicted_staff} người")
print("\n" + "="*60)
print("✅ Hoàn thành!")
print("="*60)
return predictor
if __name__ == "__main__":
predictor = main()
# Bạn có thể dùng predictor để dự đoán:
# predictor.predict_staff('559-1', '14h00:22h00')