use actix_web::{web, HttpResponse, Result}; use actix_session::Session; use serde_json::json; use chrono::Utc; use crate::models::messaging::*; use crate::services::user_persistence::UserPersistence; use crate::utils::response_builder::ResponseBuilder; pub struct MessagingController; impl MessagingController { /// Get all message threads for the current user pub async fn get_threads(session: Session) -> Result { let user_email = match session.get::("user_email") { Ok(Some(email)) => email, _ => return ResponseBuilder::unauthorized().build(), }; match Self::load_user_threads(&user_email).await { Ok(response) => ResponseBuilder::success().data(response).build(), Err(e) => { log::error!("Error loading threads for {}: {}", user_email, e); ResponseBuilder::error().message("Failed to load threads").build() } } } /// Create a new message thread pub async fn create_thread( session: Session, req_data: web::Json, ) -> Result { let user_email = match session.get::("user_email") { Ok(Some(email)) => email, _ => return Ok(HttpResponse::Unauthorized().json(json!({"error": "Not authenticated"}))), }; log::info!("📨 Creating thread request: user={}, recipient={}, context_type={}, context_id={:?}, subject={}", user_email, req_data.recipient_email, req_data.context_type, req_data.context_id, req_data.subject); // Validate request data if req_data.recipient_email.is_empty() { log::warn!("❌ Empty recipient_email in create thread request"); return Ok(HttpResponse::BadRequest().json(json!({"error": "Recipient email is required"}))); } if req_data.context_type.is_empty() { log::warn!("❌ Empty context_type in create thread request"); return Ok(HttpResponse::BadRequest().json(json!({"error": "Context type is required"}))); } if req_data.subject.is_empty() { log::warn!("❌ Empty subject in create thread request"); return Ok(HttpResponse::BadRequest().json(json!({"error": "Subject is required"}))); } // Check if thread already exists if let Ok(existing_thread) = Self::find_existing_thread( &user_email, &req_data.recipient_email, &req_data.context_type, &req_data.context_id, ).await { return Ok(HttpResponse::Ok().json(json!({"thread": existing_thread}))); } // Create new thread let thread = MessageThread::new( user_email.clone(), req_data.recipient_email.clone(), req_data.context_type.clone(), req_data.context_id.clone(), req_data.subject.clone(), ); match Self::save_thread(&thread).await { Ok(_) => Ok(HttpResponse::Ok().json(json!({"thread": thread}))), Err(e) => { log::error!("Error creating thread: {}", e); Ok(HttpResponse::InternalServerError().json(json!({"error": "Failed to create thread"}))) } } } /// Get messages for a specific thread (alias for route compatibility) pub async fn get_messages( session: Session, path: web::Path, ) -> Result { Self::get_thread_messages(session, path).await } /// Get messages for a specific thread pub async fn get_thread_messages( session: Session, path: web::Path, ) -> Result { let user_email = match session.get::("user_email") { Ok(Some(email)) => email, _ => return Ok(HttpResponse::Unauthorized().json(json!({"error": "Not authenticated"}))), }; let thread_id = path.into_inner(); // Verify user has access to this thread if !Self::user_has_thread_access(&user_email, &thread_id).await { return Ok(HttpResponse::Forbidden().json(json!({"error": "Access denied"}))); } match Self::load_thread_messages(&thread_id).await { Ok(messages) => Ok(HttpResponse::Ok().json(MessagesResponse { messages })), Err(e) => { log::error!("Error loading messages for thread {}: {}", thread_id, e); Ok(HttpResponse::InternalServerError().json(json!({"error": "Failed to load messages"}))) } } } /// Send a message in a thread (with thread_id in path) pub async fn send_message_with_path( session: Session, path: web::Path, req_data: web::Json, ) -> Result { let thread_id = path.into_inner(); let mut request = req_data.into_inner(); request.thread_id = thread_id; Self::send_message_impl(session, request).await } /// Send a message in a thread (with thread_id in body) pub async fn send_message( session: Session, req_data: web::Json, ) -> Result { Self::send_message_impl(session, req_data.into_inner()).await } /// Internal implementation for sending messages async fn send_message_impl( session: Session, req_data: SendMessageRequest, ) -> Result { let user_email = match session.get::("user_email") { Ok(Some(email)) => email, _ => return Ok(HttpResponse::Unauthorized().json(json!({"error": "Not authenticated"}))), }; // Verify user has access to this thread if !Self::user_has_thread_access(&user_email, &req_data.thread_id).await { return Ok(HttpResponse::Forbidden().json(json!({"error": "Access denied"}))); } // Get thread to find recipient let thread = match Self::load_thread(&req_data.thread_id).await { Ok(thread) => thread, Err(_) => return Ok(HttpResponse::NotFound().json(json!({"error": "Thread not found"}))), }; let recipient_email = thread.get_recipient_email(&user_email).to_string(); let message_type = req_data.message_type.clone().unwrap_or_else(|| "text".to_string()); let message = Message::new( req_data.thread_id.clone(), user_email, recipient_email.clone(), req_data.content.clone(), message_type, ); match Self::save_message(&message).await { Ok(_) => { // Update thread last message time and unread count Self::update_thread_on_message(&req_data.thread_id, &recipient_email).await.ok(); Ok(HttpResponse::Ok().json(json!({"message": message}))) } Err(e) => { log::error!("Error sending message: {}", e); Ok(HttpResponse::InternalServerError().json(json!({"error": "Failed to send message"}))) } } } /// Mark a thread as read pub async fn mark_thread_read( session: Session, path: web::Path, ) -> Result { let user_email = match session.get::("user_email") { Ok(Some(email)) => email, _ => return Ok(HttpResponse::Unauthorized().json(json!({"error": "Not authenticated"}))), }; let thread_id = path.into_inner(); // Verify user has access to this thread if !Self::user_has_thread_access(&user_email, &thread_id).await { return Ok(HttpResponse::Forbidden().json(json!({"error": "Access denied"}))); } match Self::mark_thread_as_read(&thread_id, &user_email).await { Ok(_) => Ok(HttpResponse::Ok().json(json!({"success": true}))), Err(e) => { log::error!("Error marking thread as read: {}", e); Ok(HttpResponse::InternalServerError().json(json!({"error": "Failed to mark as read"}))) } } } // Helper methods for data persistence using user_data files async fn load_user_threads(user_email: &str) -> Result> { let user_data = UserPersistence::load_user_data(user_email) .ok_or("User data not found")?; let threads: Vec = user_data.message_threads.unwrap_or_default(); log::info!("🔍 Loading threads for user: {}", user_email); log::info!("📊 Found {} threads", threads.len()); let mut thread_summaries = Vec::new(); let mut total_unread = 0; for thread in threads { let unread_count = thread.get_unread_count(user_email); total_unread += unread_count; log::info!("📨 Thread {}: user_a={}, user_b={}, user_a_unread={}, user_b_unread={}, calculated_unread={}", thread.thread_id, thread.user_a_email, thread.user_b_email, thread.user_a_unread_count, thread.user_b_unread_count, unread_count ); // Get last message let messages = Self::load_thread_messages(&thread.thread_id).await.unwrap_or_default(); let last_message = messages.last().map(|m| m.content.clone()); let last_message_at = messages.last().map(|m| m.timestamp); thread_summaries.push(ThreadWithLastMessage { thread_id: thread.thread_id.clone(), recipient_email: thread.get_recipient_email(user_email).to_string(), recipient_name: None, // Could be enhanced to lookup user names subject: thread.subject.clone(), context_type: thread.context_type.clone(), context_id: thread.context_id.clone(), last_message, last_message_at, unread_count, created_at: thread.created_at, }); } // Sort by last message time (most recent first) thread_summaries.sort_by(|a, b| { b.last_message_at.unwrap_or(b.created_at) .cmp(&a.last_message_at.unwrap_or(a.created_at)) }); log::info!("📊 Total unread count for {}: {}", user_email, total_unread); Ok(ThreadsResponse { threads: thread_summaries, unread_count: total_unread, }) } async fn find_existing_thread( user_email: &str, recipient_email: &str, context_type: &str, context_id: &Option, ) -> Result> { let user_data = UserPersistence::load_user_data(user_email) .ok_or("User data not found")?; let threads = user_data.message_threads.unwrap_or_default(); for thread in threads { if (thread.user_a_email == user_email && thread.user_b_email == recipient_email) || (thread.user_a_email == recipient_email && thread.user_b_email == user_email) { if thread.context_type == context_type && thread.context_id == *context_id { return Ok(thread); } } } Err("Thread not found".into()) } async fn save_thread(thread: &MessageThread) -> Result<(), Box> { // Save to both users' data Self::add_thread_to_user(&thread.user_a_email, thread).await?; Self::add_thread_to_user(&thread.user_b_email, thread).await?; Ok(()) } async fn add_thread_to_user(user_email: &str, thread: &MessageThread) -> Result<(), Box> { let mut user_data = UserPersistence::load_user_data(user_email) .unwrap_or_else(|| { use crate::services::user_persistence::UserPersistentData; UserPersistentData { user_email: user_email.to_string(), message_threads: Some(Vec::new()), messages: Some(Vec::new()), ..Default::default() } }); if user_data.message_threads.is_none() { user_data.message_threads = Some(Vec::new()); } if let Some(ref mut threads) = user_data.message_threads { // Check if thread already exists if !threads.iter().any(|t| t.thread_id == thread.thread_id) { threads.push(thread.clone()); } } UserPersistence::save_user_data(&user_data)?; Ok(()) } async fn load_thread(thread_id: &str) -> Result> { // For simplicity, we'll search through all user files to find the thread // In a real implementation, you'd have a more efficient lookup let user_files = std::fs::read_dir("user_data/")?; for entry in user_files { let entry = entry?; if let Some(filename) = entry.file_name().to_str() { if filename.ends_with(".json") && !filename.contains("_cart") { let email = filename.replace(".json", "").replace("_at_", "@"); if let Some(user_data) = UserPersistence::load_user_data(&email) { if let Some(threads) = user_data.message_threads { for thread in threads { if thread.thread_id == thread_id { return Ok(thread); } } } } } } } Err("Thread not found".into()) } async fn user_has_thread_access(user_email: &str, thread_id: &str) -> bool { if let Ok(thread) = Self::load_thread(thread_id).await { return thread.user_a_email == user_email || thread.user_b_email == user_email; } false } async fn load_thread_messages(thread_id: &str) -> Result, Box> { // Load messages from user data files instead of separate message files let user_files = std::fs::read_dir("user_data/")?; let mut all_messages = Vec::new(); for entry in user_files { let entry = entry?; if let Some(filename) = entry.file_name().to_str() { if filename.ends_with(".json") && !filename.contains("_cart") && !filename.starts_with("messages_") { let email = filename.replace(".json", "").replace("_at_", "@"); if let Some(user_data) = UserPersistence::load_user_data(&email) { if let Some(messages) = &user_data.messages { for message in messages { if message.thread_id == thread_id { all_messages.push(message.clone()); } } } } } } } // Sort messages by timestamp and deduplicate all_messages.sort_by(|a, b| a.timestamp.cmp(&b.timestamp)); all_messages.dedup_by(|a, b| a.message_id == b.message_id); Ok(all_messages) } async fn save_message(message: &Message) -> Result<(), Box> { // Save message to both sender and recipient user data files let participants = vec![&message.sender_email, &message.recipient_email]; for email in participants { let mut user_data = UserPersistence::load_user_data(email) .unwrap_or_else(|| { use crate::services::user_persistence::UserPersistentData; UserPersistentData { user_email: email.clone(), message_threads: Some(Vec::new()), messages: Some(Vec::new()), ..Default::default() } }); // Add message to user's messages if let Some(ref mut messages) = user_data.messages { messages.push(message.clone()); } else { user_data.messages = Some(vec![message.clone()]); } UserPersistence::save_user_data(&user_data)?; } Ok(()) } async fn update_thread_on_message(thread_id: &str, recipient_email: &str) -> Result<(), Box> { log::info!("📨 Updating thread {} for recipient {}", thread_id, recipient_email); // Update recipient's thread data to increment unread count let mut recipient_data = UserPersistence::load_user_data(recipient_email) .ok_or("Recipient user data not found")?; if let Some(ref mut threads) = recipient_data.message_threads { for thread in threads.iter_mut() { if thread.thread_id == thread_id { log::info!("📨 Found thread {} in recipient's data", thread_id); log::info!("📨 Before increment - Thread {}: user_a={}, user_b={}, user_a_unread={}, user_b_unread={}", thread.thread_id, thread.user_a_email, thread.user_b_email, thread.user_a_unread_count, thread.user_b_unread_count); thread.last_message_at = Some(Utc::now()); thread.updated_at = Utc::now(); thread.increment_unread_count(recipient_email); log::info!("📨 After increment - Thread {}: user_a_unread={}, user_b_unread={}", thread.thread_id, thread.user_a_unread_count, thread.user_b_unread_count); UserPersistence::save_user_data(&recipient_data)?; log::info!("📨 Saved recipient data for {}", recipient_email); break; } } } Ok(()) } async fn mark_thread_as_read(thread_id: &str, user_email: &str) -> Result<(), Box> { let mut user_data = UserPersistence::load_user_data(user_email) .ok_or("User data not found")?; if let Some(ref mut threads) = user_data.message_threads { for thread in threads.iter_mut() { if thread.thread_id == thread_id { thread.reset_unread_count(user_email); UserPersistence::save_user_data(&user_data)?; break; } } } Ok(()) } }