"""Chat API routes for session management and chat history.""" import logging from datetime import datetime from typing import Dict, Any, Optional from flask import Blueprint, request, jsonify, current_app, g from flask_limiter import Limiter from flask_limiter.util import get_remote_address import redis from ..services.session_manager import SessionManager, SessionManagerError, SessionNotFoundError, SessionExpiredError from ..services.chat_history import ChatHistoryManager, ChatHistoryError from ..services.language_context import LanguageContextManager, LanguageContextError from ..models.language_context import LanguageContext from ..models.base import db from .middleware import require_auth, validate_json_request, create_limiter logger = logging.getLogger(__name__) # Create blueprint chat_bp = Blueprint('chat', __name__, url_prefix='/api/v1/chat') # Initialize rate limiter (will be configured in app factory) limiter = create_limiter() class APIError(Exception): """Base API error class.""" def __init__(self, message: str, status_code: int = 400, payload: Optional[Dict] = None): super().__init__() self.message = message self.status_code = status_code self.payload = payload def handle_api_error(error: APIError): """Handle API errors and return JSON response.""" response = {'error': error.message} if error.payload: response.update(error.payload) return jsonify(response), error.status_code def get_services(): """Get service instances from current app context.""" redis_client = None redis_url = current_app.config.get('REDIS_URL') if redis_url and redis_url != 'None': try: redis_client = redis.from_url(redis_url) # Test the connection redis_client.ping() except Exception as e: logger.warning(f"Redis connection failed: {e}. Running without Redis cache.") redis_client = None else: logger.info("Redis disabled in configuration. 
Running without Redis cache.") session_manager = SessionManager(redis_client, current_app.config.get('SESSION_TIMEOUT', 3600)) chat_history_manager = ChatHistoryManager( redis_client, current_app.config.get('MAX_CHAT_HISTORY', 20), current_app.config.get('CONTEXT_WINDOW_SIZE', 10) ) language_context_manager = LanguageContextManager() return session_manager, chat_history_manager, language_context_manager # Session Management Endpoints @chat_bp.route('/sessions', methods=['POST']) @limiter.limit("10 per minute") @require_auth @validate_json_request(['language']) def create_session(): """ Create a new chat session. Request body: { "language": "python", "metadata": {"key": "value"} // optional } """ try: data = request.json_data language = data['language'] metadata = data.get('metadata', {}) # Validate language if not LanguageContext.is_supported_language(language): supported = LanguageContext.get_supported_languages() raise APIError(f'Unsupported language: {language}. Supported: {", ".join(supported)}', 400) session_manager, _, language_context_manager = get_services() # Create session session = session_manager.create_session( user_id=request.user_id, language=language, session_metadata=metadata ) # Create language context language_context_manager.create_context(session.id, language) logger.info(f"Created session {session.id} for user {request.user_id}") return jsonify({ 'session_id': session.id, 'user_id': session.user_id, 'language': session.language, 'created_at': session.created_at.isoformat(), 'message_count': session.message_count, 'metadata': session.session_metadata }), 201 except (SessionManagerError, LanguageContextError) as e: logger.error(f"Error creating session: {e}") raise APIError(f'Failed to create session: {str(e)}', 500) except APIError: raise except Exception as e: logger.error(f"Unexpected error creating session: {e}") raise APIError('Internal server error', 500) @chat_bp.route('/sessions/', methods=['GET']) @limiter.limit("30 per minute") 
@require_auth def get_session(session_id: str): """Get session information.""" try: session_manager, _, _ = get_services() session = session_manager.get_session(session_id) # Check if user owns this session if session.user_id != request.user_id: raise APIError('Access denied', 403) return jsonify({ 'session_id': session.id, 'user_id': session.user_id, 'language': session.language, 'created_at': session.created_at.isoformat(), 'last_active': session.last_active.isoformat(), 'message_count': session.message_count, 'is_active': session.is_active, 'metadata': session.session_metadata }) except SessionNotFoundError: raise APIError('Session not found', 404) except SessionExpiredError: raise APIError('Session has expired', 410) except SessionManagerError as e: logger.error(f"Error getting session: {e}") raise APIError(f'Failed to get session: {str(e)}', 500) except APIError: raise except Exception as e: logger.error(f"Unexpected error getting session: {e}") raise APIError('Internal server error', 500) @chat_bp.route('/sessions/', methods=['DELETE']) @limiter.limit("5 per minute") @require_auth def delete_session(session_id: str): """Delete a chat session.""" try: session_manager, chat_history_manager, _ = get_services() # Get session to check ownership session = session_manager.get_session(session_id) # Check if user owns this session if session.user_id != request.user_id: raise APIError('Access denied', 403) # Clear chat history message_count = chat_history_manager.clear_session_history(session_id) # Delete session session_manager.delete_session(session_id) logger.info(f"Deleted session {session_id} with {message_count} messages") return jsonify({ 'message': 'Session deleted successfully', 'session_id': session_id, 'messages_deleted': message_count }) except SessionNotFoundError: raise APIError('Session not found', 404) except (SessionManagerError, ChatHistoryError) as e: logger.error(f"Error deleting session: {e}") raise APIError(f'Failed to delete session: {str(e)}', 
500) except APIError: raise except Exception as e: logger.error(f"Unexpected error deleting session: {e}") raise APIError('Internal server error', 500) @chat_bp.route('/sessions', methods=['GET']) @limiter.limit("20 per minute") @require_auth def list_user_sessions(): """List all sessions for the authenticated user.""" try: active_only = request.args.get('active_only', 'true').lower() == 'true' session_manager, _, _ = get_services() sessions = session_manager.get_user_sessions(request.user_id, active_only) session_list = [] for session in sessions: session_list.append({ 'session_id': session.id, 'language': session.language, 'created_at': session.created_at.isoformat(), 'last_active': session.last_active.isoformat(), 'message_count': session.message_count, 'is_active': session.is_active, 'metadata': session.session_metadata }) return jsonify({ 'sessions': session_list, 'total_count': len(session_list), 'active_only': active_only }) except SessionManagerError as e: logger.error(f"Error listing sessions: {e}") raise APIError(f'Failed to list sessions: {str(e)}', 500) except Exception as e: logger.error(f"Unexpected error listing sessions: {e}") raise APIError('Internal server error', 500) # Chat History Endpoints @chat_bp.route('/sessions//history', methods=['GET']) @limiter.limit("30 per minute") @require_auth def get_chat_history(session_id: str): """Get chat history for a session.""" try: # Validate session ownership session_manager, chat_history_manager, _ = get_services() session = session_manager.get_session(session_id) if session.user_id != request.user_id: raise APIError('Access denied', 403) # Get pagination parameters page = int(request.args.get('page', 1)) page_size = min(int(request.args.get('page_size', 50)), 100) # Max 100 messages per page recent_only = request.args.get('recent_only', 'false').lower() == 'true' if recent_only: # Get recent messages for context limit = min(int(request.args.get('limit', 10)), 50) # Max 50 recent messages messages = 
chat_history_manager.get_recent_history(session_id, limit) total_count = len(messages) else: # Get paginated full history messages = chat_history_manager.get_full_history(session_id, page, page_size) total_count = chat_history_manager.get_message_count(session_id) message_list = [] for message in messages: message_list.append({ 'id': message.id, 'role': message.role, 'content': message.content, 'language': message.language, 'timestamp': message.timestamp.isoformat(), 'metadata': message.message_metadata }) response_data = { 'messages': message_list, 'session_id': session_id, 'total_count': total_count } if not recent_only: response_data.update({ 'page': page, 'page_size': page_size, 'total_pages': (total_count + page_size - 1) // page_size }) return jsonify(response_data) except SessionNotFoundError: raise APIError('Session not found', 404) except SessionExpiredError: raise APIError('Session has expired', 410) except ChatHistoryError as e: logger.error(f"Error getting chat history: {e}") raise APIError(f'Failed to get chat history: {str(e)}', 500) except APIError: raise except Exception as e: logger.error(f"Unexpected error getting chat history: {e}") raise APIError('Internal server error', 500) @chat_bp.route('/sessions//history/search', methods=['GET']) @limiter.limit("20 per minute") @require_auth def search_chat_history(session_id: str): """Search chat history for a session.""" try: query = request.args.get('q', '').strip() if not query: raise APIError('Search query is required', 400) if len(query) < 3: raise APIError('Search query must be at least 3 characters', 400) # Validate session ownership session_manager, chat_history_manager, _ = get_services() session = session_manager.get_session(session_id) if session.user_id != request.user_id: raise APIError('Access denied', 403) limit = min(int(request.args.get('limit', 20)), 50) # Max 50 results messages = chat_history_manager.search_messages(session_id, query, limit) message_list = [] for message in messages: 
message_list.append({ 'id': message.id, 'role': message.role, 'content': message.content, 'language': message.language, 'timestamp': message.timestamp.isoformat(), 'metadata': message.message_metadata }) return jsonify({ 'messages': message_list, 'session_id': session_id, 'query': query, 'result_count': len(message_list) }) except SessionNotFoundError: raise APIError('Session not found', 404) except SessionExpiredError: raise APIError('Session has expired', 410) except ChatHistoryError as e: logger.error(f"Error searching chat history: {e}") raise APIError(f'Failed to search chat history: {str(e)}', 500) except APIError: raise except Exception as e: logger.error(f"Unexpected error searching chat history: {e}") raise APIError('Internal server error', 500) # Language Context Endpoints @chat_bp.route('/sessions//language', methods=['GET']) @limiter.limit("30 per minute") @require_auth def get_language_context(session_id: str): """Get language context for a session.""" try: # Validate session ownership session_manager, _, language_context_manager = get_services() session = session_manager.get_session(session_id) if session.user_id != request.user_id: raise APIError('Access denied', 403) context = language_context_manager.get_context(session_id) return jsonify({ 'session_id': session_id, 'language': context.language, 'prompt_template': context.get_prompt_template(), 'syntax_highlighting': context.get_syntax_highlighting(), 'language_info': context.get_language_info(), 'updated_at': context.updated_at.isoformat() }) except SessionNotFoundError: raise APIError('Session not found', 404) except SessionExpiredError: raise APIError('Session has expired', 410) except LanguageContextError as e: logger.error(f"Error getting language context: {e}") raise APIError(f'Failed to get language context: {str(e)}', 500) except APIError: raise except Exception as e: logger.error(f"Unexpected error getting language context: {e}") raise APIError('Internal server error', 500) 
@chat_bp.route('/sessions/<session_id>/language', methods=['PUT'])
@limiter.limit("10 per minute")
@require_auth
@validate_json_request(['language'])
def update_language_context(session_id: str):
    """Update language context for a session.

    Request body: ``{"language": "<language code>"}``.

    Updates the language context first, then the language stored on the
    session. Responds 400 for an unsupported language, 403 when the session
    belongs to another user, 404 when it does not exist and 410 when it has
    expired.
    """
    try:
        data = request.json_data
        language = data['language']

        # Validate language
        if not LanguageContext.is_supported_language(language):
            supported = LanguageContext.get_supported_languages()
            raise APIError(f'Unsupported language: {language}. Supported: {", ".join(supported)}', 400)

        # Validate session ownership
        session_manager, _, language_context_manager = get_services()
        session = session_manager.get_session(session_id)

        if session.user_id != request.user_id:
            raise APIError('Access denied', 403)

        # Update language context
        context = language_context_manager.set_language(session_id, language)

        # Update session language
        session_manager.set_session_language(session_id, language)

        logger.info("Updated language to %s for session %s", language, session_id)

        return jsonify({
            'session_id': session_id,
            'language': context.language,
            'prompt_template': context.get_prompt_template(),
            'syntax_highlighting': context.get_syntax_highlighting(),
            'language_info': context.get_language_info(),
            'updated_at': context.updated_at.isoformat(),
        })

    except APIError:
        raise
    except SessionNotFoundError as e:
        raise APIError('Session not found', 404) from e
    except SessionExpiredError as e:
        raise APIError('Session has expired', 410) from e
    except (SessionManagerError, LanguageContextError) as e:
        logger.error("Error updating language context: %s", e)
        raise APIError(f'Failed to update language context: {str(e)}', 500) from e
    except Exception as e:
        logger.exception("Unexpected error updating language context: %s", e)
        raise APIError('Internal server error', 500) from e


@chat_bp.route('/languages', methods=['GET'])
@limiter.limit("50 per minute")
def get_supported_languages():
    """Get list of supported programming languages.

    Each entry carries the language code plus the ``name``,
    ``syntax_highlighting`` and ``file_extensions`` values from
    ``LanguageContext.SUPPORTED_LANGUAGES``. No authentication is required.
    """
    try:
        language_list = []
        for lang_code in LanguageContext.get_supported_languages():
            lang_info = LanguageContext.SUPPORTED_LANGUAGES[lang_code]
            language_list.append({
                'code': lang_code,
                'name': lang_info['name'],
                'syntax_highlighting': lang_info['syntax_highlighting'],
                'file_extensions': lang_info['file_extensions'],
            })

        return jsonify({
            'languages': language_list,
            'default_language': 'python',
            'total_count': len(language_list),
        })

    except Exception as e:
        logger.exception("Unexpected error getting supported languages: %s", e)
        raise APIError('Internal server error', 500) from e


# Message Processing Endpoint

@chat_bp.route('/sessions/<session_id>/message', methods=['POST'])
@limiter.limit("30 per minute")
@require_auth
@validate_json_request(['content'])
def send_message(session_id: str):
    """Send a message to the chat agent and get a response.

    Request body:
    {
        "content": "message text",   // required, 1-5000 characters after stripping
        "language": "python"         // optional language override
    }

    Responds 400 for invalid content, 403 when the session belongs to another
    user, 404 when it does not exist, 410 when it has expired and 500 when
    the chat agent fails.
    """
    # Imported lazily, but outside the ``try`` so that ChatAgentError is
    # always bound by the time the ``except ChatAgentError`` clause below is
    # evaluated.
    # NOTE(review): assumes ChatAgentError is exported by services.chat_agent
    # (the original referenced it without importing it) — confirm.
    from ..services.groq_client import GroqClient
    from ..services.chat_agent import ChatAgent, ChatAgentError
    from ..services.programming_assistance import ProgrammingAssistanceService

    try:
        data = request.json_data
        raw_content = data['content']
        language = data.get('language')  # Optional language override

        # Reject non-string content up front instead of failing on .strip().
        if not isinstance(raw_content, str):
            raise APIError('Message content must be a string', 400)

        content = raw_content.strip()

        if not content:
            raise APIError('Message content cannot be empty', 400)

        if len(content) > 5000:  # Reasonable message length limit
            raise APIError('Message too long (max 5000 characters)', 400)

        # Get services
        session_manager, chat_history_manager, language_context_manager = get_services()

        # Validate session ownership
        session = session_manager.get_session(session_id)
        if session.user_id != request.user_id:
            raise APIError('Access denied', 403)

        # Initialize chat agent
        groq_client = GroqClient()
        programming_assistance_service = ProgrammingAssistanceService()
        chat_agent = ChatAgent(
            groq_client=groq_client,
            language_context_manager=language_context_manager,
            session_manager=session_manager,
            chat_history_manager=chat_history_manager,
            programming_assistance_service=programming_assistance_service,
        )

        # Process the message
        result = chat_agent.process_message(session_id, content, language)

        logger.info(
            "Processed message for session %s, response length: %s",
            session_id,
            len(result['response']),
        )

        return jsonify({
            'response': result['response'],
            'message_id': result['message_id'],
            'session_id': session_id,
            'language': result['language'],
            'processing_time': result['processing_time'],
            'timestamp': result['timestamp'],
        })

    except APIError:
        raise
    except SessionNotFoundError as e:
        raise APIError('Session not found', 404) from e
    except SessionExpiredError as e:
        raise APIError('Session has expired', 410) from e
    except ChatAgentError as e:
        logger.error("Chat agent error: %s", e)
        raise APIError(f'Failed to process message: {str(e)}', 500) from e
    except Exception as e:
        logger.exception("Unexpected error processing message: %s", e)
        raise APIError('Internal server error', 500) from e


# Health Check Endpoint

@chat_bp.route('/health', methods=['GET'])
def health_check():
    """Health check endpoint for monitoring.

    Runs ``SELECT 1`` against the database and, when ``REDIS_URL`` is
    configured, pings Redis. A Redis failure is reported as
    ``"disconnected"`` but does not make the service unhealthy; any other
    failure yields a 503 with ``status: unhealthy``.
    """
    try:
        # Check database connection
        from sqlalchemy import text
        db.session.execute(text('SELECT 1'))

        # Check Redis connection (if configured)
        redis_status = "disabled"
        redis_url = current_app.config.get('REDIS_URL')

        # The literal string 'None' is treated the same as an unset URL.
        if redis_url and redis_url != 'None':
            try:
                redis_client = redis.from_url(redis_url)
                redis_client.ping()
                redis_status = "connected"
            except Exception:
                # Redis is optional, so report the state rather than fail.
                redis_status = "disconnected"

        return jsonify({
            'status': 'healthy',
            'timestamp': datetime.utcnow().isoformat(),
            'services': {
                'database': 'connected',
                'redis': redis_status,
            },
        })

    except Exception as e:
        logger.exception("Health check failed: %s", e)
        return jsonify({
            'status': 'unhealthy',
            'timestamp': datetime.utcnow().isoformat(),
            'error': str(e),
        }), 503


# Error handlers

@chat_bp.errorhandler(APIError)
def handle_api_error_handler(error):
    """Handle APIError exceptions."""
    return handle_api_error(error)


@chat_bp.errorhandler(400)
def handle_bad_request(error):
    """Handle bad request errors."""
    return jsonify({'error': 'Bad request'}), 400


@chat_bp.errorhandler(404)
def handle_not_found(error):
    """Handle not found errors."""
    return jsonify({'error': 'Not found'}), 404


@chat_bp.errorhandler(429)
def handle_rate_limit_exceeded(error):
    """Handle rate limit exceeded errors."""
    return jsonify({
        'error': 'Rate limit exceeded',
        'message': 'Too many requests. Please try again later.',
    }), 429


@chat_bp.errorhandler(500)
def handle_internal_error(error):
    """Handle internal server errors."""
    logger.error("Internal server error: %s", error)
    return jsonify({'error': 'Internal server error'}), 500