diff options
Diffstat (limited to 'postgresql.scm')
-rw-r--r-- | postgresql.scm | 1360 |
1 files changed, 1360 insertions, 0 deletions
diff --git a/postgresql.scm b/postgresql.scm new file mode 100644 index 0000000..3f1baf4 --- /dev/null +++ b/postgresql.scm @@ -0,0 +1,1360 @@ +;;; Bindings to the PostgreSQL C library +;; +;; Copyright (C) 2008-2014 Peter Bex +;; Copyright (C) 2004 Johannes Grødem <johs@copyleft.no> +;; Redistribution and use in source and binary forms, with or without +;; modification, is permitted. +;; +;; THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS +;; OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +;; WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +;; ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE +;; LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +;; CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT +;; OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR +;; BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +;; LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +;; (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +;; USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH +;; DAMAGE. + +(module postgresql + (type-parsers update-type-parsers! default-type-parsers + char-parser bool-parser bytea-parser numeric-parser + make-array-parser make-composite-parser + scheme-value->db-value type-unparsers update-type-unparsers! + default-type-unparsers bool-unparser vector-unparser list-unparser + + connect reset-connection disconnect connection? connected? + + query query* with-transaction in-transaction? + + result? clear-result! row-count column-count + column-index column-name column-names column-format + column-type column-type-modifier table-oid table-column-index + value-at row-values row-alist column-values affected-rows inserted-oid + + invalid-oid + + escape-string escape-bytea unescape-bytea quote-identifier + + put-copy-data put-copy-end get-copy-data + + row-fold row-fold* row-fold-right row-fold-right* + row-for-each row-for-each* row-map row-map* + column-fold column-fold* column-fold-right column-fold-right* + column-for-each column-for-each* column-map column-map* + copy-query-fold copy-query*-fold copy-query-fold-right copy-query*-fold-right + copy-query-for-each copy-query*-for-each copy-query-map copy-query*-map + call-with-output-copy-query call-with-output-copy-query* + with-output-to-copy-query with-output-to-copy-query*) + + (import scheme + (chicken base) + (chicken foreign) + (chicken string) + (chicken port) + (chicken memory) + (chicken condition) + (chicken format) + (chicken gc) + (chicken blob) + srfi-1 + srfi-4 + srfi-13 + srfi-18 + srfi-69 + sql-null) + +(import-for-syntax (chicken string)) + +(foreign-declare "#include <libpq-fe.h>") + +(define-foreign-type pg-polling-status (enum "PostgresPollingStatusType")) +(define-foreign-variable PGRES_POLLING_FAILED pg-polling-status) +(define-foreign-variable PGRES_POLLING_READING pg-polling-status) +(define-foreign-variable PGRES_POLLING_WRITING pg-polling-status) +(define-foreign-variable PGRES_POLLING_OK pg-polling-status) + +(define-foreign-type pg-exec-status (enum "ExecStatusType")) +(define-foreign-variable PGRES_EMPTY_QUERY pg-exec-status) +(define-foreign-variable PGRES_COMMAND_OK pg-exec-status) +(define-foreign-variable PGRES_TUPLES_OK pg-exec-status) +(define-foreign-variable PGRES_COPY_OUT pg-exec-status) +(define-foreign-variable PGRES_COPY_IN pg-exec-status) +(define-foreign-variable PGRES_BAD_RESPONSE pg-exec-status) +(define-foreign-variable PGRES_NONFATAL_ERROR pg-exec-status) +(define-foreign-variable PGRES_FATAL_ERROR pg-exec-status) + +(define-foreign-type pgconn* (c-pointer "PGconn")) + +(define PQconnectStart (foreign-lambda pgconn* PQconnectStart (const c-string))) +(define PQconnectPoll (foreign-lambda pg-polling-status PQconnectPoll pgconn*)) +(define PQresetStart (foreign-lambda bool PQresetStart pgconn*)) +(define PQresetPoll (foreign-lambda pg-polling-status PQresetPoll pgconn*)) +(define PQfinish (foreign-lambda void PQfinish pgconn*)) +(define PQstatus (foreign-lambda (enum "ConnStatusType") PQstatus (const pgconn*))) +(define PQerrorMessage (foreign-lambda c-string PQerrorMessage (const pgconn*))) + +;(define-foreign-type oid "Oid") +(define-foreign-type oid unsigned-int) + +(define invalid-oid (foreign-value "InvalidOid" oid)) + +(define PQisBusy (foreign-lambda bool PQisBusy pgconn*)) +(define PQconsumeInput (foreign-lambda bool PQconsumeInput pgconn*)) + +(define-foreign-type pgresult* (c-pointer "PGresult")) + +(define PQgetResult (foreign-lambda pgresult* PQgetResult pgconn*)) +(define PQresultStatus (foreign-lambda pg-exec-status PQresultStatus (const pgresult*))) +(define PQresultErrorMessage (foreign-lambda c-string PQresultErrorMessage (const pgresult*))) +(define PQresultErrorField (foreign-lambda c-string PQresultErrorField (const pgresult*) int)) + +(define PQclear (foreign-lambda void PQclear pgresult*)) +(define PQnfields (foreign-lambda int PQnfields (const pgresult*))) +(define PQntuples (foreign-lambda int PQntuples (const pgresult*))) +(define PQfname (foreign-lambda c-string PQfname (const pgresult*) int)) +(define PQfnumber (foreign-lambda int PQfnumber (const pgresult*) (const c-string))) +(define PQftable (foreign-lambda oid PQftable (const pgresult*) int)) +(define PQftablecol (foreign-lambda int PQftablecol (const pgresult*) int)) +(define PQfformat (foreign-lambda int PQfformat (const pgresult*) int)) +(define PQftype (foreign-lambda oid PQftype (const pgresult*) int)) +(define PQfmod (foreign-lambda int PQfmod (const pgresult*) int)) +(define PQgetisnull (foreign-lambda bool PQgetisnull (const pgresult*) int int)) +(define PQgetlength (foreign-lambda int PQgetlength (const pgresult*) int int)) +(define PQgetvalue-ptr (foreign-lambda (c-pointer char) PQgetvalue (const pgresult*) int int)) +(define PQcmdTuples (foreign-lambda nonnull-c-string PQcmdTuples pgresult*)) +(define PQoidValue (foreign-lambda oid PQoidValue pgresult*)) + +(define PQputCopyData (foreign-lambda int PQputCopyData pgconn* scheme-pointer int)) +(define PQputCopyEnd (foreign-lambda int PQputCopyEnd pgconn* (const c-string))) +(define PQgetCopyData (foreign-lambda int PQgetCopyData pgconn* (c-pointer (c-pointer char)) bool)) + +(define memcpy (foreign-lambda c-pointer "C_memcpy" scheme-pointer c-pointer size_t)) + +(define (srfi-4-vector? x) ; Copied from ##sys#srfi-4-vector? from 4.8.3 + (and (##core#inline "C_blockp" x) + (##sys#generic-structure? x) + (memq (##sys#slot x 0) + '(u8vector u16vector s8vector s16vector u32vector s32vector f32vector f64vector)))) + +;; TODO: Create a real callback system? +(foreign-declare "static void nullNoticeReceiver(void *arg, const PGresult *res){ }") + +(define-syntax define-foreign-int + (er-macro-transformer + (lambda (e r c) + ;; cannot rename define-foreign-variable; it's a really special form + `(define-foreign-variable ,(cadr e) int ,(conc "(int) " (cadr e)))))) + +(define-syntax define-pgdiag-constants + (syntax-rules () + ((_ (?condition ?constant0 ...) ?more ...) + (begin (cond-expand + (?condition (define-foreign-int ?constant0) ...) + (else (define ?constant0 #f) ...)) + (define-pgdiag-constants ?more ...))) + ((_ ?constant0 ?more ...) + (begin (define-foreign-int ?constant0) + (define-pgdiag-constants ?more ...))) + ((_) (void)))) + +(define-pgdiag-constants + PG_DIAG_SEVERITY PG_DIAG_SQLSTATE PG_DIAG_CONTEXT PG_DIAG_MESSAGE_PRIMARY + PG_DIAG_MESSAGE_DETAIL PG_DIAG_MESSAGE_HINT PG_DIAG_STATEMENT_POSITION + PG_DIAG_SOURCE_FILE PG_DIAG_SOURCE_LINE PG_DIAG_SOURCE_FUNCTION + + (has-diag-query-position + PG_DIAG_INTERNAL_QUERY PG_DIAG_INTERNAL_POSITION) + + (has-diag-schema-info + PG_DIAG_SCHEMA_NAME PG_DIAG_TABLE_NAME PG_DIAG_COLUMN_NAME + PG_DIAG_DATATYPE_NAME PG_DIAG_CONSTRAINT_NAME)) + +;; Helper procedure for lists (TODO: use ANY instead of IN with an array?) +(define (in-list len) + (string-intersperse + (list-tabulate len (lambda (p) (conc "$" (add1 p)))) ",")) + +(define (postgresql-error subtype loc message . args) + (signal (apply make-pg-condition subtype loc message args))) + +(define (make-pg-condition subtype loc message . args) + (make-composite-condition + (make-property-condition + 'exn 'location loc 'message message 'arguments args) + (make-property-condition 'postgresql) + (if (condition? subtype) subtype (make-property-condition subtype)))) + +;;;;;;;;;;;;;;;;;;;;;;;; +;;;; Type parsers +;;;;;;;;;;;;;;;;;;;;;;;; + +(define (char-parser str) (string-ref str 0)) + +(define (bool-parser str) (string=? str "t")) + +(define (numeric-parser str) + (or (string->number str) + (postgresql-error 'parse 'numeric-parser "Unable to parse number" str))) + +(define (bytea-parser str) + (unescape-bytea str)) + +;; Here be dragons +(define (make-array-parser element-parser #!optional (delim #\,)) + (define (parse str) + (if (string-ci=? "NULL" str) + (sql-null) + (element-parser str))) + (lambda (str) + (let loop ((chars (let ignore-bounds ((chars (string->list str))) + (if (char=? (car chars) #\{) + chars + (ignore-bounds (cdr chars))))) + (result (list))) + (if (null? chars) + (car result) ; Should contain only one vector + (case (car chars) + ((#\{) (receive (value rest-chars) + (loop (cdr chars) (list)) + (loop rest-chars (cons value result)))) + ((#\}) (values (list->vector (reverse! result)) (cdr chars))) + ((#\") (let consume-string ((chars (cdr chars)) + (consumed (list))) + (case (car chars) + ((#\\) (consume-string ; Don't interpret, just add it + (cddr chars) (cons (cadr chars) consumed))) + ((#\") (loop (cdr chars) + (cons (element-parser + (reverse-list->string consumed)) + result))) + (else (consume-string (cdr chars) + (cons (car chars) consumed)))))) + ((#\tab #\newline #\space) (loop (cdr chars) result)) + (else + (if (char=? (car chars) delim) + (loop (cdr chars) result) + (let consume-string ((chars chars) + (consumed (list))) + (cond + ((char=? (car chars) delim) + (loop (cdr chars) + (cons (parse (reverse-list->string consumed)) + result))) + ((or (char=? (car chars) #\}) + (char=? (car chars) #\})) + (loop chars + (cons (parse (reverse-list->string consumed)) + result))) + (else (consume-string (cdr chars) + (cons (car chars) consumed)))))))))))) + +(define (make-composite-parser element-parsers) + (define (parse str element-parser) + (if (string=? "" str) + (sql-null) + (element-parser str))) + (lambda (str) + (let loop ((chars (cdr (string->list (string-trim str)))) ; skip leading ( + (maybe-null? #t) + (result (list)) + (parsers element-parsers)) + (case (car chars) + ((#\)) (reverse! (if maybe-null? + (cons (sql-null) result) + result))) + ((#\") (let consume-string ((chars (cdr chars)) + (consumed (list))) + (case (car chars) + ((#\\) (consume-string ; Don't interpret, just add it + (cddr chars) (cons (cadr chars) consumed))) + ((#\") (if (char=? #\" (cadr chars)) ; double escapes + (consume-string (cddr chars) + (cons #\" consumed)) + (let skip-spaces ((chars (cdr chars))) + (case (car chars) + ((#\space #\newline #\tab) + (skip-spaces (cdr chars))) + ((#\,) + (loop (cdr chars) + #t + (cons ((car parsers) + (reverse-list->string consumed)) + result) + (cdr parsers))) + ((#\)) (loop chars + #f + (cons ((car parsers) + (reverse-list->string consumed)) + result) + (cdr parsers))) + (else + (postgresql-error + 'parse 'make-composite-parser + "Bogus trailing characters" str)))))) + (else (consume-string (cdr chars) + (cons (car chars) consumed)))))) + (else (let consume-string ((chars chars) + (consumed (list))) + (case (car chars) + ((#\,) (loop (cdr chars) + #t + (cons (parse (reverse-list->string consumed) + (car parsers)) + result) + (cdr parsers))) + ;; Nothing should precede this one + ((#\)) (loop chars + #f + (cons (parse (reverse-list->string consumed) + (car parsers)) + result) + (cdr parsers))) + (else (consume-string (cdr chars) + (cons (car chars) consumed)))))))))) + +;; Array parsers and composite parsers are automatically cached when such +;; a value is requested. +(define default-type-parsers + (make-parameter + `(("text" . ,identity) + ("bytea" . ,bytea-parser) + ("char" . ,char-parser) + ("bpchar" . ,identity) + ("bool" . ,bool-parser) + ("int8" . ,numeric-parser) + ("int4" . ,numeric-parser) + ("int2" . ,numeric-parser) + ("float4" . ,numeric-parser) + ("float8" . ,numeric-parser) + ("numeric" . ,numeric-parser) + ("oid" . ,numeric-parser) + ;; Nasty hack, or clever hack? :) + ("record" . ,(make-composite-parser (circular-list identity)))))) + +;;;;;;;;;;;;;;;;;;;;;;; +;;;; Type unparsers +;;;;;;;;;;;;;;;;;;;;;;; + +(define (scheme-value->db-value conn value) + (cond ((find (lambda (parse?) + ((car parse?) value)) + (pg-connection-type-unparsers conn)) => + (lambda (parse) + ((cdr parse) conn value))) + (else value))) + +(define (bool-unparser conn b) + (if b "TRUE" "FALSE")) + +(define (vector-unparser conn v) + (let loop ((result (list)) + (pos 0) + (len (vector-length v))) + (if (= pos len) + (string-append "{" (string-intersperse (reverse! result) ",") "}") + (let* ((value (vector-ref v pos)) + (unparsed-value (scheme-value->db-value conn value)) + (serialized (cond + ((sql-null? unparsed-value) "NULL") + ((not (string? unparsed-value)) + (postgresql-error + 'unparse 'vector-unparser + "Param value is not string" unparsed-value)) + ((vector? value) unparsed-value) ;; don't quote! + (else + (sprintf "\"~A\"" + (string-translate* + unparsed-value + '(("\\" . "\\\\") ("\"" . "\\\"")))))))) + (loop (cons serialized result) (add1 pos) len))))) + +(define (list-unparser conn l) + (let loop ((result (list)) + (items l)) + (if (null? items) + (string-append "(" (string-intersperse (reverse! result) ",") ")") + (let* ((unparsed-value (scheme-value->db-value conn (car items))) + (serialized (cond + ((sql-null? unparsed-value) "") + ((not (string? unparsed-value)) + (postgresql-error + 'unparse 'list-unparser + "Param value is not string" unparsed-value)) + (else + (sprintf "\"~A\"" + (string-translate* + unparsed-value + '(("\\" . "\\\\") ("\"" . "\\\"")))))))) + (loop (cons serialized result) (cdr items)))))) + +(define default-type-unparsers + (make-parameter + `((,string? . ,(lambda (conn s) s)) + (,u8vector? . ,(lambda (conn v) (u8vector->blob/shared v))) + (,char? . ,(lambda (conn c) (string c))) + (,boolean? . ,bool-unparser) + (,number? . ,(lambda (conn n) (number->string n))) + (,vector? . ,vector-unparser) + (,pair? . ,list-unparser)))) + +;; Retrieve type-oids from PostgreSQL: +(define (update-type-parsers! conn #!optional new-type-parsers) + (let ((type-parsers (or new-type-parsers (pg-connection-type-parsers conn))) + (ht (make-hash-table)) + (result '())) + (pg-connection-oid-parsers-set! conn ht) + (pg-connection-type-parsers-set! conn type-parsers) + (unless (null? type-parsers) ; empty IN () clause is not allowed + (row-for-each* + (lambda (oid typname) + (and-let* ((procedure (assoc typname type-parsers))) + (hash-table-set! ht (string->number oid) (cdr procedure)))) + (query* conn + (sprintf + "SELECT oid, typname FROM pg_type WHERE typname IN (~A)" + (in-list (length type-parsers))) + (map car type-parsers) raw: #t))))) + +(define (update-type-unparsers! conn new-type-unparsers) + (pg-connection-type-unparsers-set! conn new-type-unparsers)) + +;;;;;;;;;;;;;;;;;;;; +;;;; Connections +;;;;;;;;;;;;;;;;;;;; + +(define-record pg-connection + ptr type-parsers oid-parsers type-unparsers transaction-depth) +(define connection? pg-connection?) +(define type-parsers pg-connection-type-parsers) +(define type-unparsers pg-connection-type-unparsers) + +(define (connected? conn) (not (not (pg-connection-ptr conn)))) + +(define (pgsql-connection->fd conn) + ((foreign-lambda int PQsocket pgconn*) (pg-connection-ptr conn))) + +(define (wait-for-connection! conn poll-function) + (let ((conn-fd (pgsql-connection->fd conn)) + (conn-ptr (pg-connection-ptr conn))) + (let loop ((result (poll-function conn-ptr))) + (cond ((= result PGRES_POLLING_OK) (void)) + ((= result PGRES_POLLING_FAILED) + (let ((message (PQerrorMessage conn-ptr))) + (disconnect conn) + (postgresql-error + 'connect 'connect + (conc "Polling Postgres database failed: " message) conn))) + ((member result (list PGRES_POLLING_WRITING PGRES_POLLING_READING)) + (thread-wait-for-i/o! conn-fd (if (= PGRES_POLLING_READING result) + #:input + #:output)) + (loop (poll-function conn-ptr))) + (else + (postgresql-error + 'internal 'connect + (conc "Internal error! Unknown status code: " result) conn)))))) + +(cond-expand + ((not has-connectdb-params) + (define (alist->connection-spec alist) + (string-join + (map (lambda (subspec) + (sprintf "~A='~A'" + (car subspec) ;; this had better not contain [ =\'] + (string-translate* (->string (cdr subspec)) + '(("\\" . "\\\\") ("'" . "\\'"))))) + alist)))) + (else)) + +(define (connect-start spec) + (if (string? spec) + (PQconnectStart spec) + (cond-expand + (has-connectdb-params + (let ((len (length spec))) + ((foreign-lambda* pgconn* ((scheme-object cons) (scheme-pointer keybuf) + (scheme-pointer valbuf) (int len)) + "const char **key = (const char **)keybuf;" + "const char **val = (const char **)valbuf;" + "int i;" + "for (i=0; i < len; ++i,cons=C_u_i_cdr(cons)) {" + " C_word kvpair = C_u_i_car(cons);" + " key[i] = C_c_string(C_u_i_car(kvpair));" + " val[i] = C_c_string(C_u_i_cdr(kvpair));" + "}" + "key[len] = NULL;" + "val[len] = NULL;" + "C_return(PQconnectStartParams(key, val, 0));") + (map (lambda (x) (cons (string-append (->string (car x)) "\x00") + (string-append (->string (cdr x)) "\x00"))) spec) + (make-blob (* (add1 len) (foreign-value "sizeof(char *)" int))) + (make-blob (* (add1 len) (foreign-value "sizeof(char *)" int))) + len))) + (else (PQconnectStart (alist->connection-spec spec)))))) + +(define (connect #!optional (conn-spec '()) + (type-parsers (default-type-parsers)) + (type-unparsers (default-type-unparsers))) + (let ((conn-ptr (connect-start conn-spec))) + (cond + ((not conn-ptr) + (postgresql-error + 'internal 'connect + "Unable to allocate a Postgres connection structure" conn-spec)) + ((= (foreign-value "CONNECTION_BAD" int) (PQstatus conn-ptr)) + (let ((message (PQerrorMessage conn-ptr))) + (PQfinish conn-ptr) + (postgresql-error + 'connect 'connect + (conc "Connection to Postgres database failed: " message) conn-spec))) + (else + (let ((conn (make-pg-connection conn-ptr type-parsers + (make-hash-table) type-unparsers 0))) + ;; We don't want libpq to piss in our stderr stream + ((foreign-lambda* void ((pgconn* conn)) + "PQsetNoticeReceiver(conn, nullNoticeReceiver, NULL);") conn-ptr) + (wait-for-connection! conn PQconnectPoll) + (set-finalizer! conn disconnect) + ;; Retrieve type-information from PostgreSQL metadata for use by + ;; the various value-parsers. + (update-type-parsers! conn) + conn))))) + +(define (reset-connection connection) + (let ((conn-ptr (pg-connection-ptr connection))) + (if (PQresetStart conn-ptr) ;; Update oid-parsers? + (wait-for-connection! connection PQresetPoll) + (let ((error-message (PQerrorMessage conn-ptr))) + (disconnect connection) + (postgresql-error + 'connect 'reset-connection + (conc "Reset of connection failed: " error-message) connection))))) + +(define (disconnect connection) + (and-let* ((conn-ptr (pg-connection-ptr connection))) + (pg-connection-ptr-set! connection #f) + (pg-connection-type-parsers-set! connection #f) + (pg-connection-oid-parsers-set! connection #f) + (PQfinish conn-ptr)) + (void)) + +;;;;;;;;;;;;;;; +;;;; Results +;;;;;;;;;;;;;;; + +(define-record pg-result ptr value-parsers) +(define result? pg-result?) + +(define (clear-result! result) + (and-let* ((result-ptr (pg-result-ptr result))) + (pg-result-ptr-set! result #f) + (PQclear result-ptr))) + +(define (row-count result) + (PQntuples (pg-result-ptr result))) + +(define (column-count result) + (PQnfields (pg-result-ptr result))) + +;; Helper procedures for bounds checking; so we can distinguish between +;; out of bounds and nonexistant columns, and signal it. +(define (check-column-index! result index location) + (when (>= index (column-count result)) + (postgresql-error + 'bounds location + (sprintf "Result column ~A out of bounds" index) result index))) + +(define (check-row-index! result index location) + (when (>= index (row-count result)) + (postgresql-error + 'bounds location + (sprintf "Result row ~A out of bounds" index) result index))) + +(define (column-name result index) + (check-column-index! result index 'column-name) + (string->symbol (PQfname (pg-result-ptr result) index))) + +(define (column-names result) + (let ((ptr (pg-result-ptr result))) + (let loop ((names '()) + (column (column-count result))) + (if (= column 0) + names + (loop (cons (string->symbol (PQfname ptr (sub1 column))) names) + (sub1 column)))))) + +(define (column-index result name) + (let ((idx (PQfnumber (pg-result-ptr result) (symbol->string name)))) + (and (>= idx 0) idx))) + +(define (table-oid result index) + (check-column-index! result index 'table-oid) + (let ((oid (PQftable (pg-result-ptr result) index))) + (and (not (= oid invalid-oid)) oid))) + +;; Fixes the off-by-1 unexpectedness in libpq/the protocol to make it more +;; consistent with the rest of Scheme. However, this is inconsistent with +;; almost all other PostgreSQL interfaces... +(define (table-column-index result index) + (check-column-index! result index 'table-column-index) + (let ((idx (PQftablecol (pg-result-ptr result) index))) + (and (> idx 0) (sub1 idx)))) + +(define format-table + '((0 . text) (1 . binary))) + +(define (format->symbol format) + (or (alist-ref format format-table eq?) + (postgresql-error 'type 'format->symbol "Unknown format" format))) + +(define (symbol->format symbol) + (or (and-let* ((res (rassoc symbol format-table eq?))) + (car res)) + (postgresql-error 'type 'format->symbol "Unknown format" symbol))) + +(define (column-format result index) + (check-column-index! result index 'column-format) + (format->symbol (PQfformat (pg-result-ptr result) index))) + +(define (column-type result index) + (check-column-index! result index 'column-type) + (PQftype (pg-result-ptr result) index)) + +;; This is really not super-useful as it requires intimate knowledge +;; about the internal implementations of types in PostgreSQL. +(define (column-type-modifier result index) + (check-column-index! result index 'column-type) + (let ((mod (PQfmod (pg-result-ptr result) index))) + (and (>= mod 0) mod))) + +;; Unchecked version, for speed +(define (value-at* result column row raw) + (let ((ptr (pg-result-ptr result))) + (if (PQgetisnull ptr row column) + (sql-null) + (let* ((len (PQgetlength ptr row column)) + (fmt (PQfformat ptr column)) + (value (case fmt + ((0) (make-string len)) + ((1) (make-blob len)) + (else (postgresql-error + 'internal 'value-at + (conc "Unknown column format type: " fmt) + result column row raw))))) + (memcpy value (PQgetvalue-ptr ptr row column) len) + (if (or raw (blob? value)) + value + ((vector-ref (pg-result-value-parsers result) column) value)))))) + +(define (value-at result #!optional (column 0) (row 0) #!key raw) + (check-row-index! result row 'value) + (check-column-index! result column 'value) + (value-at* result column row raw)) + +(define (row-values* result row column-count raw) + (let loop ((list '()) + (column column-count)) + (if (= column 0) + list + (loop (cons (value-at* result (sub1 column) row raw) list) + (sub1 column))))) + +(define (row-values result #!optional (row 0) #!key raw) + (check-row-index! result row 'row) + (row-values* result row (column-count result) raw)) + +(define (column-values* result column row-count raw) + (let loop ((list '()) + (row row-count)) + (if (= row 0) + list + (loop (cons (value-at* result column (sub1 row) raw) list) + (sub1 row))))) + +(define (column-values result #!optional (column 0) #!key raw) + (check-column-index! result column 'column) + (column-values* result column (row-count result) raw)) + +;; (define (row-alist result #!optional (row 0) #!key raw) +;; (map cons (column-names result) (row-values result row raw: raw))) +(define (row-alist result #!optional (row 0) #!key raw) + (check-row-index! result row 'row-alist) + (let loop ((alist '()) + (column (column-count result))) + (if (= column 0) + alist + (loop (cons (cons (string->symbol + (PQfname (pg-result-ptr result) (sub1 column))) + (value-at* result (sub1 column) row raw)) alist) + (sub1 column))))) + +;;; TODO: Do we want/need PQnparams and PQparamtype bindings? + +(define (affected-rows result) + (string->number (PQcmdTuples (pg-result-ptr result)))) + +(define (inserted-oid result) + (let ((oid (PQoidValue (pg-result-ptr result)))) + (and (not (= oid invalid-oid)) oid))) + + +;;;;;;;;;;;;;;;;;;;;;;;; +;;;; Query procedures +;;;;;;;;;;;;;;;;;;;;;;;; + +;; Buffer all available input, yielding if nothing is available: +(define (buffer-available-input! conn) + (let ((conn-ptr (pg-connection-ptr conn)) + (conn-fd (pgsql-connection->fd conn))) + (let loop () + (if (PQconsumeInput conn-ptr) + (when (PQisBusy conn-ptr) + (thread-wait-for-i/o! conn-fd #:input) + (loop)) + (postgresql-error + 'i/o 'buffer-available-input! + (conc "Error reading reply from server: " (PQerrorMessage conn-ptr)) + conn))))) + +;; Here be more dragons +(define (resolve-unknown-types! conn oids) + (unless (null? oids) + (let* ((parsers (pg-connection-oid-parsers conn)) + (q (conc "SELECT t.oid, t.typtype, t.typelem, t.typdelim, " + " t.typbasetype, t.typarray, a.attrelid, a.atttypid " + "FROM pg_type t " + " LEFT JOIN pg_attribute a " + " ON t.typrelid = a.attrelid AND a.attnum > 0 " + "WHERE t.oid IN (~A) " + "ORDER BY COALESCE(t.typrelid,-1) ASC, a.attnum ASC")) + (result (query* conn (sprintf q (in-list (length oids))) + (map number->string oids) raw: #t)) + (count (row-count result))) + (let dissect-types ((unknown-oids (list)) + (pos 0) + (domains (list)) + (arrays (list)) + (classes (list)) + (last-class 0)) + (cond + ((>= pos count) ; Done scanning rows? + ;; Keep going until all oids are resolved + (resolve-unknown-types! conn unknown-oids) + ;; Postprocessing step: resolve all nested types + (for-each (lambda (d) + (and-let* ((p (hash-table-ref/default parsers (cdr d) #f))) + (hash-table-set! parsers (car d) p))) + domains) + (for-each (lambda (a) + (and-let* ((p (hash-table-ref/default parsers (cddr a) #f))) + (hash-table-set! parsers (car a) + (make-array-parser p (cadr a))))) + arrays) + (for-each + (lambda (c) + (and-let* ((p-list + (fold + (lambda (att l) + (and-let* ((l) + (p (hash-table-ref/default parsers att #f))) + (cons p l))) + '() + (cdr c)))) + (hash-table-set! parsers (car c) + (make-composite-parser p-list)))) + classes)) + ((not (string=? (value-at* result 4 pos #f) "0")) ; Domain type? + (let* ((basetype-oid (string->number (value-at* result 4 pos #f))) + (parser (hash-table-ref/default parsers basetype-oid #f)) + (oid (string->number (value-at* result 0 pos #f)))) + (dissect-types (if parser + unknown-oids + (cons basetype-oid unknown-oids)) + (add1 pos) (cons (cons oid basetype-oid) domains) + arrays classes last-class))) + ((string=? (value-at* result 5 pos #f) "0") ; Array value? + (let* ((elem (string->number (value-at* result 2 pos #f))) + (delim (string-ref (value-at* result 3 pos #f) 0)) + (parser (hash-table-ref/default parsers elem #f)) + (oid (string->number (value-at* result 0 pos #f)))) + (dissect-types (if parser + unknown-oids + (cons elem unknown-oids)) + (add1 pos) domains + (cons (cons oid (cons delim elem)) arrays) + classes last-class))) + ((string=? (value-at* result 1 pos #f) "c") ; Class? (i.e., table or type) + (let* ((classid (string->number (value-at* result 6 pos #f))) + (attrid (string->number (value-at* result 7 pos #f))) + (parser (hash-table-ref/default parsers attrid #f))) + (dissect-types (if parser + unknown-oids + (cons attrid unknown-oids)) + (add1 pos) domains arrays + ;; Keep oid at the front of the list, insert this + ;; attr after it, before the other attrs, if any. + (if (= last-class classid) + (cons (cons (caar classes) + (cons attrid (cdar classes))) + (cdr classes)) + (cons (cons (string->number + (value-at* result 0 pos #f)) + (list attrid)) classes)) + classid))) + (else + (dissect-types unknown-oids (add1 pos) + domains arrays classes last-class))))))) + +(define (make-value-parsers conn pqresult #!key raw) + (let* ((nfields (PQnfields pqresult)) + (parsers (make-vector nfields)) + (ht (pg-connection-oid-parsers conn))) + (let loop ((col 0) + (unknowns (list))) + (if (= col nfields) + (begin + (resolve-unknown-types! conn (map cdr unknowns)) + (for-each (lambda (unknown) + (let* ((col (car unknown)) + (oid (cdr unknown)) + (parser (hash-table-ref/default ht oid identity))) + (vector-set! parsers col parser))) + unknowns) + parsers) + (let* ((oid (PQftype pqresult col)) + (parser (if raw identity (hash-table-ref/default ht oid #f)))) + (vector-set! parsers col parser) + (loop (add1 col) (if parser + unknowns + (cons (cons col oid) unknowns)))))))) + +;; Collect the result pointers from the last query. +;; +;; A pgresult represents an entire resultset and is always read into memory +;; all at once. +(define (get-last-result conn #!key raw) + (buffer-available-input! conn) + (let* ((conn-ptr (pg-connection-ptr conn)) + ;; Read out all remaining results (including the current one). + ;; TODO: Is this really needed? libpq does it (in pqExecFinish), + ;; but ostensibly only to concatenate the error messages for + ;; each query. OTOH, maybe we want to do that, too. + (clean-results! (lambda (result) + (let loop ((result result)) + (when result + (PQclear result) + (loop (PQgetResult conn-ptr)))))) + (result (PQgetResult conn-ptr)) + (status (PQresultStatus result))) + (cond + ((not result) (postgresql-error + 'internal 'get-last-result + "Internal error! No result object available from server" + conn)) + ((member status (list PGRES_BAD_RESPONSE PGRES_FATAL_ERROR + PGRES_NONFATAL_ERROR)) + (let* ((error-field (lambda (f) (and f (PQresultErrorField result f)))) + (error-field/int (lambda (f) + (and-let* ((value (error-field f))) + (string->number value)))) + (sqlstate (error-field PG_DIAG_SQLSTATE)) + (maybe-severity (error-field PG_DIAG_SEVERITY)) + (condition + (make-pg-condition + (make-property-condition + 'query + 'severity (and maybe-severity + (string->symbol + (string-downcase maybe-severity))) + 'error-class (and sqlstate (string-take sqlstate 2)) + 'error-code sqlstate + 'message-primary (error-field PG_DIAG_MESSAGE_PRIMARY) + 'message-detail (error-field PG_DIAG_MESSAGE_DETAIL) + 'message-hint (error-field PG_DIAG_MESSAGE_HINT) + 'statement-position (error-field/int PG_DIAG_STATEMENT_POSITION) + 'context (error-field PG_DIAG_CONTEXT) + 'source-file (error-field PG_DIAG_SOURCE_FILE) + 'source-line (error-field/int PG_DIAG_SOURCE_LINE) + 'source-function (error-field PG_DIAG_SOURCE_FUNCTION) + 'internal-query (error-field PG_DIAG_INTERNAL_QUERY) + 'internal-position (error-field/int PG_DIAG_INTERNAL_POSITION) + 'schema-name (error-field PG_DIAG_SCHEMA_NAME) + 'table-name (error-field PG_DIAG_TABLE_NAME) + 'column-name (error-field PG_DIAG_COLUMN_NAME) + 'datatype-name (error-field PG_DIAG_DATATYPE_NAME) + 'constraint-name (error-field PG_DIAG_CONSTRAINT_NAME)) + 'get-last-result + (PQresultErrorMessage result)))) + (clean-results! result) + (signal condition))) + ((member status (list PGRES_COPY_OUT PGRES_COPY_IN)) + ;; These are weird; A COPY puts the connection in "copy mode". + ;; As long as it's in "copy mode", pqgetresult will return the + ;; same result every time you call it, so don't try to call + ;; clean-results! + (let ((result-obj (make-pg-result result #f))) + (set-finalizer! result-obj clear-result!) + result-obj)) + ((member status (list PGRES_EMPTY_QUERY PGRES_COMMAND_OK PGRES_TUPLES_OK)) + (let ((result-obj (make-pg-result result #f))) + (set-finalizer! result-obj clear-result!) + (let ((trailing-result (PQgetResult conn-ptr))) + (when trailing-result + (clean-results! trailing-result) + (postgresql-error 'internal 'get-last-result + (conc "Internal error! Unexpected extra " + "results after first query result") + conn))) + (pg-result-value-parsers-set! + result-obj (make-value-parsers conn result raw: raw)) + result-obj)) + (else + (postgresql-error 'internal 'get-last-result + (conc "Internal error! Unknown status code: " status) + conn))))) + +(define (query conn query . params) + (query* conn query params)) + +(define (query* conn query #!optional (params '()) #!key (format 'text) raw) + (let* ((params ;; Check all params and ensure they are proper pairs + (map (lambda (p) + (let ((obj (if raw p (scheme-value->db-value conn p)))) + (cond ((string? obj) ; Convert to ASCIIZ + (let* ((len (##sys#size obj)) + (res (string-append obj "\x00"))) + (vector 0 len res))) + ((blob? obj) + (vector 1 (##sys#size obj) obj)) + ((sql-null? obj) #f) + (else (postgresql-error + 'type 'query* + (sprintf "Param value is not string, sql-null or blob: ~S" p) + conn query params format))))) + params)) + ;; It's a shame we need to traverse params twice (and again in C)... + (len (length params)) + (send-query + (foreign-lambda* + bool ((pgconn* conn) (nonnull-c-string query) + (bool is_prepped) (int num) (scheme-object params) + (scheme-pointer valsbuf) (scheme-pointer lensbuf) + (scheme-pointer fmtsbuf) (int rfmt)) + "int i = 0, *lens = (int *)lensbuf, *fmts = (int *)fmtsbuf;" + "const char **vals = (const char **)valsbuf;" + "C_word obj, cons;" + "for (i=0,cons=params; i < num; ++i,cons=C_u_i_cdr(cons)) {" + " obj = C_u_i_car(cons);" + " if (obj == C_SCHEME_FALSE) {" + " fmts[i] = lens[i] = 0;" + " vals[i] = NULL;" + " } else {" + " fmts[i] = C_unfix(C_block_item(obj, 0));" + " lens[i] = C_unfix(C_block_item(obj, 1));" + " vals[i] = C_c_string(C_block_item(obj, 2));" + " }" + "}" + "if (is_prepped)" + " C_return(PQsendQueryPrepared((PGconn *)conn, query, num," + " vals, lens, fmts, rfmt));" + "else" + " C_return(PQsendQueryParams((PGconn *)conn, query, num, NULL," + " vals, lens, fmts, rfmt));")) + (query-as-string (if (symbol? query) (symbol->string query) query))) + ;; XXX: What if we're using a newer protocol version? Then this error is + ;; well-meaning but completely wrong... Unfortunately the error message + ;; returned by the server if we exceed this limit is even more confusing. + (cond ((> len 65535) + (postgresql-error + 'domain 'query* + (sprintf "Too many bind parameters (PG protocol supports up to 65535, but got ~A). Try using the COPY support, or escaping the data and sending it as a big string." len) + conn query params format)) + ((send-query (pg-connection-ptr conn) query-as-string (symbol? query) + len params + ;; We allocate here instead of in C to keep things simple + (make-blob (* len (foreign-value "sizeof(char *)" int))) + (make-blob (* len (foreign-value "sizeof(int)" int))) + (make-blob (* len (foreign-value "sizeof(int)" int))) + (symbol->format format)) + (get-last-result conn raw: raw)) + (else (postgresql-error 'i/o 'query* + (conc "Unable to send query to server: " + (PQerrorMessage (pg-connection-ptr conn))) + conn query params format))))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;;;; Transaction management +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(define (with-transaction conn thunk #!key isolation access) + (let* ((old-depth (pg-connection-transaction-depth conn)) + (isolation (and isolation + (case isolation + ((read-committed) "ISOLATION LEVEL READ COMMITTED") + ((serializable) "ISOLATION LEVEL SERIALIZABLE") + (else (postgresql-error + 'type 'with-transaction + "Unknown isolation level" isolation))))) + (access (and access + (case access + ((read/write) "READ WRITE") + ((read-only) "READ ONLY") + (else (postgresql-error + 'type 'with-transaction + "Unknown access mode" access))))) + (characteristics (conc (or isolation "") " " (or access ""))) + (rollback! + (lambda () + (if (zero? old-depth) + (query conn "ROLLBACK") + ;; We do not *need* to give savepoints unique names, + ;; but it aids debugging and we know the depth anyway. + (query conn (conc "ROLLBACK TO SAVEPOINT s_" old-depth))))) + (commit! + (lambda () + (if (zero? old-depth) + (query conn "COMMIT") + (query conn (conc "RELEASE SAVEPOINT s_" old-depth)))))) + (when (and isolation (not (zero? old-depth))) + (postgresql-error + 'domain 'with-transaction + "Can't set isolation level in nested transactions" isolation)) + (if (zero? old-depth) + (query conn (conc "BEGIN " characteristics)) + (begin (query conn (conc "SAVEPOINT s_" old-depth)) + ;; XXX: This should probably be SET LOCAL instead of SET + ;; (which is implicitly the same as SET SESSION), but I + ;; can't come up with a testcase that fails with this and + ;; succeeds with SET LOCAL, so keep it around for now. + (when access + (query conn (conc "SET TRANSACTION " characteristics))))) + (pg-connection-transaction-depth-set! conn (add1 old-depth)) + ;; TODO: Add a warning mechanism (using dynamic-wind) for when the + ;; user tries to jump into/out of transactions with continuations? + (handle-exceptions exn + (begin + (pg-connection-transaction-depth-set! conn old-depth) + (rollback!) + (raise exn)) + (let ((res (thunk))) + (pg-connection-transaction-depth-set! conn old-depth) + (if res (commit!) (rollback!)) + res)))) + +(define (in-transaction? conn) + (> (pg-connection-transaction-depth conn) 0)) + +;;;;;;;;;;;;;;;;;;;; +;;;; COPY support +;;;;;;;;;;;;;;;;;;;; + +(define (put-copy-data conn data) + (let* ((data (cond + ((or (string? data) (blob? data)) data) + ((srfi-4-vector? data) (##sys#slot data 1)) + (else (postgresql-error + 'type 'put-copy-data + "Expected a blob, string or srfi-4 vector" conn data)))) + (len (##sys#size data)) + (conn-ptr (pg-connection-ptr conn)) + (conn-fd (pgsql-connection->fd conn))) + (let loop ((res (PQputCopyData conn-ptr data len))) + (cond + ((= res 0) + (thread-wait-for-i/o! conn-fd #:output) + (loop (PQputCopyData conn-ptr data len))) + ((= res 1) (void)) + ((= res -1) + (postgresql-error + 'i/o 'put-copy-data + (conc "Error putting COPY data: " (PQerrorMessage conn-ptr)) conn)) + (else (postgresql-error + 'internal 'put-copy-data + (conc "Internal error! Unexpected return value: " res) conn)))))) + +(define (put-copy-end conn #!optional error-message) + (let ((conn-ptr (pg-connection-ptr conn)) + (conn-fd (pgsql-connection->fd conn))) + (let loop ((res (PQputCopyEnd conn-ptr error-message))) + (cond + ((= res 0) + (thread-wait-for-i/o! conn-fd #:output) + (loop (PQputCopyEnd conn-ptr error-message))) + ((= res 1) (get-last-result conn)) + ((= res -1) + (postgresql-error + 'i/o 'put-copy-end + (conc "Error ending put COPY data: " (PQerrorMessage conn-ptr)) + conn error-message)) + (else + (postgresql-error + 'internal 'put-copy-end + (conc "Internal error! Unexpected return value: " res) conn)))))) + +(define (get-copy-data conn #!key (format 'text)) + (let ((conn-ptr (pg-connection-ptr conn))) + (let loop () + (let-location ((buf c-pointer)) + (let ((res (PQgetCopyData conn-ptr (location buf) #t))) + (cond + ((> res 0) + (let ((value (case format + ((text) (make-string res)) + ((binary) (make-blob res)) + (else (postgresql-error + 'internal 'get-copy-data + (conc "Unknown column format type: " format) + conn))))) + (memcpy value buf res) + (free buf) + value)) + ((= res 0) + (buffer-available-input! conn) + (loop)) + ((= res -1) + (get-last-result conn)) + ((= res -2) + (postgresql-error + 'i/o 'get-copy-data + (conc "Error getting COPY data: " (PQerrorMessage conn-ptr)) conn)) + (else (postgresql-error + 'internal 'get-copy-data + (conc "Internal error! Unexpected return value: " res) + conn)))))))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;;;; Value escaping and quotation +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(define (escape-string conn str) + (define %escape-string-conn + (foreign-lambda size_t PQescapeStringConn + pgconn* scheme-pointer scheme-pointer size_t (c-pointer bool))) + (let-location ((err bool)) + (let* ((strlen (string-length str)) + (buflen (add1 (* strlen 2))) + (buffer (make-string buflen)) + (conn-ptr (pg-connection-ptr conn)) + (size (%escape-string-conn conn-ptr buffer str strlen (location err)))) + (cond (err (postgresql-error 'internal 'escape-string + (conc "String escaping failed. " + (PQerrorMessage conn-ptr)) conn str)) + ((= size buflen) buffer) + (else (##sys#substring buffer 0 size)))))) + +(define (quote-identifier conn str) + (cond-expand + (has-escape-identifier + (define %escape-ident + (foreign-lambda c-string* PQescapeIdentifier pgconn* scheme-pointer size_t)) + (or (%escape-ident (pg-connection-ptr conn) str (string-length str)) + (postgresql-error 'internal 'quote-identifier + (conc "Identifier escaping failed: " + (PQerrorMessage (pg-connection-ptr conn))) + conn str))) + (else (postgresql-error 'unsupported-version 'quote-identifier + (conc "Please upgrade your PostgreSQL to 9.0 or later " + "in order to be able to use quote-identifier!") + conn str)))) + +(define (escape-bytea conn obj) + (define %escape-bytea-conn + (foreign-lambda (c-pointer unsigned-char) PQescapeByteaConn + pgconn* scheme-pointer size_t (c-pointer size_t))) + (let-location ((allocated size_t)) + (let* ((data (cond ((or (string? obj) (blob? obj)) obj) + ((srfi-4-vector? obj) (##sys#slot obj 1)) + (else (postgresql-error + 'type 'escape-bytea + "Expected string, blob or srfi-4 vector" obj)))) + (conn-ptr (pg-connection-ptr conn)) + (buf (%escape-bytea-conn + conn-ptr data (##sys#size data) (location allocated)))) + (if buf + (let* ((string-length (sub1 allocated)) + (result-string (make-string string-length))) + (memcpy result-string buf string-length) + (free buf) + result-string) + (postgresql-error + 'internal 'escape-bytea + (conc "Byte array escaping failed: " (PQerrorMessage conn-ptr)) + conn obj))))) + +(define (unescape-bytea str) + (define %unescape-bytea + (foreign-lambda (c-pointer unsigned-char) PQunescapeBytea c-string (c-pointer size_t))) + (let-location ((blob-length size_t)) + (let ((buf (%unescape-bytea str (location blob-length)))) + (if buf + (let ((result-blob (make-blob blob-length))) + (memcpy result-blob buf blob-length) + (free buf) + (blob->u8vector/shared result-blob)) + (postgresql-error 'internal 'unescape-bytea + "Byte array unescaping failed (out of memory?)" str))))) + + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;;;; High-level interface +;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(define (make-result-fold item-count sub-item-count extract-item) + (lambda (kons knil result) + (let ((items (item-count result)) + (sub-items (sub-item-count result))) + (let loop ((seed knil) + (item 0)) + (if (= item items) + seed + (loop (kons (extract-item result item sub-items #f) seed) (add1 item))))))) + +(define row-fold (make-result-fold row-count column-count row-values*)) +(define (row-fold* kons knil result) + (row-fold (lambda (values seed) + (apply kons (append values (list seed)))) knil result)) + +(define column-fold (make-result-fold column-count row-count column-values*)) +(define (column-fold* kons knil result) + (column-fold (lambda (values seed) + (apply kons (append values (list seed)))) knil result)) + + +(define (make-result-fold-right item-count sub-item-count extract-item) + (lambda (kons knil result) + (let ((sub-items (sub-item-count result))) + (let loop ((seed knil) + (item (item-count result))) + (if (= item 0) + seed + (loop (kons (extract-item result (sub1 item) sub-items #f) seed) (sub1 item))))))) + +(define row-fold-right (make-result-fold-right row-count column-count row-values*)) +(define (row-fold-right* kons knil result) + (row-fold-right (lambda (values seed) + (apply kons (append values (list seed)))) knil result)) + +(define column-fold-right (make-result-fold-right column-count row-count column-values*)) +(define (column-fold-right* kons knil result) + (column-fold-right (lambda (values seed) + (apply kons (append values (list seed)))) knil result)) + + +(define (row-for-each proc result) + (row-fold (lambda (values seed) (proc values)) #f result) + (void)) +(define (row-for-each* proc result) + (row-fold (lambda (values seed) (apply proc values)) #f result) + (void)) + +(define (column-for-each proc result) + (column-fold (lambda (values seed) (proc values)) #f result) + (void)) +(define (column-for-each* proc result) + (column-fold (lambda (values seed) (apply proc values)) #f result) + (void)) + +;; Like regular Scheme map, the order in which the procedure is applied is +;; undefined. We make good use of that by traversing the resultset from +;; the end back to the beginning, thereby avoiding a reverse! on the result. +(define (row-map proc res) + (row-fold-right (lambda (row lst) (cons (proc row) lst)) '() res)) +(define (row-map* proc res) + (row-fold-right (lambda (row lst) (cons (apply proc row) lst)) '() res)) +(define (column-map proc res) + (column-fold-right (lambda (col lst) (cons (proc col) lst)) '() res)) +(define (column-map* proc res) + (column-fold-right (lambda (col lst) (cons (apply proc col) lst)) '() res)) + +(define (result-format result) + (if (and result ((foreign-lambda bool PQbinaryTuples pgresult*) + (pg-result-ptr result))) + 'binary 'text)) + +(define (copy-query*-fold kons knil conn query + #!optional (params '()) #!key (format 'text) raw) + (let* ((result (query* conn query params format: format raw: raw)) + (data-format (result-format result))) + (handle-exceptions exn + (let cleanup () (if (result? (get-copy-data conn)) (raise exn) (cleanup))) + (let loop ((data (get-copy-data conn format: data-format)) + (seed knil)) + (if (result? data) + seed + ;; Explicit ordering; data could be _very_ big, allow one to be GCed + (let ((next (kons data seed))) + (loop (get-copy-data conn format: data-format) next))))))) + +(define (copy-query-fold kons knil conn query . params) + (copy-query*-fold kons knil conn query params)) + + +;; This is slow and memory-intensive if data is big. Provided for completeness +(define (copy-query*-fold-right kons knil conn query + #!optional (params '()) #!key (format 'text) raw) + (let* ((result (query* conn query params format: format raw: raw)) + (data-format (result-format result))) + (handle-exceptions exn + (let cleanup () (if (result? (get-copy-data conn)) (raise exn) (cleanup))) + (let loop ((data (get-copy-data conn format: data-format))) + (if (result? data) + knil + (kons data (loop (get-copy-data conn format: data-format)))))))) + +(define (copy-query-fold-right kons knil conn query . params) + (copy-query*-fold-right kons knil conn query params)) + + +(define (copy-query*-map proc conn query + #!optional (params '()) #!key (format 'text) raw) + (reverse! (copy-query*-fold (lambda (data seed) (cons (proc data) seed)) + '() conn query params format: format raw: raw))) + +(define (copy-query-map proc conn query . params) + (copy-query*-map proc conn query params)) + + +(define (copy-query*-for-each proc conn query + #!optional (params '()) #!key (format 'text) raw) + (copy-query*-fold (lambda (data seed) (proc data)) + #f conn query params format: format raw: raw) + (void)) + +(define (copy-query-for-each proc conn query . params) + (copy-query*-for-each proc conn query params)) + +;; A bit of a weird name but consistent +(define (call-with-output-copy-query* + proc conn query #!optional (params '()) #!key (format 'text) raw) + (query* conn query params format: format raw: raw) + (let* ((closed? #f) + (output-port (make-output-port + (lambda (data) (put-copy-data conn data)) + (lambda () (put-copy-end conn) (set! closed? #t))))) + (handle-exceptions exn + (if closed? + (raise exn) + (handle-exceptions _ + (raise exn) + ;; Previously written data will be discarded to guarantee atomicity + (put-copy-end conn "Chicken PostgreSQL egg -- forcing error"))) + (call-with-values (lambda () (proc output-port)) + (lambda args + (unless closed? (put-copy-end conn)) + (apply values args)))))) + +(define (call-with-output-copy-query proc conn query . params) + (call-with-output-copy-query* proc conn query params)) + +(define (with-output-to-copy-query* + thunk conn query #!optional (params '()) #!key (format 'text) raw) + (call-with-output-copy-query* (lambda (x) (with-output-to-port x thunk)) + conn query params format: format raw: raw)) + +(define (with-output-to-copy-query thunk conn query . params) + (with-output-to-copy-query* thunk conn query params)) + +) |