local str = require("string") local tbl = require("table") local postgres = {} local Connection = {} Connection.__index = Connection function Connection:close() if self._id then local ok = moonshark.sql_close(self._id) self._id = nil return ok end return false end function Connection:ping() if not self._id then error("Connection is closed") end return moonshark.sql_ping(self._id) end function Connection:query(query_str, ...) if not self._id then error("Connection is closed") end query_str = str.normalize_whitespace(query_str) return moonshark.sql_query(self._id, query_str, ...) end function Connection:exec(query_str, ...) if not self._id then error("Connection is closed") end query_str = str.normalize_whitespace(query_str) return moonshark.sql_exec(self._id, query_str, ...) end function Connection:query_row(query_str, ...) local results = self:query(query_str, ...) if results and #results > 0 then return results[1] end return nil end function Connection:query_value(query_str, ...) local row = self:query_row(query_str, ...) if row then for _, value in pairs(row) do return value end end return nil end -- Enhanced transaction support with savepoints function Connection:begin() local result = self:exec("BEGIN") if result then return { conn = self, active = true, commit = function(tx) if tx.active then local result = tx.conn:exec("COMMIT") tx.active = false return result end return false end, rollback = function(tx) if tx.active then local result = tx.conn:exec("ROLLBACK") tx.active = false return result end return false end, savepoint = function(tx, name) if not tx.active then error("Transaction is not active") end if str.is_blank(name) then error("Savepoint name cannot be empty") end return tx.conn:exec(str.template("SAVEPOINT ${name}", {name = name})) end, rollback_to = function(tx, name) if not tx.active then error("Transaction is not active") end if str.is_blank(name) then error("Savepoint name cannot be empty") end return tx.conn:exec(str.template("ROLLBACK TO SAVEPOINT ${name}", {name = name})) end, query = function(tx, query_str, ...) if not tx.active then error("Transaction is not active") end return tx.conn:query(query_str, ...) end, exec = function(tx, query_str, ...) if not tx.active then error("Transaction is not active") end return tx.conn:exec(query_str, ...) end, query_row = function(tx, query_str, ...) if not tx.active then error("Transaction is not active") end return tx.conn:query_row(query_str, ...) end, query_value = function(tx, query_str, ...) if not tx.active then error("Transaction is not active") end return tx.conn:query_value(query_str, ...) end } end return nil end -- Simplified PostgreSQL parameter builder local function build_postgres_params(data) local keys = tbl.keys(data) local values = tbl.values(data) local placeholders = {} for i = 1, #keys do tbl.insert(placeholders, str.template("$${num}", {num = tostring(i)})) end return keys, values, placeholders, #keys end -- Simplified query builders using table utilities function Connection:insert(table_name, data, returning) if str.is_blank(table_name) then error("Table name cannot be empty") end local keys, values, placeholders = build_postgres_params(data) local query = str.template("INSERT INTO ${table} (${columns}) VALUES (${placeholders})", { table = table_name, columns = tbl.concat(keys, ", "), placeholders = tbl.concat(placeholders, ", ") }) if returning and not str.is_blank(returning) then query = str.template("${query} RETURNING ${returning}", { query = query, returning = returning }) return self:query(query, unpack(values)) else return self:exec(query, unpack(values)) end end function Connection:upsert(table_name, data, conflict_columns, returning) if str.is_blank(table_name) then error("Table name cannot be empty") end local keys, values, placeholders = build_postgres_params(data) local updates = tbl.map(keys, function(key) return str.template("${key} = EXCLUDED.${key}", {key = key}) end) local conflict_clause = "" if conflict_columns then if type(conflict_columns) == "string" then conflict_clause = str.template("(${columns})", {columns = conflict_columns}) else conflict_clause = str.template("(${columns})", {columns = tbl.concat(conflict_columns, ", ")}) end end local query = str.template("INSERT INTO ${table} (${columns}) VALUES (${placeholders}) ON CONFLICT ${conflict} DO UPDATE SET ${updates}", { table = table_name, columns = tbl.concat(keys, ", "), placeholders = tbl.concat(placeholders, ", "), conflict = conflict_clause, updates = tbl.concat(updates, ", ") }) if returning and not str.is_blank(returning) then query = str.template("${query} RETURNING ${returning}", { query = query, returning = returning }) return self:query(query, unpack(values)) else return self:exec(query, unpack(values)) end end function Connection:update(table_name, data, where_clause, returning, ...) if str.is_blank(table_name) then error("Table name cannot be empty") end if str.is_blank(where_clause) then error("WHERE clause cannot be empty for UPDATE") end local keys = tbl.keys(data) local values = tbl.values(data) local param_count = #keys local sets = {} for i, key in ipairs(keys) do tbl.insert(sets, str.template("${key} = $${num}", { key = key, num = tostring(i) })) end -- Handle WHERE clause parameters local where_args = {...} local where_clause_with_params = where_clause for i = 1, #where_args do param_count = param_count + 1 tbl.insert(values, where_args[i]) where_clause_with_params = str.replace(where_clause_with_params, "?", str.template("$${num}", {num = tostring(param_count)}), 1) end local query = str.template("UPDATE ${table} SET ${sets} WHERE ${where}", { table = table_name, sets = tbl.concat(sets, ", "), where = where_clause_with_params }) if returning and not str.is_blank(returning) then query = str.template("${query} RETURNING ${returning}", { query = query, returning = returning }) return self:query(query, unpack(values)) else return self:exec(query, unpack(values)) end end function Connection:delete(table_name, where_clause, returning, ...) if str.is_blank(table_name) then error("Table name cannot be empty") end if str.is_blank(where_clause) then error("WHERE clause cannot be empty for DELETE") end -- Handle WHERE clause parameters local where_args = {...} local values = {} local where_clause_with_params = where_clause for i = 1, #where_args do tbl.insert(values, where_args[i]) where_clause_with_params = str.replace(where_clause_with_params, "?", str.template("$${num}", {num = tostring(i)}), 1) end local query = str.template("DELETE FROM ${table} WHERE ${where}", { table = table_name, where = where_clause_with_params }) if returning and not str.is_blank(returning) then query = str.template("${query} RETURNING ${returning}", { query = query, returning = returning }) return self:query(query, unpack(values)) else return self:exec(query, unpack(values)) end end function Connection:select(table_name, columns, where_clause, ...) if str.is_blank(table_name) then error("Table name cannot be empty") end columns = columns or "*" if type(columns) == "table" then columns = tbl.concat(columns, ", ") end local query if where_clause and not str.is_blank(where_clause) then -- Handle WHERE clause parameters local where_args = {...} local values = {} local where_clause_with_params = where_clause for i = 1, #where_args do tbl.insert(values, where_args[i]) where_clause_with_params = str.replace(where_clause_with_params, "?", str.template("$${num}", {num = tostring(i)}), 1) end query = str.template("SELECT ${columns} FROM ${table} WHERE ${where}", { columns = columns, table = table_name, where = where_clause_with_params }) return self:query(query, unpack(values)) else query = str.template("SELECT ${columns} FROM ${table}", { columns = columns, table = table_name }) return self:query(query) end end -- Enhanced PostgreSQL schema helpers function Connection:table_exists(table_name, schema_name) if str.is_blank(table_name) then return false end schema_name = schema_name or "public" local result = self:query_value( "SELECT tablename FROM pg_tables WHERE schemaname = $1 AND tablename = $2", str.trim(schema_name), str.trim(table_name) ) return result ~= nil end function Connection:column_exists(table_name, column_name, schema_name) if str.is_blank(table_name) or str.is_blank(column_name) then return false end schema_name = schema_name or "public" local result = self:query_value([[ SELECT column_name FROM information_schema.columns WHERE table_schema = $1 AND table_name = $2 AND column_name = $3 ]], str.trim(schema_name), str.trim(table_name), str.trim(column_name)) return result ~= nil end function Connection:create_table(table_name, schema) if str.is_blank(table_name) or str.is_blank(schema) then error("Table name and schema cannot be empty") end local query = str.template("CREATE TABLE IF NOT EXISTS ${table} (${schema})", { table = table_name, schema = str.trim(schema) }) return self:exec(query) end function Connection:drop_table(table_name, cascade) if str.is_blank(table_name) then error("Table name cannot be empty") end local cascade_clause = cascade and " CASCADE" or "" local query = str.template("DROP TABLE IF EXISTS ${table}${cascade}", { table = table_name, cascade = cascade_clause }) return self:exec(query) end function Connection:add_column(table_name, column_def) if str.is_blank(table_name) or str.is_blank(column_def) then error("Table name and column definition cannot be empty") end local query = str.template("ALTER TABLE ${table} ADD COLUMN IF NOT EXISTS ${column}", { table = table_name, column = str.trim(column_def) }) return self:exec(query) end function Connection:drop_column(table_name, column_name, cascade) if str.is_blank(table_name) or str.is_blank(column_name) then error("Table name and column name cannot be empty") end local cascade_clause = cascade and " CASCADE" or "" local query = str.template("ALTER TABLE ${table} DROP COLUMN IF EXISTS ${column}${cascade}", { table = table_name, column = column_name, cascade = cascade_clause }) return self:exec(query) end function Connection:create_index(index_name, table_name, columns, unique, method) if str.is_blank(index_name) or str.is_blank(table_name) then error("Index name and table name cannot be empty") end local unique_clause = unique and "UNIQUE " or "" local method_clause = method and str.template(" USING ${method}", {method = str.upper(method)}) or "" local columns_str = type(columns) == "table" and tbl.concat(columns, ", ") or tostring(columns) local query = str.template("CREATE ${unique}INDEX IF NOT EXISTS ${index} ON ${table}${method} (${columns})", { unique = unique_clause, index = index_name, table = table_name, method = method_clause, columns = columns_str }) return self:exec(query) end function Connection:drop_index(index_name, cascade) if str.is_blank(index_name) then error("Index name cannot be empty") end local cascade_clause = cascade and " CASCADE" or "" local query = str.template("DROP INDEX IF EXISTS ${index}${cascade}", { index = index_name, cascade = cascade_clause }) return self:exec(query) end -- PostgreSQL-specific functions function Connection:vacuum(table_name, analyze) local analyze_clause = analyze and " ANALYZE" or "" local table_clause = table_name and str.template(" ${table}", {table = table_name}) or "" return self:exec(str.template("VACUUM${analyze}${table}", { analyze = analyze_clause, table = table_clause })) end function Connection:analyze(table_name) local table_clause = table_name and str.template(" ${table}", {table = table_name}) or "" return self:exec(str.template("ANALYZE${table}", {table = table_clause})) end function Connection:reindex(name, type) if str.is_blank(name) then error("Name cannot be empty for REINDEX") end type = type or "INDEX" local valid_types = {"INDEX", "TABLE", "SCHEMA", "DATABASE", "SYSTEM"} local type_upper = str.upper(type) if not tbl.contains(valid_types, type_upper) then error(str.template("Invalid REINDEX type: ${type}", {type = type})) end return self:exec(str.template("REINDEX ${type} ${name}", { type = type_upper, name = name })) end -- PostgreSQL settings and introspection function Connection:show(setting) if str.is_blank(setting) then error("Setting name cannot be empty") end return self:query_value(str.template("SHOW ${setting}", {setting = setting})) end function Connection:set(setting, value) if str.is_blank(setting) then error("Setting name cannot be empty") end return self:exec(str.template("SET ${setting} = ${value}", { setting = setting, value = tostring(value) })) end function Connection:current_database() return self:query_value("SELECT current_database()") end function Connection:current_schema() return self:query_value("SELECT current_schema()") end function Connection:version() return self:query_value("SELECT version()") end function Connection:list_schemas() return self:query("SELECT schema_name FROM information_schema.schemata ORDER BY schema_name") end function Connection:list_tables(schema_name) schema_name = schema_name or "public" return self:query("SELECT tablename FROM pg_tables WHERE schemaname = $1 ORDER BY tablename", str.trim(schema_name)) end function Connection:describe_table(table_name, schema_name) if str.is_blank(table_name) then error("Table name cannot be empty") end schema_name = schema_name or "public" return self:query([[ SELECT column_name, data_type, is_nullable, column_default FROM information_schema.columns WHERE table_schema = $1 AND table_name = $2 ORDER BY ordinal_position ]], str.trim(schema_name), str.trim(table_name)) end -- JSON/JSONB helpers function Connection:json_extract(column, path) if str.is_blank(column) or str.is_blank(path) then error("Column and path cannot be empty") end return str.template("${column}->'${path}'", {column = column, path = path}) end function Connection:json_extract_text(column, path) if str.is_blank(column) or str.is_blank(path) then error("Column and path cannot be empty") end return str.template("${column}->>'${path}'", {column = column, path = path}) end function Connection:jsonb_contains(column, value) if str.is_blank(column) or str.is_blank(value) then error("Column and value cannot be empty") end return str.template("${column} @> '${value}'", {column = column, value = value}) end function Connection:jsonb_contained_by(column, value) if str.is_blank(column) or str.is_blank(value) then error("Column and value cannot be empty") end return str.template("${column} <@ '${value}'", {column = column, value = value}) end -- Array helpers function Connection:array_contains(column, value) if str.is_blank(column) then error("Column cannot be empty") end return str.template("$1 = ANY(${column})", {column = column}) end function Connection:array_length(column) if str.is_blank(column) then error("Column cannot be empty") end return str.template("array_length(${column}, 1)", {column = column}) end -- Connection management function postgres.parse_dsn(dsn) if str.is_blank(dsn) then return nil, "DSN cannot be empty" end local parts = {} for pair in str.trim(dsn):gmatch("[^%s]+") do local key, value = pair:match("([^=]+)=(.+)") if key and value then parts[str.trim(key)] = str.trim(value) end end return parts end function postgres.connect(dsn) if str.is_blank(dsn) then error("DSN cannot be empty") end local conn_id = moonshark.sql_connect("postgres", str.trim(dsn)) if conn_id then local conn = {_id = conn_id} setmetatable(conn, Connection) return conn end return nil end postgres.open = postgres.connect -- Quick execution functions function postgres.query(dsn, query_str, ...) local conn = postgres.connect(dsn) if not conn then error("Failed to connect to PostgreSQL database") end local results = conn:query(query_str, ...) conn:close() return results end function postgres.exec(dsn, query_str, ...) local conn = postgres.connect(dsn) if not conn then error("Failed to connect to PostgreSQL database") end local result = conn:exec(query_str, ...) conn:close() return result end function postgres.query_row(dsn, query_str, ...) local results = postgres.query(dsn, query_str, ...) if results and #results > 0 then return results[1] end return nil end function postgres.query_value(dsn, query_str, ...) local row = postgres.query_row(dsn, query_str, ...) if row then for _, value in pairs(row) do return value end end return nil end -- Migration helpers function postgres.migrate(dsn, migrations, schema) schema = schema or "public" local conn = postgres.connect(dsn) if not conn then error("Failed to connect to PostgreSQL database for migration") end conn:create_table("_migrations", "id SERIAL PRIMARY KEY, name TEXT UNIQUE NOT NULL, applied_at TIMESTAMPTZ DEFAULT NOW()") local tx = conn:begin() if not tx then conn:close() error("Failed to begin migration transaction") end local success = true local error_msg = "" for _, migration in ipairs(migrations) do if not migration.name or str.is_blank(migration.name) then error_msg = "Migration must have a non-empty name" success = false break end local existing = conn:query_value("SELECT id FROM _migrations WHERE name = $1", str.trim(migration.name)) if not existing then local ok, err = pcall(function() if type(migration.up) == "string" then conn:exec(migration.up) elseif type(migration.up) == "function" then migration.up(conn) else error("Migration 'up' must be string or function") end end) if ok then conn:exec("INSERT INTO _migrations (name) VALUES ($1)", str.trim(migration.name)) print(str.template("Applied migration: ${name}", {name = migration.name})) else success = false error_msg = str.template("Migration '${name}' failed: ${error}", { name = migration.name, error = err or "unknown error" }) break end end end if success then tx:commit() else tx:rollback() conn:close() error(error_msg) end conn:close() return true end -- Simplified result processing utilities function postgres.to_array(results, column_name) if not results or tbl.is_empty(results) then return {} end if str.is_blank(column_name) then error("Column name cannot be empty") end return tbl.map(results, function(row) return row[column_name] end) end function postgres.to_map(results, key_column, value_column) if not results or tbl.is_empty(results) then return {} end if str.is_blank(key_column) then error("Key column name cannot be empty") end local map = {} for _, row in ipairs(results) do local key = row[key_column] map[key] = value_column and row[value_column] or row end return map end function postgres.group_by(results, column_name) if not results or tbl.is_empty(results) then return {} end if str.is_blank(column_name) then error("Column name cannot be empty") end return tbl.group_by(results, function(row) return row[column_name] end) end -- Simplified debug helper function postgres.print_results(results) if not results or tbl.is_empty(results) then print("No results") return end local columns = tbl.keys(results[1]) tbl.sort(columns) -- Calculate column widths local widths = {} for _, col in ipairs(columns) do widths[col] = str.length(col) end for _, row in ipairs(results) do for _, col in ipairs(columns) do local value = tostring(row[col] or "") widths[col] = math.max(widths[col], str.length(value)) end end -- Print header local header_parts = tbl.map(columns, function(col) return str.pad_right(col, widths[col]) end) local separator_parts = tbl.map(columns, function(col) return str.repeat_("-", widths[col]) end) print(tbl.concat(header_parts, " | ")) print(tbl.concat(separator_parts, "-+-")) -- Print rows for _, row in ipairs(results) do local value_parts = tbl.map(columns, function(col) local value = tostring(row[col] or "") return str.pad_right(value, widths[col]) end) print(tbl.concat(value_parts, " | ")) end end function postgres.escape_identifier(name) if str.is_blank(name) then error("Identifier name cannot be empty") end return str.template('"${name}"', {name = str.replace(name, '"', '""')}) end function postgres.escape_literal(value) if type(value) == "string" then return str.template("'${value}'", {value = str.replace(value, "'", "''")}) end return tostring(value) end return postgres