-- Moonshark/modules/postgres/postgres.lua
--
-- 689 lines
-- 19 KiB
-- Lua

-- Module table; it is returned at the end of this file.
local postgres = {}
-- Method table for connection objects. postgres.connect installs it as the
-- metatable of each connection, and __index routes method lookups back here.
local Connection = {}
Connection.__index = Connection
--- Release the connection handle if one is still held.
-- The stored handle is cleared after the close call, so calling close again
-- simply returns false.
-- @return the result of moonshark.sql_close, or false when already closed
function Connection:close()
  local handle = self._id
  if not handle then
    return false
  end
  local closed = moonshark.sql_close(handle)
  self._id = nil
  return closed
end
--- Ping through the stored connection handle.
-- Raises "Connection is closed" when no handle is held.
-- @return whatever moonshark.sql_ping returns for the handle
function Connection:ping()
  local handle = self._id
  if handle then
    return moonshark.sql_ping(handle)
  end
  error("Connection is closed")
end
--- Run a statement through moonshark.sql_query.
-- The SQL text is passed through normalize_whitespace first; extra arguments
-- are forwarded unchanged as statement parameters.
-- Raises "Connection is closed" when no handle is held.
-- @param query_str SQL text
-- @param ... statement parameters
-- @return whatever moonshark.sql_query returns
function Connection:query(query_str, ...)
  local handle = self._id
  if not handle then
    error("Connection is closed")
  end
  local sql = query_str:normalize_whitespace()
  return moonshark.sql_query(handle, sql, ...)
end
--- Run a statement through moonshark.sql_exec.
-- The SQL text is passed through normalize_whitespace first; extra arguments
-- are forwarded unchanged as statement parameters.
-- Raises "Connection is closed" when no handle is held.
-- @param query_str SQL text
-- @param ... statement parameters
-- @return whatever moonshark.sql_exec returns
function Connection:exec(query_str, ...)
  local handle = self._id
  if not handle then
    error("Connection is closed")
  end
  local sql = query_str:normalize_whitespace()
  return moonshark.sql_exec(handle, sql, ...)
end
--- Run a query and return only its first row.
-- @param query_str SQL text
-- @param ... statement parameters
-- @return the first element of the result list, or nil when the query
--   produced a falsy result or an empty list
function Connection:query_row(query_str, ...)
  local rows = self:query(query_str, ...)
  if not rows or #rows < 1 then
    return nil
  end
  return rows[1] or nil
end
--- Run a query and return a single value from its first row.
-- @param query_str SQL text
-- @param ... statement parameters
-- @return the first value visited by pairs() on the first row, or nil when
--   there is no row
function Connection:query_value(query_str, ...)
  local row = self:query_row(query_str, ...)
  if not row then
    return nil
  end
  -- pairs() order is unspecified: with several columns the pick is arbitrary.
  for _, first in pairs(row) do
    return first
  end
  return nil
end
--- Issue BEGIN and return a transaction handle.
-- The handle is a plain table with the fields `conn` (this connection) and
-- `active`, plus the methods commit, rollback, savepoint, rollback_to, query,
-- exec, query_row and query_value. commit and rollback clear `active` and
-- return false when the transaction is already finished; every other method
-- raises "Transaction is not active" in that case.
-- @return the transaction table, or nil when BEGIN gives a falsy result
function Connection:begin()
  if not self:exec("BEGIN") then
    return nil
  end

  -- Level 2 attributes the error to the transaction method that called us.
  local function require_active(tx)
    if not tx.active then
      error("Transaction is not active", 2)
    end
  end

  -- Shared tail of commit/rollback: clear the flag, then send the statement.
  local function finish(tx, statement)
    if not tx.active then
      return false
    end
    tx.active = false
    return tx.conn:exec(statement)
  end

  local txn = { conn = self, active = true }

  function txn.commit(tx)
    return finish(tx, "COMMIT")
  end

  function txn.rollback(tx)
    return finish(tx, "ROLLBACK")
  end

  -- The savepoint name is interpolated into the SQL text as given.
  function txn.savepoint(tx, name)
    require_active(tx)
    if name:is_blank() then
      error("Savepoint name cannot be empty")
    end
    return tx.conn:exec(("SAVEPOINT {{name}}"):parse({ name = name }))
  end

  function txn.rollback_to(tx, name)
    require_active(tx)
    if name:is_blank() then
      error("Savepoint name cannot be empty")
    end
    return tx.conn:exec(("ROLLBACK TO SAVEPOINT {{name}}"):parse({ name = name }))
  end

  function txn.query(tx, query_str, ...)
    require_active(tx)
    return tx.conn:query(query_str, ...)
  end

  function txn.exec(tx, query_str, ...)
    require_active(tx)
    return tx.conn:exec(query_str, ...)
  end

  function txn.query_row(tx, query_str, ...)
    require_active(tx)
    return tx.conn:query_row(query_str, ...)
  end

  function txn.query_value(tx, query_str, ...)
    require_active(tx)
    return tx.conn:query_value(query_str, ...)
  end

  return txn
end
-- Build PostgreSQL parameters ($1, $2, etc.)
-- Returns three parallel lists for the entries of `data`: column names,
-- values, and numbered placeholders. Values are read back through the key
-- list, so keys[i], values[i] and placeholders[i] always describe the same
-- entry regardless of how table.keys orders its result.
-- @param data table mapping column name -> value
-- @return keys, values, placeholders
local function build_postgres_params(data)
  local keys = table.keys(data)
  local values = {}
  local placeholders = {}
  for i = 1, #keys do
    values[i] = data[keys[i]]
    placeholders[i] = "$" .. i
  end
  return keys, values, placeholders
end
--- Insert one row built from a column -> value table.
-- Table and column names and the RETURNING text are interpolated into the
-- SQL as given; only the values travel as numbered parameters.
-- @param table_name target table; raises when blank
-- @param data table mapping column name -> value
-- @param returning optional RETURNING expression; when non-blank the
--   statement goes through query, otherwise through exec
-- @return the result of self:query or self:exec
function Connection:insert(table_name, data, returning)
  if table_name:is_blank() then
    error("Table name cannot be empty")
  end

  local columns, args, marks = build_postgres_params(data)
  local sql = ("INSERT INTO {{table}} ({{columns}}) VALUES ({{placeholders}})"):parse({
    table = table_name,
    columns = columns:join(", "),
    placeholders = table.concat(marks, ", ")
  })

  local wants_rows = returning and not returning:is_blank()
  if not wants_rows then
    return self:exec(sql, unpack(args))
  end
  return self:query(sql .. " RETURNING " .. returning, unpack(args))
end
--- Insert a row, updating every supplied column when a conflict occurs.
-- Builds INSERT ... ON CONFLICT (<target>) DO UPDATE SET col = EXCLUDED.col.
-- Table and column names, the conflict target and the RETURNING text are
-- interpolated as given; only the values travel as numbered parameters.
-- @param table_name target table; raises when blank
-- @param data table mapping column name -> value
-- @param conflict_columns conflict target: a comma-separated string or a
--   list of column names. Required, because PostgreSQL rejects
--   ON CONFLICT DO UPDATE without a conflict target.
-- @param returning optional RETURNING expression; when non-blank the
--   statement goes through query, otherwise through exec
-- @return the result of self:query or self:exec
function Connection:upsert(table_name, data, conflict_columns, returning)
  if table_name:is_blank() then
    error("Table name cannot be empty")
  end

  -- Resolve the conflict target first so a missing one fails with a clear
  -- message instead of producing "ON CONFLICT  DO UPDATE".
  local conflict_target
  if type(conflict_columns) == "string" then
    conflict_target = conflict_columns
  elseif type(conflict_columns) == "table" then
    conflict_target = table.concat(conflict_columns, ", ")
  end
  if not conflict_target or conflict_target:is_blank() then
    error("Conflict columns cannot be empty for UPSERT")
  end

  local keys, values, placeholders = build_postgres_params(data)
  local updates = table.map(keys, function(key) return key .. " = EXCLUDED." .. key end)

  local query = ("INSERT INTO {{table}} ({{columns}}) VALUES ({{placeholders}}) ON CONFLICT {{conflict}} DO UPDATE SET {{updates}}"):parse({
    table = table_name,
    columns = keys:join(", "),
    placeholders = table.concat(placeholders, ", "),
    conflict = "(" .. conflict_target .. ")",
    updates = updates:join(", ")
  })

  if returning and not returning:is_blank() then
    query = query .. " RETURNING " .. returning
    return self:query(query, unpack(values))
  end
  return self:exec(query, unpack(values))
end
--- Update rows matching a WHERE clause.
-- SET parameters are numbered $1..$n in key order. Each `?` in the WHERE
-- clause is then rewritten, left to right, to the next parameter number, one
-- per extra argument. The rewrite is textual, so a literal `?` inside the
-- clause is also affected.
-- @param table_name target table; raises when blank
-- @param data table mapping column name -> new value
-- @param where_clause WHERE text with `?` placeholders; raises when blank
-- @param returning optional RETURNING expression; when non-blank the
--   statement goes through query, otherwise through exec
-- @param ... values for the `?` placeholders (nil values are kept in place)
-- @return the result of self:query or self:exec
function Connection:update(table_name, data, where_clause, returning, ...)
  if table_name:is_blank() then
    error("Table name cannot be empty")
  end
  if where_clause:is_blank() then
    error("WHERE clause cannot be empty for UPDATE")
  end

  -- Read each value through its key so SET columns and parameters line up.
  local keys = table.keys(data)
  local values, sets = {}, {}
  for i = 1, #keys do
    values[i] = data[keys[i]]
    sets[i] = keys[i] .. " = $" .. i
  end
  local param_count = #keys

  -- select("#", ...) counts nil arguments too; #{...} is unreliable with them.
  local where_args = { ... }
  local where_count = select("#", ...)
  local where_clause_final = where_clause
  for i = 1, where_count do
    param_count = param_count + 1
    values[param_count] = where_args[i]
    where_clause_final = where_clause_final:replace("?", "$" .. param_count, 1)
  end

  local query = ("UPDATE {{table}} SET {{sets}} WHERE {{where}}"):parse({
    table = table_name,
    sets = table.concat(sets, ", "),
    where = where_clause_final
  })

  -- The explicit range keeps unpack from stopping at a nil parameter.
  if returning and not returning:is_blank() then
    query = query .. " RETURNING " .. returning
    return self:query(query, unpack(values, 1, param_count))
  end
  return self:exec(query, unpack(values, 1, param_count))
end
--- Delete rows matching a WHERE clause.
-- Each `?` in the WHERE clause is rewritten, left to right, to $1, $2, ...,
-- one per extra argument. The rewrite is textual, so a literal `?` inside the
-- clause is also affected.
-- @param table_name target table; raises when blank
-- @param where_clause WHERE text with `?` placeholders; raises when blank
-- @param returning optional RETURNING expression; when non-blank the
--   statement goes through query, otherwise through exec
-- @param ... values for the `?` placeholders (nil values are kept in place)
-- @return the result of self:query or self:exec
function Connection:delete(table_name, where_clause, returning, ...)
  if table_name:is_blank() then
    error("Table name cannot be empty")
  end
  if where_clause:is_blank() then
    error("WHERE clause cannot be empty for DELETE")
  end

  -- select("#", ...) counts nil arguments too; #{...} is unreliable with them.
  local where_clause_final = where_clause
  for i = 1, select("#", ...) do
    where_clause_final = where_clause_final:replace("?", "$" .. i, 1)
  end

  local query = ("DELETE FROM {{table}} WHERE {{where}}"):parse({
    table = table_name,
    where = where_clause_final
  })

  -- Forward the varargs directly so nil parameters are not dropped.
  if returning and not returning:is_blank() then
    query = query .. " RETURNING " .. returning
    return self:query(query, ...)
  end
  return self:exec(query, ...)
end
--- Select rows from a table, optionally filtered by a WHERE clause.
-- Each `?` in the WHERE clause is rewritten, left to right, to $1, $2, ...,
-- one per extra argument. Without a WHERE clause the extra arguments are
-- ignored.
-- @param table_name source table; raises when blank
-- @param columns column list as a string or a list of names; defaults to "*"
-- @param where_clause optional WHERE text with `?` placeholders
-- @param ... values for the `?` placeholders (nil values are kept in place)
-- @return the result of self:query
function Connection:select(table_name, columns, where_clause, ...)
  if table_name:is_blank() then
    error("Table name cannot be empty")
  end

  columns = columns or "*"
  if type(columns) == "table" then
    columns = table.concat(columns, ", ")
  end

  if not where_clause or where_clause:is_blank() then
    return self:query(("SELECT {{columns}} FROM {{table}}"):parse({
      columns = columns,
      table = table_name
    }))
  end

  -- select("#", ...) counts nil arguments too; #{...} is unreliable with them.
  local where_clause_final = where_clause
  for i = 1, select("#", ...) do
    where_clause_final = where_clause_final:replace("?", "$" .. i, 1)
  end

  local query = ("SELECT {{columns}} FROM {{table}} WHERE {{where}}"):parse({
    columns = columns,
    table = table_name,
    where = where_clause_final
  })
  -- Forward the varargs directly so nil parameters are not dropped.
  return self:query(query, ...)
end
-- Schema helpers
--- Report whether a table is listed in pg_tables.
-- @param table_name table to look for; a blank name yields false
-- @param schema_name schema to search, "public" when omitted
-- @return true when the lookup query yields a value, false otherwise
function Connection:table_exists(table_name, schema_name)
  if table_name:is_blank() then
    return false
  end
  local schema = (schema_name or "public"):trim()
  local found = self:query_value(
    "SELECT tablename FROM pg_tables WHERE schemaname = $1 AND tablename = $2",
    schema, table_name:trim())
  return found ~= nil
end
--- Report whether a column is listed in information_schema.columns.
-- @param table_name owning table; a blank name yields false
-- @param column_name column to look for; a blank name yields false
-- @param schema_name schema to search, "public" when omitted
-- @return true when the lookup query yields a value, false otherwise
function Connection:column_exists(table_name, column_name, schema_name)
  if table_name:is_blank() or column_name:is_blank() then
    return false
  end
  local schema = (schema_name or "public"):trim()
  -- The long string is kept flush left so its text is unchanged.
  local found = self:query_value([[
SELECT column_name FROM information_schema.columns
WHERE table_schema = $1 AND table_name = $2 AND column_name = $3
]], schema, table_name:trim(), column_name:trim())
  return found ~= nil
end
--- Issue CREATE TABLE IF NOT EXISTS.
-- Both arguments are interpolated into the SQL text as given (the schema is
-- trimmed first).
-- @param table_name table to create
-- @param schema column/constraint definitions placed inside the parentheses
-- @return the result of self:exec
function Connection:create_table(table_name, schema)
  local missing = table_name:is_blank() or schema:is_blank()
  if missing then
    error("Table name and schema cannot be empty")
  end
  return self:exec(("CREATE TABLE IF NOT EXISTS {{table}} ({{schema}})"):parse({
    table = table_name,
    schema = schema:trim()
  }))
end
--- Issue DROP TABLE IF EXISTS, optionally with CASCADE.
-- @param table_name table to drop; raises when blank
-- @param cascade truthy to append CASCADE
-- @return the result of self:exec
function Connection:drop_table(table_name, cascade)
  if table_name:is_blank() then
    error("Table name cannot be empty")
  end
  local suffix = ""
  if cascade then
    suffix = " CASCADE"
  end
  return self:exec(("DROP TABLE IF EXISTS {{table}}{{cascade}}"):parse({
    table = table_name,
    cascade = suffix
  }))
end
--- Issue ALTER TABLE ... ADD COLUMN IF NOT EXISTS.
-- @param table_name table to alter
-- @param column_def column definition text; trimmed, then interpolated as given
-- @return the result of self:exec
function Connection:add_column(table_name, column_def)
  local missing = table_name:is_blank() or column_def:is_blank()
  if missing then
    error("Table name and column definition cannot be empty")
  end
  return self:exec(("ALTER TABLE {{table}} ADD COLUMN IF NOT EXISTS {{column}}"):parse({
    table = table_name,
    column = column_def:trim()
  }))
end
--- Issue ALTER TABLE ... DROP COLUMN IF EXISTS, optionally with CASCADE.
-- @param table_name table to alter
-- @param column_name column to drop
-- @param cascade truthy to append CASCADE
-- @return the result of self:exec
function Connection:drop_column(table_name, column_name, cascade)
  local missing = table_name:is_blank() or column_name:is_blank()
  if missing then
    error("Table name and column name cannot be empty")
  end
  local suffix = ""
  if cascade then
    suffix = " CASCADE"
  end
  return self:exec(("ALTER TABLE {{table}} DROP COLUMN IF EXISTS {{column}}{{cascade}}"):parse({
    table = table_name,
    column = column_name,
    cascade = suffix
  }))
end
--- Issue CREATE [UNIQUE] INDEX IF NOT EXISTS.
-- @param index_name name of the index
-- @param table_name table to index
-- @param columns list of column names (joined with ", ") or any other value,
--   which is converted with tostring
-- @param unique truthy to create a UNIQUE index
-- @param method optional index method; upper-cased and placed after USING
-- @return the result of self:exec
function Connection:create_index(index_name, table_name, columns, unique, method)
  if index_name:is_blank() or table_name:is_blank() then
    error("Index name and table name cannot be empty")
  end

  local unique_kw = ""
  if unique then
    unique_kw = "UNIQUE "
  end

  local using = ""
  if method then
    using = " USING " .. method:upper()
  end

  local column_list
  if type(columns) == "table" then
    column_list = table.concat(columns, ", ")
  else
    column_list = tostring(columns)
  end

  return self:exec(("CREATE {{unique}}INDEX IF NOT EXISTS {{index}} ON {{table}}{{method}} ({{columns}})"):parse({
    unique = unique_kw,
    index = index_name,
    table = table_name,
    method = using,
    columns = column_list
  }))
end
--- Issue DROP INDEX IF EXISTS, optionally with CASCADE.
-- @param index_name index to drop; raises when blank
-- @param cascade truthy to append CASCADE
-- @return the result of self:exec
function Connection:drop_index(index_name, cascade)
  if index_name:is_blank() then
    error("Index name cannot be empty")
  end
  local suffix = ""
  if cascade then
    suffix = " CASCADE"
  end
  return self:exec(("DROP INDEX IF EXISTS {{index}}{{cascade}}"):parse({
    index = index_name,
    cascade = suffix
  }))
end
-- PostgreSQL-specific functions
--- Issue VACUUM, optionally with ANALYZE and/or limited to one table.
-- @param table_name optional table name, appended as given
-- @param analyze truthy to add ANALYZE
-- @return the result of self:exec
function Connection:vacuum(table_name, analyze)
  local analyze_part = ""
  if analyze then
    analyze_part = " ANALYZE"
  end
  local table_part = ""
  if table_name then
    table_part = " " .. table_name
  end
  return self:exec(("VACUUM{{analyze}}{{table}}"):parse({
    analyze = analyze_part,
    table = table_part
  }))
end
--- Issue ANALYZE, optionally limited to one table.
-- @param table_name optional table name, appended as given
-- @return the result of self:exec
function Connection:analyze(table_name)
  local table_part = ""
  if table_name then
    table_part = " " .. table_name
  end
  return self:exec(("ANALYZE{{table}}"):parse({table = table_part}))
end
--- Issue REINDEX for an index, table, schema, database or system.
-- @param name object name; raises when blank
-- @param type one of INDEX, TABLE, SCHEMA, DATABASE, SYSTEM in any letter
--   case; defaults to INDEX. Any other value raises "Invalid REINDEX type".
-- @return the result of self:exec
function Connection:reindex(name, type)
  if name:is_blank() then
    error("Name cannot be empty for REINDEX")
  end

  local kind = (type or "INDEX"):upper()
  local allowed = {"INDEX", "TABLE", "SCHEMA", "DATABASE", "SYSTEM"}
  if not table.contains(allowed, kind) then
    error("Invalid REINDEX type: " .. kind)
  end

  return self:exec(("REINDEX {{type}} {{name}}"):parse({type = kind, name = name}))
end
--- Read a run-time setting with SHOW.
-- The setting name is interpolated into the SQL text as given.
-- @param setting setting name; raises when blank
-- @return the result of self:query_value
function Connection:show(setting)
  local blank = setting:is_blank()
  if blank then
    error("Setting name cannot be empty")
  end
  return self:query_value(("SHOW {{setting}}"):parse({setting = setting}))
end
--- Change a run-time setting with SET.
-- The value goes through tostring and is interpolated without quoting, so a
-- caller needing a quoted literal must include the quotes.
-- @param setting setting name; raises when blank
-- @param value new value
-- @return the result of self:exec
function Connection:set(setting, value)
  local blank = setting:is_blank()
  if blank then
    error("Setting name cannot be empty")
  end
  return self:exec(("SET {{setting}} = {{value}}"):parse({
    setting = setting,
    value = tostring(value)
  }))
end
--- Run `SELECT current_database()` through query_value.
-- @return the result of self:query_value
function Connection:current_database()
  local sql = "SELECT current_database()"
  return self:query_value(sql)
end
--- Run `SELECT current_schema()` through query_value.
-- @return the result of self:query_value
function Connection:current_schema()
  local sql = "SELECT current_schema()"
  return self:query_value(sql)
end
--- Run `SELECT version()` through query_value.
-- @return the result of self:query_value
function Connection:version()
  local sql = "SELECT version()"
  return self:query_value(sql)
end
--- List schema names from information_schema.schemata, ordered by name.
-- @return the result of self:query
function Connection:list_schemas()
  local sql = "SELECT schema_name FROM information_schema.schemata ORDER BY schema_name"
  return self:query(sql)
end
--- List table names from pg_tables for one schema, ordered by name.
-- @param schema_name schema to list, "public" when omitted
-- @return the result of self:query
function Connection:list_tables(schema_name)
  local schema = (schema_name or "public"):trim()
  local sql = "SELECT tablename FROM pg_tables WHERE schemaname = $1 ORDER BY tablename"
  return self:query(sql, schema)
end
--- Fetch column metadata for a table from information_schema.columns.
-- Selects column_name, data_type, is_nullable and column_default, ordered by
-- ordinal_position.
-- @param table_name table to describe; raises when blank
-- @param schema_name schema to search, "public" when omitted
-- @return the result of self:query
function Connection:describe_table(table_name, schema_name)
  if table_name:is_blank() then
    error("Table name cannot be empty")
  end
  local schema = (schema_name or "public"):trim()
  -- The long string is kept flush left so its text is unchanged.
  return self:query([[
SELECT column_name, data_type, is_nullable, column_default
FROM information_schema.columns
WHERE table_schema = $1 AND table_name = $2
ORDER BY ordinal_position
]], schema, table_name:trim())
end
-- JSON/JSONB helpers
--- Build the SQL fragment `column->'path'`.
-- Nothing is executed. The column text is interpolated as given.
-- @param column column expression; raises when blank
-- @param path key placed inside a single-quoted literal; raises when blank
-- @return the SQL fragment as a string
function Connection:json_extract(column, path)
  if column:is_blank() or path:is_blank() then
    error("Column and path cannot be empty")
  end
  -- Double embedded single quotes so the path cannot end its SQL literal early.
  return ("{{column}}->'{{path}}'"):parse({
    column = column,
    path = path:replace("'", "''")
  })
end
--- Build the SQL fragment `column->>'path'`.
-- Nothing is executed. The column text is interpolated as given.
-- @param column column expression; raises when blank
-- @param path key placed inside a single-quoted literal; raises when blank
-- @return the SQL fragment as a string
function Connection:json_extract_text(column, path)
  if column:is_blank() or path:is_blank() then
    error("Column and path cannot be empty")
  end
  -- Double embedded single quotes so the path cannot end its SQL literal early.
  return ("{{column}}->>'{{path}}'"):parse({
    column = column,
    path = path:replace("'", "''")
  })
end
--- Build the SQL fragment `column @> 'value'`.
-- Nothing is executed. The column text is interpolated as given.
-- @param column column expression; raises when blank
-- @param value text placed inside a single-quoted literal; raises when blank
-- @return the SQL fragment as a string
function Connection:jsonb_contains(column, value)
  if column:is_blank() or value:is_blank() then
    error("Column and value cannot be empty")
  end
  -- Double embedded single quotes so the value cannot end its SQL literal early.
  return ("{{column}} @> '{{value}}'"):parse({
    column = column,
    value = value:replace("'", "''")
  })
end
--- Build the SQL fragment `column <@ 'value'`.
-- Nothing is executed. The column text is interpolated as given.
-- @param column column expression; raises when blank
-- @param value text placed inside a single-quoted literal; raises when blank
-- @return the SQL fragment as a string
function Connection:jsonb_contained_by(column, value)
  if column:is_blank() or value:is_blank() then
    error("Column and value cannot be empty")
  end
  -- Double embedded single quotes so the value cannot end its SQL literal early.
  return ("{{column}} <@ '{{value}}'"):parse({
    column = column,
    value = value:replace("'", "''")
  })
end
-- Array helpers
--- Build the SQL fragment `$1 = ANY(column)`.
-- Nothing is executed. The fragment always refers to parameter $1; the
-- `value` argument is accepted but not used here.
-- @param column array column expression; raises when blank
-- @param value unused
-- @return the SQL fragment as a string
function Connection:array_contains(column, value)
  local blank = column:is_blank()
  if blank then
    error("Column cannot be empty")
  end
  return ("$1 = ANY({{column}})"):parse({column = column})
end
--- Build the SQL fragment `array_length(column, 1)`.
-- Nothing is executed.
-- @param column array column expression; raises when blank
-- @return the SQL fragment as a string
function Connection:array_length(column)
  local blank = column:is_blank()
  if blank then
    error("Column cannot be empty")
  end
  return ("array_length({{column}}, 1)"):parse({column = column})
end
-- Connection management
--- Split a `key=value key=value` DSN into a table.
-- The DSN is split on whitespace; tokens that do not look like `key=value`
-- (including a key with an empty value) are skipped. Quoted values that
-- contain spaces are therefore not kept together.
-- @param dsn connection string
-- @return table of settings, or nil plus a message when the DSN is blank
function postgres.parse_dsn(dsn)
  if dsn:is_blank() then
    return nil, "DSN cannot be empty"
  end

  local settings = {}
  for token in dsn:trim():gmatch("[^%s]+") do
    local name, setting = token:match("([^=]+)=(.+)")
    if name and setting then
      settings[name:trim()] = setting:trim()
    end
  end
  return settings
end
--- Open a connection through moonshark.sql_connect with the "postgres" driver.
-- @param dsn connection string; trimmed before use, raises when blank
-- @return a Connection object, or nil when sql_connect gives a falsy handle
function postgres.connect(dsn)
  if dsn:is_blank() then
    error("DSN cannot be empty")
  end
  local handle = moonshark.sql_connect("postgres", dsn:trim())
  if not handle then
    return nil
  end
  return setmetatable({_id = handle}, Connection)
end
-- Alias: postgres.open refers to the same function as postgres.connect.
postgres.open = postgres.connect
-- Quick execution functions
--- Connect, run one query, and close the connection again.
-- The query runs under pcall so the connection is closed even when it
-- raises; the original error value is then re-raised unchanged.
-- @param dsn connection string
-- @param query_str SQL text
-- @param ... statement parameters
-- @return the first result of Connection:query
function postgres.query(dsn, query_str, ...)
  local conn = postgres.connect(dsn)
  if not conn then
    error("Failed to connect to PostgreSQL database")
  end
  local ok, results = pcall(conn.query, conn, query_str, ...)
  conn:close()
  if not ok then
    -- Level 0 keeps the original message without adding a new position prefix.
    error(results, 0)
  end
  return results
end
--- Connect, run one statement, and close the connection again.
-- The statement runs under pcall so the connection is closed even when it
-- raises; the original error value is then re-raised unchanged.
-- @param dsn connection string
-- @param query_str SQL text
-- @param ... statement parameters
-- @return the first result of Connection:exec
function postgres.exec(dsn, query_str, ...)
  local conn = postgres.connect(dsn)
  if not conn then
    error("Failed to connect to PostgreSQL database")
  end
  local ok, result = pcall(conn.exec, conn, query_str, ...)
  conn:close()
  if not ok then
    -- Level 0 keeps the original message without adding a new position prefix.
    error(result, 0)
  end
  return result
end
--- One-shot query returning only the first row.
-- @param dsn connection string
-- @param query_str SQL text
-- @param ... statement parameters
-- @return the first element of the result list, or nil when the result is
--   falsy or empty
function postgres.query_row(dsn, query_str, ...)
  local rows = postgres.query(dsn, query_str, ...)
  if not rows or #rows < 1 then
    return nil
  end
  return rows[1] or nil
end
--- One-shot query returning a single value from the first row.
-- @param dsn connection string
-- @param query_str SQL text
-- @param ... statement parameters
-- @return the first value visited by pairs() on the first row, or nil when
--   there is no row
function postgres.query_value(dsn, query_str, ...)
  local row = postgres.query_row(dsn, query_str, ...)
  if not row then
    return nil
  end
  -- pairs() order is unspecified: with several columns the pick is arbitrary.
  for _, first in pairs(row) do
    return first
  end
  return nil
end
-- Migration helpers
--- Apply pending migrations inside a single transaction.
-- Ensures the `_migrations` bookkeeping table exists, then walks `migrations`
-- in order. A migration whose trimmed name is already recorded is skipped;
-- otherwise its `up` (SQL string or function receiving the connection) is run
-- and the name recorded. On any error the transaction is rolled back, the
-- connection closed, and the error re-raised.
-- @param dsn connection string
-- @param migrations list of tables with `name` and `up` fields
-- @param schema accepted for compatibility; not used by this function
-- @return true when every migration was applied or skipped
function postgres.migrate(dsn, migrations, schema)
  local conn = postgres.connect(dsn)
  if not conn then
    error("Failed to connect to PostgreSQL database for migration")
  end

  local tx
  -- Run everything under one pcall so no error path leaks the connection or
  -- leaves the transaction open.
  local ok, failure = pcall(function()
    conn:create_table("_migrations", "id SERIAL PRIMARY KEY, name TEXT UNIQUE NOT NULL, applied_at TIMESTAMPTZ DEFAULT NOW()")

    tx = conn:begin()
    if not tx then
      error("Failed to begin migration transaction")
    end

    for _, migration in ipairs(migrations) do
      local name = migration.name
      if type(name) ~= "string" or name:is_blank() then
        error("Migration must have a non-empty name")
      end
      local recorded_name = name:trim()

      local existing = conn:query_value("SELECT id FROM _migrations WHERE name = $1", recorded_name)
      if not existing then
        local applied, err = pcall(function()
          if type(migration.up) == "string" then
            conn:exec(migration.up)
          elseif type(migration.up) == "function" then
            migration.up(conn)
          else
            error("Migration 'up' must be string or function")
          end
        end)
        if not applied then
          -- tostring guards against non-string error values in the template.
          error(("Migration '{{name}}' failed: {{error}}"):parse({
            name = name,
            error = err ~= nil and tostring(err) or "unknown error"
          }))
        end
        conn:exec("INSERT INTO _migrations (name) VALUES ($1)", recorded_name)
        print(("Applied migration: {{name}}"):parse({name = name}))
      end
    end

    tx:commit()
  end)

  if not ok then
    -- Best-effort rollback: a failure here must not hide the original error.
    if tx and tx.active then
      pcall(tx.rollback, tx)
    end
    conn:close()
    error(failure, 0)
  end

  conn:close()
  return true
end
-- Result processing utilities
--- Collect one column of a result set into a list.
-- @param results list of row tables; nil or empty yields {}
-- @param column_name column to extract; raises when blank
-- @return the result of table.map over the rows
function postgres.to_array(results, column_name)
  local no_rows = not results or table.is_empty(results)
  if no_rows then
    return {}
  end
  if column_name:is_blank() then
    error("Column name cannot be empty")
  end
  local function pluck(row)
    return row[column_name]
  end
  return table.map(results, pluck)
end
--- Index a result set by the value of one column.
-- Later rows overwrite earlier rows that share a key. A row whose key column
-- is nil raises, because nil cannot be used as a table index.
-- @param results list of row tables; nil or empty yields {}
-- @param key_column column whose value becomes the map key; raises when blank
-- @param value_column optional column whose value becomes the map value;
--   when omitted the whole row is stored
-- @return table mapping key -> value (or row)
function postgres.to_map(results, key_column, value_column)
  if not results or table.is_empty(results) then return {} end
  if key_column:is_blank() then error("Key column name cannot be empty") end
  local map = {}
  for _, row in ipairs(results) do
    local key = row[key_column]
    -- Explicit branch: an and/or ternary would store the whole row whenever
    -- the selected column holds false or nil.
    if value_column then
      map[key] = row[value_column]
    else
      map[key] = row
    end
  end
  return map
end
--- Group the rows of a result set by the value of one column.
-- @param results list of row tables; nil or empty yields {}
-- @param column_name column to group on; raises when blank
-- @return the result of table.group_by over the rows
function postgres.group_by(results, column_name)
  local no_rows = not results or table.is_empty(results)
  if no_rows then
    return {}
  end
  if column_name:is_blank() then
    error("Column name cannot be empty")
  end
  local function group_key(row)
    return row[column_name]
  end
  return table.group_by(results, group_key)
end
--- Print a result set as an aligned text table.
-- Columns come from the keys of the first row, sorted. nil cells print as
-- empty; every other value, including false, is printed via tostring.
-- Prints "No results" for a nil or empty result set.
-- @param results list of row tables
function postgres.print_results(results)
  if not results or table.is_empty(results) then
    print("No results")
    return
  end

  -- Only nil maps to an empty cell; `value or ""` would also blank out false.
  local function cell_text(value)
    if value == nil then
      return ""
    end
    return tostring(value)
  end

  local columns = table.keys(results[1])
  table.sort(columns)

  -- Calculate column widths
  local widths = {}
  for _, col in ipairs(columns) do
    widths[col] = col:length()
    for _, row in ipairs(results) do
      widths[col] = math.max(widths[col], cell_text(row[col]):length())
    end
  end

  -- Print header and separator
  local header_parts = table.map(columns, function(col) return col:pad_right(widths[col]) end)
  local separator_parts = table.map(columns, function(col) return string.repeat_("-", widths[col]) end)
  print(table.concat(header_parts, " | "))
  print(table.concat(separator_parts, "-+-"))

  -- Print rows
  for _, row in ipairs(results) do
    local value_parts = table.map(columns, function(col)
      return cell_text(row[col]):pad_right(widths[col])
    end)
    print(table.concat(value_parts, " | "))
  end
end
--- Quote a name as a SQL identifier.
-- Embedded double quotes are doubled and the result is wrapped in double
-- quotes.
-- @param name identifier text; raises when blank
-- @return the quoted identifier
function postgres.escape_identifier(name)
  local blank = name:is_blank()
  if blank then
    error("Identifier name cannot be empty")
  end
  local doubled = name:replace('"', '""')
  return ('"{{name}}"'):parse({name = doubled})
end
--- Render a Lua value as SQL literal text.
-- Strings are wrapped in single quotes with embedded single quotes doubled;
-- nil becomes NULL; every other value goes through tostring.
-- @param value value to render
-- @return SQL text for the value
function postgres.escape_literal(value)
  -- tostring(nil) is "nil", which is not a SQL literal.
  if value == nil then
    return "NULL"
  end
  if type(value) == "string" then
    return ("'{{value}}'"):parse({value = value:replace("'", "''")})
  end
  return tostring(value)
end
-- Hand the module table back to the code that loaded this file.
return postgres