scratch_chat / chat_agent /api /chat_routes.py
WebashalarForML's picture
Upload 178 files
330b6e4 verified
"""Chat API routes for session management and chat history."""
import logging
from datetime import datetime
from typing import Dict, Any, Optional
from flask import Blueprint, request, jsonify, current_app, g
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
import redis
from ..services.session_manager import SessionManager, SessionManagerError, SessionNotFoundError, SessionExpiredError
from ..services.chat_history import ChatHistoryManager, ChatHistoryError
from ..services.language_context import LanguageContextManager, LanguageContextError
from ..models.language_context import LanguageContext
from ..models.base import db
from .middleware import require_auth, validate_json_request, create_limiter
logger = logging.getLogger(__name__)
# Create blueprint
chat_bp = Blueprint('chat', __name__, url_prefix='/api/v1/chat')
# Initialize rate limiter (will be configured in app factory)
limiter = create_limiter()
class APIError(Exception):
"""Base API error class."""
def __init__(self, message: str, status_code: int = 400, payload: Optional[Dict] = None):
super().__init__()
self.message = message
self.status_code = status_code
self.payload = payload
def handle_api_error(error: APIError):
"""Handle API errors and return JSON response."""
response = {'error': error.message}
if error.payload:
response.update(error.payload)
return jsonify(response), error.status_code
def get_services():
"""Get service instances from current app context."""
redis_client = None
redis_url = current_app.config.get('REDIS_URL')
if redis_url and redis_url != 'None':
try:
redis_client = redis.from_url(redis_url)
# Test the connection
redis_client.ping()
except Exception as e:
logger.warning(f"Redis connection failed: {e}. Running without Redis cache.")
redis_client = None
else:
logger.info("Redis disabled in configuration. Running without Redis cache.")
session_manager = SessionManager(redis_client, current_app.config.get('SESSION_TIMEOUT', 3600))
chat_history_manager = ChatHistoryManager(
redis_client,
current_app.config.get('MAX_CHAT_HISTORY', 20),
current_app.config.get('CONTEXT_WINDOW_SIZE', 10)
)
language_context_manager = LanguageContextManager()
return session_manager, chat_history_manager, language_context_manager
# Session Management Endpoints
@chat_bp.route('/sessions', methods=['POST'])
@limiter.limit("10 per minute")
@require_auth
@validate_json_request(['language'])
def create_session():
"""
Create a new chat session.
Request body:
{
"language": "python",
"metadata": {"key": "value"} // optional
}
"""
try:
data = request.json_data
language = data['language']
metadata = data.get('metadata', {})
# Validate language
if not LanguageContext.is_supported_language(language):
supported = LanguageContext.get_supported_languages()
raise APIError(f'Unsupported language: {language}. Supported: {", ".join(supported)}', 400)
session_manager, _, language_context_manager = get_services()
# Create session
session = session_manager.create_session(
user_id=request.user_id,
language=language,
session_metadata=metadata
)
# Create language context
language_context_manager.create_context(session.id, language)
logger.info(f"Created session {session.id} for user {request.user_id}")
return jsonify({
'session_id': session.id,
'user_id': session.user_id,
'language': session.language,
'created_at': session.created_at.isoformat(),
'message_count': session.message_count,
'metadata': session.session_metadata
}), 201
except (SessionManagerError, LanguageContextError) as e:
logger.error(f"Error creating session: {e}")
raise APIError(f'Failed to create session: {str(e)}', 500)
except APIError:
raise
except Exception as e:
logger.error(f"Unexpected error creating session: {e}")
raise APIError('Internal server error', 500)
@chat_bp.route('/sessions/<session_id>', methods=['GET'])
@limiter.limit("30 per minute")
@require_auth
def get_session(session_id: str):
"""Get session information."""
try:
session_manager, _, _ = get_services()
session = session_manager.get_session(session_id)
# Check if user owns this session
if session.user_id != request.user_id:
raise APIError('Access denied', 403)
return jsonify({
'session_id': session.id,
'user_id': session.user_id,
'language': session.language,
'created_at': session.created_at.isoformat(),
'last_active': session.last_active.isoformat(),
'message_count': session.message_count,
'is_active': session.is_active,
'metadata': session.session_metadata
})
except SessionNotFoundError:
raise APIError('Session not found', 404)
except SessionExpiredError:
raise APIError('Session has expired', 410)
except SessionManagerError as e:
logger.error(f"Error getting session: {e}")
raise APIError(f'Failed to get session: {str(e)}', 500)
except APIError:
raise
except Exception as e:
logger.error(f"Unexpected error getting session: {e}")
raise APIError('Internal server error', 500)
@chat_bp.route('/sessions/<session_id>', methods=['DELETE'])
@limiter.limit("5 per minute")
@require_auth
def delete_session(session_id: str):
"""Delete a chat session."""
try:
session_manager, chat_history_manager, _ = get_services()
# Get session to check ownership
session = session_manager.get_session(session_id)
# Check if user owns this session
if session.user_id != request.user_id:
raise APIError('Access denied', 403)
# Clear chat history
message_count = chat_history_manager.clear_session_history(session_id)
# Delete session
session_manager.delete_session(session_id)
logger.info(f"Deleted session {session_id} with {message_count} messages")
return jsonify({
'message': 'Session deleted successfully',
'session_id': session_id,
'messages_deleted': message_count
})
except SessionNotFoundError:
raise APIError('Session not found', 404)
except (SessionManagerError, ChatHistoryError) as e:
logger.error(f"Error deleting session: {e}")
raise APIError(f'Failed to delete session: {str(e)}', 500)
except APIError:
raise
except Exception as e:
logger.error(f"Unexpected error deleting session: {e}")
raise APIError('Internal server error', 500)
@chat_bp.route('/sessions', methods=['GET'])
@limiter.limit("20 per minute")
@require_auth
def list_user_sessions():
"""List all sessions for the authenticated user."""
try:
active_only = request.args.get('active_only', 'true').lower() == 'true'
session_manager, _, _ = get_services()
sessions = session_manager.get_user_sessions(request.user_id, active_only)
session_list = []
for session in sessions:
session_list.append({
'session_id': session.id,
'language': session.language,
'created_at': session.created_at.isoformat(),
'last_active': session.last_active.isoformat(),
'message_count': session.message_count,
'is_active': session.is_active,
'metadata': session.session_metadata
})
return jsonify({
'sessions': session_list,
'total_count': len(session_list),
'active_only': active_only
})
except SessionManagerError as e:
logger.error(f"Error listing sessions: {e}")
raise APIError(f'Failed to list sessions: {str(e)}', 500)
except Exception as e:
logger.error(f"Unexpected error listing sessions: {e}")
raise APIError('Internal server error', 500)
# Chat History Endpoints
@chat_bp.route('/sessions/<session_id>/history', methods=['GET'])
@limiter.limit("30 per minute")
@require_auth
def get_chat_history(session_id: str):
"""Get chat history for a session."""
try:
# Validate session ownership
session_manager, chat_history_manager, _ = get_services()
session = session_manager.get_session(session_id)
if session.user_id != request.user_id:
raise APIError('Access denied', 403)
# Get pagination parameters
page = int(request.args.get('page', 1))
page_size = min(int(request.args.get('page_size', 50)), 100) # Max 100 messages per page
recent_only = request.args.get('recent_only', 'false').lower() == 'true'
if recent_only:
# Get recent messages for context
limit = min(int(request.args.get('limit', 10)), 50) # Max 50 recent messages
messages = chat_history_manager.get_recent_history(session_id, limit)
total_count = len(messages)
else:
# Get paginated full history
messages = chat_history_manager.get_full_history(session_id, page, page_size)
total_count = chat_history_manager.get_message_count(session_id)
message_list = []
for message in messages:
message_list.append({
'id': message.id,
'role': message.role,
'content': message.content,
'language': message.language,
'timestamp': message.timestamp.isoformat(),
'metadata': message.message_metadata
})
response_data = {
'messages': message_list,
'session_id': session_id,
'total_count': total_count
}
if not recent_only:
response_data.update({
'page': page,
'page_size': page_size,
'total_pages': (total_count + page_size - 1) // page_size
})
return jsonify(response_data)
except SessionNotFoundError:
raise APIError('Session not found', 404)
except SessionExpiredError:
raise APIError('Session has expired', 410)
except ChatHistoryError as e:
logger.error(f"Error getting chat history: {e}")
raise APIError(f'Failed to get chat history: {str(e)}', 500)
except APIError:
raise
except Exception as e:
logger.error(f"Unexpected error getting chat history: {e}")
raise APIError('Internal server error', 500)
@chat_bp.route('/sessions/<session_id>/history/search', methods=['GET'])
@limiter.limit("20 per minute")
@require_auth
def search_chat_history(session_id: str):
"""Search chat history for a session."""
try:
query = request.args.get('q', '').strip()
if not query:
raise APIError('Search query is required', 400)
if len(query) < 3:
raise APIError('Search query must be at least 3 characters', 400)
# Validate session ownership
session_manager, chat_history_manager, _ = get_services()
session = session_manager.get_session(session_id)
if session.user_id != request.user_id:
raise APIError('Access denied', 403)
limit = min(int(request.args.get('limit', 20)), 50) # Max 50 results
messages = chat_history_manager.search_messages(session_id, query, limit)
message_list = []
for message in messages:
message_list.append({
'id': message.id,
'role': message.role,
'content': message.content,
'language': message.language,
'timestamp': message.timestamp.isoformat(),
'metadata': message.message_metadata
})
return jsonify({
'messages': message_list,
'session_id': session_id,
'query': query,
'result_count': len(message_list)
})
except SessionNotFoundError:
raise APIError('Session not found', 404)
except SessionExpiredError:
raise APIError('Session has expired', 410)
except ChatHistoryError as e:
logger.error(f"Error searching chat history: {e}")
raise APIError(f'Failed to search chat history: {str(e)}', 500)
except APIError:
raise
except Exception as e:
logger.error(f"Unexpected error searching chat history: {e}")
raise APIError('Internal server error', 500)
# Language Context Endpoints
@chat_bp.route('/sessions/<session_id>/language', methods=['GET'])
@limiter.limit("30 per minute")
@require_auth
def get_language_context(session_id: str):
"""Get language context for a session."""
try:
# Validate session ownership
session_manager, _, language_context_manager = get_services()
session = session_manager.get_session(session_id)
if session.user_id != request.user_id:
raise APIError('Access denied', 403)
context = language_context_manager.get_context(session_id)
return jsonify({
'session_id': session_id,
'language': context.language,
'prompt_template': context.get_prompt_template(),
'syntax_highlighting': context.get_syntax_highlighting(),
'language_info': context.get_language_info(),
'updated_at': context.updated_at.isoformat()
})
except SessionNotFoundError:
raise APIError('Session not found', 404)
except SessionExpiredError:
raise APIError('Session has expired', 410)
except LanguageContextError as e:
logger.error(f"Error getting language context: {e}")
raise APIError(f'Failed to get language context: {str(e)}', 500)
except APIError:
raise
except Exception as e:
logger.error(f"Unexpected error getting language context: {e}")
raise APIError('Internal server error', 500)
@chat_bp.route('/sessions/<session_id>/language', methods=['PUT'])
@limiter.limit("10 per minute")
@require_auth
@validate_json_request(['language'])
def update_language_context(session_id: str):
"""Update language context for a session."""
try:
data = request.json_data
language = data['language']
# Validate language
if not LanguageContext.is_supported_language(language):
supported = LanguageContext.get_supported_languages()
raise APIError(f'Unsupported language: {language}. Supported: {", ".join(supported)}', 400)
# Validate session ownership
session_manager, _, language_context_manager = get_services()
session = session_manager.get_session(session_id)
if session.user_id != request.user_id:
raise APIError('Access denied', 403)
# Update language context
context = language_context_manager.set_language(session_id, language)
# Update session language
session_manager.set_session_language(session_id, language)
logger.info(f"Updated language to {language} for session {session_id}")
return jsonify({
'session_id': session_id,
'language': context.language,
'prompt_template': context.get_prompt_template(),
'syntax_highlighting': context.get_syntax_highlighting(),
'language_info': context.get_language_info(),
'updated_at': context.updated_at.isoformat()
})
except SessionNotFoundError:
raise APIError('Session not found', 404)
except SessionExpiredError:
raise APIError('Session has expired', 410)
except (SessionManagerError, LanguageContextError) as e:
logger.error(f"Error updating language context: {e}")
raise APIError(f'Failed to update language context: {str(e)}', 500)
except APIError:
raise
except Exception as e:
logger.error(f"Unexpected error updating language context: {e}")
raise APIError('Internal server error', 500)
@chat_bp.route('/languages', methods=['GET'])
@limiter.limit("50 per minute")
def get_supported_languages():
"""Get list of supported programming languages."""
try:
languages = LanguageContext.get_supported_languages()
language_names = LanguageContext.get_language_display_names()
language_list = []
for lang_code in languages:
lang_info = LanguageContext.SUPPORTED_LANGUAGES[lang_code]
language_list.append({
'code': lang_code,
'name': lang_info['name'],
'syntax_highlighting': lang_info['syntax_highlighting'],
'file_extensions': lang_info['file_extensions']
})
return jsonify({
'languages': language_list,
'default_language': 'python',
'total_count': len(language_list)
})
except Exception as e:
logger.error(f"Unexpected error getting supported languages: {e}")
raise APIError('Internal server error', 500)
# Message Processing Endpoint
@chat_bp.route('/sessions/<session_id>/message', methods=['POST'])
@limiter.limit("30 per minute")
@require_auth
@validate_json_request(['content'])
def send_message(session_id: str):
"""Send a message to the chat agent and get a response."""
try:
data = request.json_data
content = data['content'].strip()
language = data.get('language') # Optional language override
if not content:
raise APIError('Message content cannot be empty', 400)
if len(content) > 5000: # Reasonable message length limit
raise APIError('Message too long (max 5000 characters)', 400)
# Get services
session_manager, chat_history_manager, language_context_manager = get_services()
# Validate session ownership
session = session_manager.get_session(session_id)
if session.user_id != request.user_id:
raise APIError('Access denied', 403)
# Initialize chat agent
from ..services.groq_client import GroqClient
from ..services.chat_agent import ChatAgent
from ..services.programming_assistance import ProgrammingAssistanceService
groq_client = GroqClient()
programming_assistance_service = ProgrammingAssistanceService()
chat_agent = ChatAgent(
groq_client=groq_client,
language_context_manager=language_context_manager,
session_manager=session_manager,
chat_history_manager=chat_history_manager,
programming_assistance_service=programming_assistance_service
)
# Process the message
result = chat_agent.process_message(session_id, content, language)
logger.info(f"Processed message for session {session_id}, response length: {len(result['response'])}")
return jsonify({
'response': result['response'],
'message_id': result['message_id'],
'session_id': session_id,
'language': result['language'],
'processing_time': result['processing_time'],
'timestamp': result['timestamp']
})
except SessionNotFoundError:
raise APIError('Session not found', 404)
except SessionExpiredError:
raise APIError('Session has expired', 410)
except ChatAgentError as e:
logger.error(f"Chat agent error: {e}")
raise APIError(f'Failed to process message: {str(e)}', 500)
except APIError:
raise
except Exception as e:
logger.error(f"Unexpected error processing message: {e}")
raise APIError('Internal server error', 500)
# Health Check Endpoint
@chat_bp.route('/health', methods=['GET'])
def health_check():
"""Health check endpoint for monitoring."""
try:
# Check database connection
from sqlalchemy import text
db.session.execute(text('SELECT 1'))
# Check Redis connection (if configured)
redis_status = "disabled"
redis_url = current_app.config.get('REDIS_URL')
if redis_url and redis_url != 'None':
try:
redis_client = redis.from_url(redis_url)
redis_client.ping()
redis_status = "connected"
except Exception:
redis_status = "disconnected"
else:
redis_status = "disabled"
return jsonify({
'status': 'healthy',
'timestamp': datetime.utcnow().isoformat(),
'services': {
'database': 'connected',
'redis': redis_status
}
})
except Exception as e:
logger.error(f"Health check failed: {e}")
return jsonify({
'status': 'unhealthy',
'timestamp': datetime.utcnow().isoformat(),
'error': str(e)
}), 503
# Error handlers
@chat_bp.errorhandler(APIError)
def handle_api_error_handler(error):
"""Handle APIError exceptions."""
return handle_api_error(error)
@chat_bp.errorhandler(400)
def handle_bad_request(error):
"""Handle bad request errors."""
return jsonify({'error': 'Bad request'}), 400
@chat_bp.errorhandler(404)
def handle_not_found(error):
"""Handle not found errors."""
return jsonify({'error': 'Not found'}), 404
@chat_bp.errorhandler(429)
def handle_rate_limit_exceeded(error):
"""Handle rate limit exceeded errors."""
return jsonify({
'error': 'Rate limit exceeded',
'message': 'Too many requests. Please try again later.'
}), 429
@chat_bp.errorhandler(500)
def handle_internal_error(error):
"""Handle internal server errors."""
logger.error(f"Internal server error: {error}")
return jsonify({'error': 'Internal server error'}), 500