import datetime
import json
import logging
import os
import platform
import shutil
import sqlite3
import tempfile
import time
import traceback
import uuid
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any, Dict, List, Optional

import gradio as gr
import pandas as pd


# --- Base Classes ---
class Interface(ABC):
    """Abstract base for launchable user interfaces."""

    @abstractmethod
    def launch(self):
        """Start the interface."""


class Command(ABC):
    """Abstract base for executable commands (command pattern)."""

    @abstractmethod
    def execute(self):
        """Run the command."""


# --- Database Manager Implementation ---
class DatabaseManager:
    """Handles all database operations including creation, connection, and CRUD operations."""

    def __init__(self, db_path: Optional[str] = None):
        """Open (creating if needed) the SQLite database.

        Args:
            db_path: Explicit database file path. When None, a per-user
                platform-appropriate application-data directory is used.
        """
        if db_path is None:
            system = platform.system()
            if system == 'Windows':
                base_dir = os.path.join(os.environ['APPDATA'], 'FileStorageApp')
            elif system == 'Darwin':
                base_dir = os.path.join(os.path.expanduser('~'), 'Library',
                                        'Application Support', 'FileStorageApp')
            else:
                base_dir = os.path.join(os.path.expanduser('~'), '.filestorage')
            os.makedirs(base_dir, exist_ok=True)
            self.db_path = os.path.join(base_dir, 'file_storage.db')
        else:
            self.db_path = db_path
        self.conn = None
        self.cursor = None
        self.connect()
        self.create_tables()

    def connect(self) -> None:
        """Establish a connection to the SQLite database.

        Raises:
            sqlite3.Error: If the connection cannot be opened.
        """
        try:
            # check_same_thread=False: Gradio invokes event handlers from
            # worker threads; the sqlite3 default would raise
            # ProgrammingError on every UI callback that touches the DB.
            self.conn = sqlite3.connect(self.db_path, check_same_thread=False)
            self.conn.execute("PRAGMA foreign_keys = ON")
            self.cursor = self.conn.cursor()
        except sqlite3.Error as e:
            logging.error("Database connection error: %s", e)
            raise

    def create_tables(self) -> None:
        """Create necessary tables if they don't exist."""
        tables = [
            '''CREATE TABLE IF NOT EXISTS files (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                filename TEXT NOT NULL,
                original_filename TEXT NOT NULL,
                file_path TEXT NOT NULL,
                file_size INTEGER NOT NULL,
                file_type TEXT,
                upload_date DATETIME DEFAULT CURRENT_TIMESTAMP
            )''',
            '''CREATE TABLE IF NOT EXISTS metadata (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                file_id INTEGER NOT NULL,
                key TEXT NOT NULL,
                value TEXT,
                FOREIGN KEY (file_id) REFERENCES files (id) ON DELETE CASCADE
            )''',
            '''CREATE TABLE IF NOT EXISTS chunks (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                file_id INTEGER NOT NULL,
                chunk_index INTEGER NOT NULL,
                chunk_text TEXT NOT NULL,
                chunk_size INTEGER NOT NULL,
                FOREIGN KEY (file_id) REFERENCES files (id) ON DELETE CASCADE
            )''',
            # Dedicated error log. Errors must NOT go into `metadata`:
            # its file_id foreign key (enforced by PRAGMA foreign_keys = ON)
            # rejects rows that do not reference a real file.
            '''CREATE TABLE IF NOT EXISTS errors (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TEXT,
                error_type TEXT,
                message TEXT,
                stack_trace TEXT
            )'''
        ]
        try:
            for table in tables:
                self.cursor.execute(table)
            self.conn.commit()
        except sqlite3.Error as e:
            self.conn.rollback()
            logging.error("Error creating tables: %s", e)
            raise

    def insert_file(self, file_data: Dict[str, Any]) -> int:
        """Insert file information into the database.

        Args:
            file_data: Mapping with keys 'filename', 'original_filename',
                'file_path', 'file_size', and 'file_type'.

        Returns:
            The rowid of the newly inserted file record.
        """
        try:
            self.cursor.execute('''
                INSERT INTO files (filename, original_filename, file_path, file_size, file_type)
                VALUES (?, ?, ?, ?, ?)
            ''', (file_data['filename'], file_data['original_filename'],
                  file_data['file_path'], file_data['file_size'],
                  file_data['file_type']))
            self.conn.commit()
            return self.cursor.lastrowid
        except sqlite3.Error as e:
            self.conn.rollback()
            logging.error("Error inserting file: %s", e)
            raise

    def insert_metadata(self, file_id: int, metadata: Dict[str, str]) -> None:
        """Insert metadata key/value pairs for a specific file."""
        try:
            for key, value in metadata.items():
                self.cursor.execute('''
                    INSERT INTO metadata (file_id, key, value)
                    VALUES (?, ?, ?)
                ''', (file_id, key, value))
            self.conn.commit()
        except sqlite3.Error as e:
            self.conn.rollback()
            logging.error("Error inserting metadata: %s", e)
            raise

    def insert_chunk(self, file_id: int, chunk_index: int, chunk_text: str) -> None:
        """Insert a text chunk; chunk_size is stored as the word count."""
        try:
            chunk_size = len(chunk_text.split())
            self.cursor.execute('''
                INSERT INTO chunks (file_id, chunk_index, chunk_text, chunk_size)
                VALUES (?, ?, ?, ?)
            ''', (file_id, chunk_index, chunk_text, chunk_size))
            self.conn.commit()
        except sqlite3.Error as e:
            self.conn.rollback()
            logging.error("Error inserting chunk: %s", e)
            raise

    def log_error(self, error_data: Dict[str, str]) -> None:
        """Persist an application error to the errors table.

        Best-effort: failures are logged but never propagated, so error
        logging can never mask the original error.
        """
        try:
            self.cursor.execute('''
                INSERT INTO errors (timestamp, error_type, message, stack_trace)
                VALUES (?, ?, ?, ?)
            ''', (error_data.get('timestamp'), error_data.get('error_type'),
                  error_data.get('message'), error_data.get('stack_trace')))
            self.conn.commit()
        except sqlite3.Error as e:
            logging.error("Error logging error: %s", e)

    def close(self) -> None:
        """Close the database connection, if open."""
        if self.conn:
            self.conn.close()


# --- File Processor Implementation ---
class FileProcessor:
    """Handles file uploads, storage, and metadata extraction."""

    def __init__(self, upload_folder: Optional[str] = None):
        self.upload_folder = upload_folder or os.path.join(Path.home(), 'FileUploads')
        os.makedirs(self.upload_folder, exist_ok=True)

    def save_file(self, file: Any) -> Dict[str, Any]:
        """Save the uploaded file into the managed folder and return its metadata.

        Args:
            file: An uploaded file object exposing `.name` (and usually
                `.read()`); Gradio may instead supply a path-like object.

        Returns:
            Dict with 'filename', 'original_filename', 'file_path',
            'file_size', and 'file_type'.
        """
        # basename: Gradio's `.name` is typically a full temp-file path;
        # embedding it verbatim would produce an invalid destination path.
        original_name = os.path.basename(str(file.name))
        filename = f"{uuid.uuid4()}_{original_name}"
        file_path = os.path.join(self.upload_folder, filename)
        try:
            if hasattr(file, 'read'):
                data = file.read()
                if isinstance(data, str):
                    data = data.encode('utf-8')
                with open(file_path, "wb") as f:
                    f.write(data)
            else:
                # Newer Gradio versions hand back a tempfile path instead of
                # a file-like object; copy it into the upload folder.
                shutil.copyfile(str(file.name), file_path)
            return {
                'filename': filename,
                'original_filename': original_name,
                'file_path': file_path,
                'file_size': os.path.getsize(file_path),
                'file_type': original_name.split('.')[-1] if '.' in original_name else 'unknown'
            }
        except Exception as e:
            logging.error("Error saving file: %s", e)
            raise

    def extract_content(self, file_path: str) -> str:
        """Extract text content from a file.

        Undecodable bytes are replaced rather than raising, so binary-ish
        uploads still yield whatever text they contain.
        """
        try:
            with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
                return f.read()
        except Exception as e:
            logging.error("Error extracting content: %s", e)
            raise


# --- Text Chunker Implementation ---
class TextChunker:
    """Splits text content into manageable chunks."""

    def __init__(self, chunk_size: int = 500, overlap: int = 50):
        """Configure chunking.

        Args:
            chunk_size: Maximum words per chunk (must be positive).
            overlap: Words repeated between consecutive chunks; must be
                smaller than chunk_size or the scan would never advance.

        Raises:
            ValueError: If the parameters would cause an infinite loop.
        """
        if chunk_size <= 0:
            raise ValueError("chunk_size must be positive")
        if overlap >= chunk_size:
            raise ValueError("overlap must be smaller than chunk_size")
        self.chunk_size = chunk_size
        self.overlap = overlap

    def chunk_text(self, text: str) -> List[str]:
        """Split text into word-based chunks with overlap.

        Returns an empty list for empty/whitespace-only input.
        """
        words = text.split()
        chunks = []
        start = 0
        while start < len(words):
            end = start + self.chunk_size
            chunks.append(' '.join(words[start:end]))
            start = end - self.overlap
        return chunks


# --- Command Handler Implementation ---
class CommandHandler:
    """Manages registration and execution of named Command objects."""

    def __init__(self):
        self.commands = {}

    def register_command(self, name: str, command: Command):
        """Register a command under a name, replacing any existing one."""
        self.commands[name] = command

    def execute_command(self, name: str) -> bool:
        """Execute the named command.

        Returns:
            True if the command existed and was executed, False otherwise.
        """
        if name in self.commands:
            self.commands[name].execute()
            return True
        logging.warning("Command '%s' not found.", name)
        return False


# --- Main Application Implementation ---
class Application(Interface):
    """Core application class: wires storage, processing, and chunking."""

    def __init__(self):
        self.db_manager = DatabaseManager()
        self.file_processor = FileProcessor()
        self.text_chunker = TextChunker(chunk_size=512, overlap=50)
        self.command_handler = CommandHandler()
        self.processed_data = None

    def launch(self) -> None:
        """Build and launch the Gradio UI for this application.

        Required by the Interface ABC; without it, Application could not
        be instantiated at all (abstract-class TypeError).
        """
        DataDeityInterface(self).create_interface().launch()

    def run(self, uploaded_file: Any) -> None:
        """Main processing pipeline: save, record, extract, chunk, store.

        Args:
            uploaded_file: File object with a `.name` attribute.

        Raises:
            ValueError: If no file was provided.
            Exception: Re-raises any pipeline failure after logging it.
        """
        try:
            if not uploaded_file:
                raise ValueError("No file provided for processing")

            # Process file
            file_info = self.file_processor.save_file(uploaded_file)
            file_id = self.db_manager.insert_file(file_info)

            # Extract and chunk content
            raw_content = self.file_processor.extract_content(file_info['file_path'])
            chunks = self.text_chunker.chunk_text(raw_content)

            # Store chunks and metadata
            self.db_manager.insert_metadata(file_id, {
                'source': 'upload',
                'processed_at': datetime.datetime.now().isoformat()
            })
            # Chunk indices are stored 1-based.
            for idx, chunk in enumerate(chunks):
                self.db_manager.insert_chunk(file_id, idx + 1, chunk)

            self.processed_data = {
                'filename': uploaded_file.name,
                'chunk_count': len(chunks),
                'status': 'processed'
            }
        except Exception as e:
            self._handle_error(e)
            raise

    def _handle_error(self, error: Exception) -> None:
        """Centralized error handling: persist details, mark run failed."""
        error_data = {
            'timestamp': datetime.datetime.now().isoformat(),
            'error_type': type(error).__name__,
            'message': str(error),
            'stack_trace': traceback.format_exc()
        }
        self.db_manager.log_error(error_data)
        self.processed_data = {'status': 'failed'}


# --- Gradio Interface Implementation ---
class DataDeityInterface:
    """Gradio front-end for the Application pipeline."""

    def __init__(self, app):
        self.app = app
        self._setup_theme()

    def _setup_theme(self):
        """Configure the Gradio theme used by create_interface()."""
        self.theme = gr.themes.Default(
            primary_hue="emerald",
            secondary_hue="teal",
            font=[gr.themes.GoogleFont("Fira Code"), "Arial", "sans-serif"]
        )

    def _file_upload_tab(self):
        """Build the upload/process tab; returns the file input component."""
        with gr.Tab("๐Ÿ“ค Upload & Process"):
            with gr.Row():
                file_input = gr.File(label="Drag files here", file_count="multiple")
                stats_output = gr.JSON(label="Processing Stats")
            with gr.Row():
                process_btn = gr.Button("โšก Process Files", variant="primary")
                clear_btn = gr.Button("๐Ÿงน Clear Cache")
                download_dataset_btn = gr.Button("โฌ‡๏ธ Download LLM Dataset (.jsonl)", variant="secondary")
            file_output = gr.Dataframe(label="File Contents Preview")
            dataset_output = gr.File(label="Download Generated Dataset")

            process_btn.click(
                self.process_file,
                inputs=file_input,
                outputs=[stats_output, file_output]
            )
            # Three outputs require a 3-tuple; a bare None would be treated
            # as a single value for the first component only.
            clear_btn.click(lambda: (None, None, None),
                            outputs=[file_input, stats_output, file_output])
            download_dataset_btn.click(
                self.generate_dataset,
                outputs=[dataset_output]
            )
        return file_input

    def generate_dataset(self):
        """Export all stored chunks as a JSONL dataset file.

        Returns:
            Path to the generated .jsonl file in the temp directory, or
            None when there are no chunks or export fails.
        """
        try:
            chunks = self.app.db_manager.cursor.execute(
                "SELECT chunk_text FROM chunks").fetchall()
            if not chunks:
                return None
            dataset_filename = f"bulk_dataset_{int(time.time())}.jsonl"
            dataset_path = os.path.join(tempfile.gettempdir(), dataset_filename)
            with open(dataset_path, 'w', encoding='utf-8') as f:
                for chunk in chunks:
                    entry = {"text": chunk[0]}
                    f.write(json.dumps(entry) + '\n')
            return dataset_path
        except Exception as e:
            logging.error("Error generating dataset: %s", e)
            return None

    def _data_explorer_tab(self):
        """Build the data-explorer tab (tables + analytics placeholders)."""
        with gr.Tab("๐Ÿ” Data Explorer"):
            with gr.Row():
                refresh_btn = gr.Button("๐Ÿ”„ Refresh Data", variant="secondary")
                search_bar = gr.Textbox(placeholder="Search across all data...")
            with gr.Tabs():
                with gr.Tab("Database View"):
                    files_table = gr.Dataframe(label="Stored Files")
                    metadata_table = gr.Dataframe(label="File Metadata")
                    chunks_table = gr.Dataframe(label="Text Chunks")
                with gr.Tab("Analytics View"):
                    stats_plot = gr.Plot(label="Data Distribution")
                    correlations = gr.Matrix(label="Data Correlations")
            refresh_btn.click(
                self.refresh_data,
                outputs=[files_table, metadata_table, chunks_table]
            )

    def _command_interface_tab(self):
        """Build the command-console tab."""
        with gr.Tab("๐Ÿ’ป Command Console"):
            cmd_input = gr.Textbox(
                placeholder="Enter data command...",
                lines=3,
                max_lines=10
            )
            with gr.Row():
                execute_btn = gr.Button("๐Ÿš€ Execute", variant="primary")
                cmd_history_btn = gr.Button("๐Ÿ•’ History")
            cmd_output = gr.JSON(label="Command Results")
            cmd_explain = gr.Markdown("### Command Explanation")
            execute_btn.click(
                self.execute_command,
                inputs=cmd_input,
                outputs=[cmd_output, cmd_explain]
            )

    def create_interface(self):
        """Assemble and return the full Gradio Blocks interface."""
        with gr.Blocks(theme=self.theme, title="Data Deity") as interface:
            gr.Markdown("# ๐Ÿง  Data Deity - Ultimate Data Omnipotence Interface")
            with gr.Tabs():
                file_input = self._file_upload_tab()
                self._data_explorer_tab()
                self._command_interface_tab()
        return interface

    def process_file(self, files):
        """Run the pipeline over each uploaded file.

        Returns:
            (stats dict, preview DataFrame); on failure, an error dict
            and an empty DataFrame.
        """
        try:
            if not files:
                return {"error": "No files uploaded"}, pd.DataFrame()
            processed_files = []
            for file in files:
                self.app.run(file)
                processed_files.append({
                    "filename": file.name,
                    "chunks": self.app.processed_data['chunk_count'],
                    "status": "processed",
                    "timestamp": datetime.datetime.now().isoformat()
                })
            # Gradio file objects expose a path via .name but no .size
            # attribute; measure sizes on disk instead.
            total_bytes = 0
            for f in files:
                try:
                    total_bytes += os.path.getsize(f.name)
                except OSError:
                    pass
            stats = {
                "total_files": len(processed_files),
                "total_chunks": sum(f['chunks'] for f in processed_files),
                "average_size": f"{total_bytes / 1024 / 1024:.2f}MB"
            }
            preview = pd.DataFrame({
                "File": [f.name for f in files],
                "Type": [f.name.split('.')[-1] for f in files],
                "Status": ["โœ… Processed"] * len(files)
            })
            return stats, preview
        except Exception as e:
            return {"error": str(e)}, pd.DataFrame()

    def refresh_data(self):
        """Load the three DB tables into DataFrames for display."""
        try:
            files = self.app.db_manager.cursor.execute("SELECT * FROM files").fetchall()
            metadata = self.app.db_manager.cursor.execute("SELECT * FROM metadata").fetchall()
            chunks = self.app.db_manager.cursor.execute("SELECT * FROM chunks").fetchall()

            files_df = pd.DataFrame(files, columns=["ID", "Filename", "Original", "Path", "Size", "Type", "Uploaded"])
            metadata_df = pd.DataFrame(metadata, columns=["ID", "File ID", "Key", "Value"])
            chunks_df = pd.DataFrame(chunks, columns=["ID", "File ID", "Index", "Text", "Size"])

            return files_df, metadata_df, chunks_df
        except Exception:
            return pd.DataFrame(), pd.DataFrame(), pd.DataFrame()

    def execute_command(self, command):
        """Interpret a console command ('list files' or 'search <term>').

        Returns:
            (result dict, markdown explanation string).
        """
        try:
            if "list files" in command.lower():
                files = self.app.db_manager.cursor.execute(
                    "SELECT filename, file_type, upload_date FROM files").fetchall()
                return {"result": files}, "### File Listing Command\nRetrieved all stored files from database."
            elif "search" in command.lower():
                term = command.split("search")[1].strip()
                results = self.app.db_manager.cursor.execute(
                    "SELECT chunk_text FROM chunks WHERE chunk_text LIKE ?",
                    (f"%{term}%",)
                ).fetchall()
                return {"matches": [r[0] for r in results]}, f"### Search Results\nFound {len(results)} matches for '{term}'"
            else:
                return {"error": "Command not recognized"}, "### Unrecognized Command\nTry 'list files' or 'search '"
        except Exception as e:
            return {"error": str(e)}, "### Command Execution Failed"


# --- Main Execution ---
if __name__ == "__main__":
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s'
    )
    app = None  # pre-bind so the finally block is safe if startup fails
    try:
        app = Application()
        interface = DataDeityInterface(app)
        interface.create_interface().launch(
            server_name="0.0.0.0",
            server_port=7860,
            share=True
        )
    except KeyboardInterrupt:
        logging.info("\nApplication shutdown requested")
    finally:
        if app is not None:
            app.db_manager.close()