;;; Bindings to the PostgreSQL C library ;; ;; Copyright (C) 2008-2019 Peter Bex ;; Copyright (C) 2004 Johannes Grødem ;; Redistribution and use in source and binary forms, with or without ;; modification, is permitted. ;; ;; THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS ;; OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED ;; WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ;; ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE ;; LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR ;; CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT ;; OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR ;; BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF ;; LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT ;; (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE ;; USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH ;; DAMAGE. ;; We use ##sys#thread-... directly without importing anything (declare (uses scheduler)) (module postgresql (type-parsers update-type-parsers! default-type-parsers char-parser bool-parser bytea-parser numeric-parser make-array-parser make-composite-parser scheme-value->db-value type-unparsers update-type-unparsers! default-type-unparsers bool-unparser vector-unparser list-unparser connect reset-connection disconnect connection? connected? default-notify-handler set-notify-handler! query query* with-transaction in-transaction? result? clear-result! row-count column-count column-index column-name column-names column-format column-type column-type-modifier table-oid table-column-index value-at row-values row-alist column-values affected-rows inserted-oid invalid-oid escape-string escape-bytea unescape-bytea quote-identifier put-copy-data put-copy-end get-copy-data row-fold row-fold* row-fold-right row-fold-right* row-for-each row-for-each* row-map row-map* column-fold column-fold* column-fold-right column-fold-right* column-for-each column-for-each* column-map column-map* copy-query-fold copy-query*-fold copy-query-fold-right copy-query*-fold-right copy-query-for-each copy-query*-for-each copy-query-map copy-query*-map call-with-output-copy-query call-with-output-copy-query* with-output-to-copy-query with-output-to-copy-query* wait-for-notifications!) (import scheme (chicken base) (chicken foreign) (chicken string) (chicken port) (chicken memory) (chicken condition) (chicken format) (chicken gc) (chicken blob) (chicken time) srfi-1 srfi-4 srfi-13 srfi-69 sql-null) (import-for-syntax (chicken string)) (foreign-declare "#include ") (define-foreign-type pg-polling-status (enum "PostgresPollingStatusType")) (define-foreign-variable PGRES_POLLING_FAILED pg-polling-status) (define-foreign-variable PGRES_POLLING_READING pg-polling-status) (define-foreign-variable PGRES_POLLING_WRITING pg-polling-status) (define-foreign-variable PGRES_POLLING_OK pg-polling-status) (define-foreign-type pg-exec-status (enum "ExecStatusType")) (define-foreign-variable PGRES_EMPTY_QUERY pg-exec-status) (define-foreign-variable PGRES_COMMAND_OK pg-exec-status) (define-foreign-variable PGRES_TUPLES_OK pg-exec-status) (define-foreign-variable PGRES_COPY_OUT pg-exec-status) (define-foreign-variable PGRES_COPY_IN pg-exec-status) (define-foreign-variable PGRES_BAD_RESPONSE pg-exec-status) (define-foreign-variable PGRES_NONFATAL_ERROR pg-exec-status) (define-foreign-variable PGRES_FATAL_ERROR pg-exec-status) (define-foreign-type pgconn* (c-pointer "PGconn")) (define PQconnectStart (foreign-lambda pgconn* PQconnectStart (const c-string))) (define PQconnectPoll (foreign-lambda pg-polling-status PQconnectPoll pgconn*)) (define PQresetStart (foreign-lambda bool PQresetStart pgconn*)) (define PQresetPoll (foreign-lambda pg-polling-status PQresetPoll pgconn*)) (define PQfinish (foreign-lambda void PQfinish pgconn*)) (define PQstatus (foreign-lambda (enum "ConnStatusType") PQstatus (const pgconn*))) (define PQerrorMessage (foreign-lambda c-string PQerrorMessage (const pgconn*))) ;(define-foreign-type oid "Oid") (define-foreign-type oid unsigned-int) (define invalid-oid (foreign-value "InvalidOid" oid)) (define PQisBusy (foreign-lambda bool PQisBusy pgconn*)) (define PQconsumeInput (foreign-lambda bool PQconsumeInput pgconn*)) (define-foreign-type pgnotify* (c-pointer "PGnotify")) (define PQnotifies (foreign-lambda pgnotify* PQnotifies pgconn*)) (define PGnotify-channel (foreign-lambda* c-string ((pgnotify* n)) "C_return(n->relname);")) (define PGnotify-be-pid (foreign-lambda* int ((pgnotify* n)) "C_return(n->be_pid);")) (define PGnotify-payload (foreign-lambda* c-string ((pgnotify* n)) "C_return(n->extra);")) (define-foreign-type pgresult* (c-pointer "PGresult")) (define PQgetResult (foreign-lambda pgresult* PQgetResult pgconn*)) (define PQresultStatus (foreign-lambda pg-exec-status PQresultStatus (const pgresult*))) (define PQresultErrorMessage (foreign-lambda c-string PQresultErrorMessage (const pgresult*))) (define PQresultErrorField (foreign-lambda c-string PQresultErrorField (const pgresult*) int)) (define PQclear (foreign-lambda void PQclear pgresult*)) (define PQnfields (foreign-lambda int PQnfields (const pgresult*))) (define PQntuples (foreign-lambda int PQntuples (const pgresult*))) (define PQfname (foreign-lambda c-string PQfname (const pgresult*) int)) (define PQfnumber (foreign-lambda int PQfnumber (const pgresult*) (const c-string))) (define PQftable (foreign-lambda oid PQftable (const pgresult*) int)) (define PQftablecol (foreign-lambda int PQftablecol (const pgresult*) int)) (define PQfformat (foreign-lambda int PQfformat (const pgresult*) int)) (define PQftype (foreign-lambda oid PQftype (const pgresult*) int)) (define PQfmod (foreign-lambda int PQfmod (const pgresult*) int)) (define PQgetisnull (foreign-lambda bool PQgetisnull (const pgresult*) int int)) (define PQgetlength (foreign-lambda int PQgetlength (const pgresult*) int int)) (define PQgetvalue-ptr (foreign-lambda (c-pointer char) PQgetvalue (const pgresult*) int int)) (define PQcmdTuples (foreign-lambda nonnull-c-string PQcmdTuples pgresult*)) (define PQoidValue (foreign-lambda oid PQoidValue pgresult*)) (define PQputCopyData (foreign-lambda int PQputCopyData pgconn* scheme-pointer int)) (define PQputCopyEnd (foreign-lambda int PQputCopyEnd pgconn* (const c-string))) (define PQgetCopyData (foreign-lambda int PQgetCopyData pgconn* (c-pointer (c-pointer char)) bool)) (define memcpy (foreign-lambda c-pointer "C_memcpy" scheme-pointer c-pointer size_t)) (define (srfi-4-vector? x) ; Copied from ##sys#srfi-4-vector? from 4.8.3 (and (##core#inline "C_blockp" x) (##sys#generic-structure? x) (memq (##sys#slot x 0) '(u8vector u16vector s8vector s16vector u32vector s32vector f32vector f64vector)))) ;; TODO: Create a real callback system? (foreign-declare "static void nullNoticeReceiver(void *arg, const PGresult *res){ }") (define-syntax define-foreign-int (er-macro-transformer (lambda (e r c) ;; cannot rename define-foreign-variable; it's a really special form `(define-foreign-variable ,(cadr e) int ,(conc "(int) " (cadr e)))))) (define-syntax define-pgdiag-constants (syntax-rules () ((_ (?condition ?constant0 ...) ?more ...) (begin (cond-expand (?condition (define-foreign-int ?constant0) ...) (else (define ?constant0 #f) ...)) (define-pgdiag-constants ?more ...))) ((_ ?constant0 ?more ...) (begin (define-foreign-int ?constant0) (define-pgdiag-constants ?more ...))) ((_) (void)))) (define-pgdiag-constants PG_DIAG_SEVERITY PG_DIAG_SQLSTATE PG_DIAG_CONTEXT PG_DIAG_MESSAGE_PRIMARY PG_DIAG_MESSAGE_DETAIL PG_DIAG_MESSAGE_HINT PG_DIAG_STATEMENT_POSITION PG_DIAG_SOURCE_FILE PG_DIAG_SOURCE_LINE PG_DIAG_SOURCE_FUNCTION (has-diag-query-position PG_DIAG_INTERNAL_QUERY PG_DIAG_INTERNAL_POSITION) (has-diag-schema-info PG_DIAG_SCHEMA_NAME PG_DIAG_TABLE_NAME PG_DIAG_COLUMN_NAME PG_DIAG_DATATYPE_NAME PG_DIAG_CONSTRAINT_NAME)) ;; Helper procedure for lists (TODO: use ANY instead of IN with an array?) (define (in-list len) (string-intersperse (list-tabulate len (lambda (p) (conc "$" (add1 p)))) ",")) (define (postgresql-error subtype loc message . args) (signal (apply make-pg-condition subtype loc message args))) (define (make-pg-condition subtype loc message . args) (make-composite-condition (make-property-condition 'exn 'location loc 'message message 'arguments args) (make-property-condition 'postgresql) (if (condition? subtype) subtype (make-property-condition subtype)))) ;;;;;;;;;;;;;;;;;;;;;;;; ;;;; Type parsers ;;;;;;;;;;;;;;;;;;;;;;;; (define (char-parser str) (string-ref str 0)) (define (bool-parser str) (string=? str "t")) (define (numeric-parser str) (or (string->number str) (postgresql-error 'parse 'numeric-parser "Unable to parse number" str))) (define (bytea-parser str) (unescape-bytea str)) ;; Here be dragons (define (make-array-parser element-parser #!optional (delim #\,)) (define (parse str) (if (string-ci=? "NULL" str) (sql-null) (element-parser str))) (lambda (str) (let loop ((chars (let ignore-bounds ((chars (string->list str))) (if (char=? (car chars) #\{) chars (ignore-bounds (cdr chars))))) (result (list))) (if (null? chars) (car result) ; Should contain only one vector (case (car chars) ((#\{) (receive (value rest-chars) (loop (cdr chars) (list)) (loop rest-chars (cons value result)))) ((#\}) (values (list->vector (reverse! result)) (cdr chars))) ((#\") (let consume-string ((chars (cdr chars)) (consumed (list))) (case (car chars) ((#\\) (consume-string ; Don't interpret, just add it (cddr chars) (cons (cadr chars) consumed))) ((#\") (loop (cdr chars) (cons (element-parser (reverse-list->string consumed)) result))) (else (consume-string (cdr chars) (cons (car chars) consumed)))))) ((#\tab #\newline #\space) (loop (cdr chars) result)) (else (if (char=? (car chars) delim) (loop (cdr chars) result) (let consume-string ((chars chars) (consumed (list))) (cond ((char=? (car chars) delim) (loop (cdr chars) (cons (parse (reverse-list->string consumed)) result))) ((or (char=? (car chars) #\}) (char=? (car chars) #\})) (loop chars (cons (parse (reverse-list->string consumed)) result))) (else (consume-string (cdr chars) (cons (car chars) consumed)))))))))))) (define (make-composite-parser element-parsers) (define (parse str element-parser) (if (string=? "" str) (sql-null) (element-parser str))) (lambda (str) (let loop ((chars (cdr (string->list (string-trim str)))) ; skip leading ( (maybe-null? #t) (result (list)) (parsers element-parsers)) (case (car chars) ((#\)) (reverse! (if maybe-null? (cons (sql-null) result) result))) ((#\") (let consume-string ((chars (cdr chars)) (consumed (list))) (case (car chars) ((#\\) (consume-string ; Don't interpret, just add it (cddr chars) (cons (cadr chars) consumed))) ((#\") (if (char=? #\" (cadr chars)) ; double escapes (consume-string (cddr chars) (cons #\" consumed)) (let skip-spaces ((chars (cdr chars))) (case (car chars) ((#\space #\newline #\tab) (skip-spaces (cdr chars))) ((#\,) (loop (cdr chars) #t (cons ((car parsers) (reverse-list->string consumed)) result) (cdr parsers))) ((#\)) (loop chars #f (cons ((car parsers) (reverse-list->string consumed)) result) (cdr parsers))) (else (postgresql-error 'parse 'make-composite-parser "Bogus trailing characters" str)))))) (else (consume-string (cdr chars) (cons (car chars) consumed)))))) (else (let consume-string ((chars chars) (consumed (list))) (case (car chars) ((#\,) (loop (cdr chars) #t (cons (parse (reverse-list->string consumed) (car parsers)) result) (cdr parsers))) ;; Nothing should precede this one ((#\)) (loop chars #f (cons (parse (reverse-list->string consumed) (car parsers)) result) (cdr parsers))) (else (consume-string (cdr chars) (cons (car chars) consumed)))))))))) (define default-notify-handler (make-parameter #f)) ;; Array parsers and composite parsers are automatically cached when such ;; a value is requested. (define default-type-parsers (make-parameter `(("text" . ,identity) ("bytea" . ,bytea-parser) ("char" . ,char-parser) ("bpchar" . ,identity) ("bool" . ,bool-parser) ("int8" . ,numeric-parser) ("int4" . ,numeric-parser) ("int2" . ,numeric-parser) ("float4" . ,numeric-parser) ("float8" . ,numeric-parser) ("numeric" . ,numeric-parser) ("oid" . ,numeric-parser) ;; Nasty hack, or clever hack? :) ("record" . ,(make-composite-parser (circular-list identity)))))) ;;;;;;;;;;;;;;;;;;;;;;; ;;;; Type unparsers ;;;;;;;;;;;;;;;;;;;;;;; (define (scheme-value->db-value conn value) (cond ((find (lambda (parse?) ((car parse?) value)) (pg-connection-type-unparsers conn)) => (lambda (parse) ((cdr parse) conn value))) (else value))) (define (bool-unparser conn b) (if b "TRUE" "FALSE")) (define (vector-unparser conn v) (let loop ((result (list)) (pos 0) (len (vector-length v))) (if (= pos len) (string-append "{" (string-intersperse (reverse! result) ",") "}") (let* ((value (vector-ref v pos)) (unparsed-value (scheme-value->db-value conn value)) (serialized (cond ((sql-null? unparsed-value) "NULL") ((not (string? unparsed-value)) (postgresql-error 'unparse 'vector-unparser "Param value is not string" unparsed-value)) ((vector? value) unparsed-value) ;; don't quote! (else (sprintf "\"~A\"" (string-translate* unparsed-value '(("\\" . "\\\\") ("\"" . "\\\"")))))))) (loop (cons serialized result) (add1 pos) len))))) (define (list-unparser conn l) (let loop ((result (list)) (items l)) (if (null? items) (string-append "(" (string-intersperse (reverse! result) ",") ")") (let* ((unparsed-value (scheme-value->db-value conn (car items))) (serialized (cond ((sql-null? unparsed-value) "") ((not (string? unparsed-value)) (postgresql-error 'unparse 'list-unparser "Param value is not string" unparsed-value)) (else (sprintf "\"~A\"" (string-translate* unparsed-value '(("\\" . "\\\\") ("\"" . "\\\"")))))))) (loop (cons serialized result) (cdr items)))))) (define default-type-unparsers (make-parameter `((,string? . ,(lambda (conn s) s)) (,u8vector? . ,(lambda (conn v) (u8vector->blob/shared v))) (,char? . ,(lambda (conn c) (string c))) (,boolean? . ,bool-unparser) (,number? . ,(lambda (conn n) (number->string n))) (,vector? . ,vector-unparser) (,pair? . ,list-unparser)))) ;; Retrieve type-oids from PostgreSQL: (define (update-type-parsers! conn #!optional new-type-parsers) (let ((type-parsers (or new-type-parsers (pg-connection-type-parsers conn))) (ht (make-hash-table)) (result '())) (pg-connection-oid-parsers-set! conn ht) (pg-connection-type-parsers-set! conn type-parsers) (unless (null? type-parsers) ; empty IN () clause is not allowed (row-for-each* (lambda (oid typname) (and-let* ((procedure (assoc typname type-parsers))) (hash-table-set! ht (string->number oid) (cdr procedure)))) (query* conn (sprintf "SELECT oid, typname FROM pg_type WHERE typname IN (~A)" (in-list (length type-parsers))) (map car type-parsers) raw: #t))))) (define (update-type-unparsers! conn new-type-unparsers) (pg-connection-type-unparsers-set! conn new-type-unparsers)) ;;;;;;;;;;;;;;;;;;;; ;;;; Connections ;;;;;;;;;;;;;;;;;;;; (define-record pg-connection ptr type-parsers oid-parsers type-unparsers notify-handler transaction-depth) (define connection? pg-connection?) (define type-parsers pg-connection-type-parsers) (define type-unparsers pg-connection-type-unparsers) (define notify-handler pg-connection-notify-handler) (define (set-notify-handler! conn h) (when (and h (not (procedure? h))) (make-pg-condition 'type 'set-notify-handler! "Not a procedure")) (pg-connection-notify-handler-set! conn h)) (define (connected? conn) (not (not (pg-connection-ptr conn)))) (define (pgsql-connection->fd conn) ((foreign-lambda int PQsocket pgconn*) (pg-connection-ptr conn))) (define (pq-thread-wait-for-i/o! conn in/out #!optional delay) (let ((conn-fd (pgsql-connection->fd conn)) (t ##sys#current-thread)) (when delay (##sys#thread-block-for-timeout! t (+ (current-milliseconds) delay))) (##sys#thread-block-for-i/o! t conn-fd in/out) (##sys#thread-yield!))) (define (wait-for-connection! conn poll-function) (let ((conn-ptr (pg-connection-ptr conn))) (let loop ((result (poll-function conn-ptr))) (cond ((= result PGRES_POLLING_OK) (void)) ((= result PGRES_POLLING_FAILED) (let ((message (PQerrorMessage conn-ptr))) (disconnect conn) (postgresql-error 'connect 'connect (conc "Polling Postgres database failed: " message) conn))) ((member result (list PGRES_POLLING_WRITING PGRES_POLLING_READING)) (pq-thread-wait-for-i/o! conn (if (= PGRES_POLLING_READING result) #:input #:output)) (loop (poll-function conn-ptr))) (else (postgresql-error 'internal 'connect (conc "Internal error! Unknown status code: " result) conn)))))) (cond-expand ((not has-connectdb-params) (define (alist->connection-spec alist) (string-join (map (lambda (subspec) (sprintf "~A='~A'" (car subspec) ;; this had better not contain [ =\'] (string-translate* (->string (cdr subspec)) '(("\\" . "\\\\") ("'" . "\\'"))))) alist)))) (else)) (define (connect-start spec) (if (string? spec) (PQconnectStart spec) (cond-expand (has-connectdb-params (let ((len (length spec))) ((foreign-lambda* pgconn* ((scheme-object cons) (scheme-pointer keybuf) (scheme-pointer valbuf) (int len)) "const char **key = (const char **)keybuf;" "const char **val = (const char **)valbuf;" "int i;" "for (i=0; i < len; ++i,cons=C_u_i_cdr(cons)) {" " C_word kvpair = C_u_i_car(cons);" " key[i] = C_c_string(C_u_i_car(kvpair));" " val[i] = C_c_string(C_u_i_cdr(kvpair));" "}" "key[len] = NULL;" "val[len] = NULL;" "C_return(PQconnectStartParams(key, val, 0));") (map (lambda (x) (cons (string-append (->string (car x)) "\x00") (string-append (->string (cdr x)) "\x00"))) spec) (make-blob (* (add1 len) (foreign-value "sizeof(char *)" int))) (make-blob (* (add1 len) (foreign-value "sizeof(char *)" int))) len))) (else (PQconnectStart (alist->connection-spec spec)))))) (define (connect #!optional (conn-spec '()) (type-parsers (default-type-parsers)) (type-unparsers (default-type-unparsers)) (notify-handler (default-notify-handler))) (let ((conn-ptr (connect-start conn-spec))) (cond ((not conn-ptr) (postgresql-error 'internal 'connect "Unable to allocate a Postgres connection structure" conn-spec)) ((= (foreign-value "CONNECTION_BAD" int) (PQstatus conn-ptr)) (let ((message (PQerrorMessage conn-ptr))) (PQfinish conn-ptr) (postgresql-error 'connect 'connect (conc "Connection to Postgres database failed: " message) conn-spec))) (else (let ((conn (make-pg-connection conn-ptr type-parsers (make-hash-table) type-unparsers notify-handler 0))) ;; We don't want libpq to piss in our stderr stream ((foreign-lambda* void ((pgconn* conn)) "PQsetNoticeReceiver(conn, nullNoticeReceiver, NULL);") conn-ptr) (wait-for-connection! conn PQconnectPoll) (set-finalizer! conn disconnect) ;; Retrieve type-information from PostgreSQL metadata for use by ;; the various value-parsers. (update-type-parsers! conn) conn))))) (define (reset-connection connection) (let ((conn-ptr (pg-connection-ptr connection))) (if (PQresetStart conn-ptr) ;; Update oid-parsers? (wait-for-connection! connection PQresetPoll) (let ((error-message (PQerrorMessage conn-ptr))) (disconnect connection) (postgresql-error 'connect 'reset-connection (conc "Reset of connection failed: " error-message) connection))))) (define (disconnect connection) (and-let* ((conn-ptr (pg-connection-ptr connection))) (pg-connection-ptr-set! connection #f) (pg-connection-type-parsers-set! connection #f) (pg-connection-oid-parsers-set! connection #f) (PQfinish conn-ptr)) (void)) ;;;;;;;;;;;;;;; ;;;; Results ;;;;;;;;;;;;;;; (define-record pg-result ptr value-parsers) (define result? pg-result?) (define (clear-result! result) (and-let* ((result-ptr (pg-result-ptr result))) (pg-result-ptr-set! result #f) (PQclear result-ptr))) (define (row-count result) (PQntuples (pg-result-ptr result))) (define (column-count result) (PQnfields (pg-result-ptr result))) ;; Helper procedures for bounds checking; so we can distinguish between ;; out of bounds and nonexistant columns, and signal it. (define (check-column-index! result index location) (when (>= index (column-count result)) (postgresql-error 'bounds location (sprintf "Result column ~A out of bounds" index) result index))) (define (check-row-index! result index location) (when (>= index (row-count result)) (postgresql-error 'bounds location (sprintf "Result row ~A out of bounds" index) result index))) (define (column-name result index) (check-column-index! result index 'column-name) (string->symbol (PQfname (pg-result-ptr result) index))) (define (column-names result) (let ((ptr (pg-result-ptr result))) (let loop ((names '()) (column (column-count result))) (if (= column 0) names (loop (cons (string->symbol (PQfname ptr (sub1 column))) names) (sub1 column)))))) (define (column-index result name) (let ((idx (PQfnumber (pg-result-ptr result) (symbol->string name)))) (and (>= idx 0) idx))) (define (table-oid result index) (check-column-index! result index 'table-oid) (let ((oid (PQftable (pg-result-ptr result) index))) (and (not (= oid invalid-oid)) oid))) ;; Fixes the off-by-1 unexpectedness in libpq/the protocol to make it more ;; consistent with the rest of Scheme. However, this is inconsistent with ;; almost all other PostgreSQL interfaces... (define (table-column-index result index) (check-column-index! result index 'table-column-index) (let ((idx (PQftablecol (pg-result-ptr result) index))) (and (> idx 0) (sub1 idx)))) (define format-table '((0 . text) (1 . binary))) (define (format->symbol format) (or (alist-ref format format-table eq?) (postgresql-error 'type 'format->symbol "Unknown format" format))) (define (symbol->format symbol) (or (and-let* ((res (rassoc symbol format-table eq?))) (car res)) (postgresql-error 'type 'format->symbol "Unknown format" symbol))) (define (column-format result index) (check-column-index! result index 'column-format) (format->symbol (PQfformat (pg-result-ptr result) index))) (define (column-type result index) (check-column-index! result index 'column-type) (PQftype (pg-result-ptr result) index)) ;; This is really not super-useful as it requires intimate knowledge ;; about the internal implementations of types in PostgreSQL. (define (column-type-modifier result index) (check-column-index! result index 'column-type) (let ((mod (PQfmod (pg-result-ptr result) index))) (and (>= mod 0) mod))) ;; Unchecked version, for speed (define (value-at* result column row raw) (let ((ptr (pg-result-ptr result))) (if (PQgetisnull ptr row column) (sql-null) (let* ((len (PQgetlength ptr row column)) (fmt (PQfformat ptr column)) (value (case fmt ((0) (make-string len)) ((1) (make-blob len)) (else (postgresql-error 'internal 'value-at (conc "Unknown column format type: " fmt) result column row raw))))) (memcpy value (PQgetvalue-ptr ptr row column) len) (if (or raw (blob? value)) value ((vector-ref (pg-result-value-parsers result) column) value)))))) (define (value-at result #!optional (column 0) (row 0) #!key raw) (check-row-index! result row 'value) (check-column-index! result column 'value) (value-at* result column row raw)) (define (row-values* result row column-count raw) (let loop ((list '()) (column column-count)) (if (= column 0) list (loop (cons (value-at* result (sub1 column) row raw) list) (sub1 column))))) (define (row-values result #!optional (row 0) #!key raw) (check-row-index! result row 'row) (row-values* result row (column-count result) raw)) (define (column-values* result column row-count raw) (let loop ((list '()) (row row-count)) (if (= row 0) list (loop (cons (value-at* result column (sub1 row) raw) list) (sub1 row))))) (define (column-values result #!optional (column 0) #!key raw) (check-column-index! result column 'column) (column-values* result column (row-count result) raw)) ;; (define (row-alist result #!optional (row 0) #!key raw) ;; (map cons (column-names result) (row-values result row raw: raw))) (define (row-alist result #!optional (row 0) #!key raw) (check-row-index! result row 'row-alist) (let loop ((alist '()) (column (column-count result))) (if (= column 0) alist (loop (cons (cons (string->symbol (PQfname (pg-result-ptr result) (sub1 column))) (value-at* result (sub1 column) row raw)) alist) (sub1 column))))) ;;; TODO: Do we want/need PQnparams and PQparamtype bindings? (define (affected-rows result) (string->number (PQcmdTuples (pg-result-ptr result)))) (define (inserted-oid result) (let ((oid (PQoidValue (pg-result-ptr result)))) (and (not (= oid invalid-oid)) oid))) ;;;;;;;;;;;;;;;;;;;;;;;; ;;;; Query procedures ;;;;;;;;;;;;;;;;;;;;;;;; (define (wait-for-notifications! conn #!optional delay) (let ((conn-ptr (pg-connection-ptr conn))) (let loop ((seen-notifies #f) (waited #f)) (if (PQconsumeInput conn-ptr) (cond ((handle-notifies! conn) (loop #t waited)) ((and (not seen-notifies) (not waited)) (pq-thread-wait-for-i/o! conn #:input delay) (loop seen-notifies #t))) (postgresql-error 'i/o 'wait-for-notifications! (conc "Error reading reply from server: " (PQerrorMessage conn-ptr)) conn))))) ;; Buffer all available input, yielding if nothing is available: (define (buffer-available-input! conn) (let ((conn-ptr (pg-connection-ptr conn))) (let loop () (if (PQconsumeInput conn-ptr) (cond ((handle-notifies! conn) (loop)) ((PQisBusy conn-ptr) (pq-thread-wait-for-i/o! conn #:input) (loop))) (postgresql-error 'i/o 'buffer-available-input! (conc "Error reading reply from server: " (PQerrorMessage conn-ptr)) conn))))) (define (handle-notifies! conn) (let ((conn-ptr (pg-connection-ptr conn)) (handler (pg-connection-notify-handler conn))) (let lp ((notify (PQnotifies conn-ptr)) (seen-notifications? #f)) (cond (notify (let ((channel (PGnotify-channel notify)) (pid (PGnotify-be-pid notify)) (payload (PGnotify-payload notify))) (free notify) (and handler (handler channel pid payload)) (lp (PQnotifies conn-ptr) #t))) (else seen-notifications?))))) ;; Here be more dragons (define (resolve-unknown-types! conn oids) (unless (null? oids) (let* ((parsers (pg-connection-oid-parsers conn)) (q (conc "SELECT t.oid, t.typtype, t.typelem, t.typdelim, " " t.typbasetype, t.typarray, a.attrelid, a.atttypid " "FROM pg_type t " " LEFT JOIN pg_attribute a " " ON t.typrelid = a.attrelid AND a.attnum > 0 " "WHERE t.oid IN (~A) " "ORDER BY COALESCE(t.typrelid,-1) ASC, a.attnum ASC")) (result (query* conn (sprintf q (in-list (length oids))) (map number->string oids) raw: #t)) (count (row-count result))) (let dissect-types ((unknown-oids (list)) (pos 0) (domains (list)) (arrays (list)) (classes (list)) (last-class 0)) (cond ((>= pos count) ; Done scanning rows? ;; Keep going until all oids are resolved (resolve-unknown-types! conn unknown-oids) ;; Postprocessing step: resolve all nested types (for-each (lambda (d) (and-let* ((p (hash-table-ref/default parsers (cdr d) #f))) (hash-table-set! parsers (car d) p))) domains) (for-each (lambda (a) (and-let* ((p (hash-table-ref/default parsers (cddr a) #f))) (hash-table-set! parsers (car a) (make-array-parser p (cadr a))))) arrays) (for-each (lambda (c) (and-let* ((p-list (fold (lambda (att l) (and-let* ((l) (p (hash-table-ref/default parsers att #f))) (cons p l))) '() (cdr c)))) (hash-table-set! parsers (car c) (make-composite-parser p-list)))) classes)) ((not (string=? (value-at* result 4 pos #f) "0")) ; Domain type? (let* ((basetype-oid (string->number (value-at* result 4 pos #f))) (parser (hash-table-ref/default parsers basetype-oid #f)) (oid (string->number (value-at* result 0 pos #f)))) (dissect-types (if parser unknown-oids (cons basetype-oid unknown-oids)) (add1 pos) (cons (cons oid basetype-oid) domains) arrays classes last-class))) ((string=? (value-at* result 5 pos #f) "0") ; Array value? (let* ((elem (string->number (value-at* result 2 pos #f))) (delim (string-ref (value-at* result 3 pos #f) 0)) (parser (hash-table-ref/default parsers elem #f)) (oid (string->number (value-at* result 0 pos #f)))) (dissect-types (if parser unknown-oids (cons elem unknown-oids)) (add1 pos) domains (cons (cons oid (cons delim elem)) arrays) classes last-class))) ((string=? (value-at* result 1 pos #f) "c") ; Class? (i.e., table or type) (let* ((classid (string->number (value-at* result 6 pos #f))) (attrid (string->number (value-at* result 7 pos #f))) (parser (hash-table-ref/default parsers attrid #f))) (dissect-types (if parser unknown-oids (cons attrid unknown-oids)) (add1 pos) domains arrays ;; Keep oid at the front of the list, insert this ;; attr after it, before the other attrs, if any. (if (= last-class classid) (cons (cons (caar classes) (cons attrid (cdar classes))) (cdr classes)) (cons (cons (string->number (value-at* result 0 pos #f)) (list attrid)) classes)) classid))) (else (dissect-types unknown-oids (add1 pos) domains arrays classes last-class))))))) (define (make-value-parsers conn pqresult #!key raw) (let* ((nfields (PQnfields pqresult)) (parsers (make-vector nfields)) (ht (pg-connection-oid-parsers conn))) (let loop ((col 0) (unknowns (list))) (if (= col nfields) (begin (resolve-unknown-types! conn (map cdr unknowns)) (for-each (lambda (unknown) (let* ((col (car unknown)) (oid (cdr unknown)) (parser (hash-table-ref/default ht oid identity))) (vector-set! parsers col parser))) unknowns) parsers) (let* ((oid (PQftype pqresult col)) (parser (if raw identity (hash-table-ref/default ht oid #f)))) (vector-set! parsers col parser) (loop (add1 col) (if parser unknowns (cons (cons col oid) unknowns)))))))) ;; Collect the result pointers from the last query. ;; ;; A pgresult represents an entire resultset and is always read into memory ;; all at once. (define (get-last-result conn #!key raw) (buffer-available-input! conn) (let* ((conn-ptr (pg-connection-ptr conn)) ;; Read out all remaining results (including the current one). ;; TODO: Is this really needed? libpq does it (in pqExecFinish), ;; but ostensibly only to concatenate the error messages for ;; each query. OTOH, maybe we want to do that, too. (clean-results! (lambda (result) (let loop ((result result)) (when result (PQclear result) (loop (PQgetResult conn-ptr)))))) (result (PQgetResult conn-ptr)) (status (PQresultStatus result))) (cond ((not result) (postgresql-error 'internal 'get-last-result "Internal error! No result object available from server" conn)) ((member status (list PGRES_BAD_RESPONSE PGRES_FATAL_ERROR PGRES_NONFATAL_ERROR)) (let* ((error-field (lambda (f) (and f (PQresultErrorField result f)))) (error-field/int (lambda (f) (and-let* ((value (error-field f))) (string->number value)))) (sqlstate (error-field PG_DIAG_SQLSTATE)) (maybe-severity (error-field PG_DIAG_SEVERITY)) (condition (make-pg-condition (make-property-condition 'query 'severity (and maybe-severity (string->symbol (string-downcase maybe-severity))) 'error-class (and sqlstate (string-take sqlstate 2)) 'error-code sqlstate 'message-primary (error-field PG_DIAG_MESSAGE_PRIMARY) 'message-detail (error-field PG_DIAG_MESSAGE_DETAIL) 'message-hint (error-field PG_DIAG_MESSAGE_HINT) 'statement-position (error-field/int PG_DIAG_STATEMENT_POSITION) 'context (error-field PG_DIAG_CONTEXT) 'source-file (error-field PG_DIAG_SOURCE_FILE) 'source-line (error-field/int PG_DIAG_SOURCE_LINE) 'source-function (error-field PG_DIAG_SOURCE_FUNCTION) 'internal-query (error-field PG_DIAG_INTERNAL_QUERY) 'internal-position (error-field/int PG_DIAG_INTERNAL_POSITION) 'schema-name (error-field PG_DIAG_SCHEMA_NAME) 'table-name (error-field PG_DIAG_TABLE_NAME) 'column-name (error-field PG_DIAG_COLUMN_NAME) 'datatype-name (error-field PG_DIAG_DATATYPE_NAME) 'constraint-name (error-field PG_DIAG_CONSTRAINT_NAME)) 'get-last-result (PQresultErrorMessage result)))) (clean-results! result) (signal condition))) ((member status (list PGRES_COPY_OUT PGRES_COPY_IN)) ;; These are weird; A COPY puts the connection in "copy mode". ;; As long as it's in "copy mode", pqgetresult will return the ;; same result every time you call it, so don't try to call ;; clean-results! (let ((result-obj (make-pg-result result #f))) (set-finalizer! result-obj clear-result!) result-obj)) ((member status (list PGRES_EMPTY_QUERY PGRES_COMMAND_OK PGRES_TUPLES_OK)) (let ((result-obj (make-pg-result result #f))) (set-finalizer! result-obj clear-result!) (let ((trailing-result (PQgetResult conn-ptr))) (when trailing-result (clean-results! trailing-result) (postgresql-error 'internal 'get-last-result (conc "Internal error! Unexpected extra " "results after first query result") conn))) (pg-result-value-parsers-set! result-obj (make-value-parsers conn result raw: raw)) result-obj)) (else (postgresql-error 'internal 'get-last-result (conc "Internal error! Unknown status code: " status) conn))))) (define (query conn query . params) (query* conn query params)) (define (query* conn query #!optional (params '()) #!key (format 'text) raw) (let* ((params ;; Check all params and ensure they are proper pairs (map (lambda (p) (let ((obj (if raw p (scheme-value->db-value conn p)))) (cond ((string? obj) ; Convert to ASCIIZ (let* ((len (##sys#size obj)) (res (string-append obj "\x00"))) (vector 0 len res))) ((blob? obj) (vector 1 (##sys#size obj) obj)) ((sql-null? obj) #f) (else (postgresql-error 'type 'query* (sprintf "Param value is not string, sql-null or blob: ~S" p) conn query params format))))) params)) ;; It's a shame we need to traverse params twice (and again in C)... (len (length params)) (send-query (foreign-lambda* bool ((pgconn* conn) (nonnull-c-string query) (bool is_prepped) (int num) (scheme-object params) (scheme-pointer valsbuf) (scheme-pointer lensbuf) (scheme-pointer fmtsbuf) (int rfmt)) "int i = 0, *lens = (int *)lensbuf, *fmts = (int *)fmtsbuf;" "const char **vals = (const char **)valsbuf;" "C_word obj, cons;" "for (i=0,cons=params; i < num; ++i,cons=C_u_i_cdr(cons)) {" " obj = C_u_i_car(cons);" " if (obj == C_SCHEME_FALSE) {" " fmts[i] = lens[i] = 0;" " vals[i] = NULL;" " } else {" " fmts[i] = C_unfix(C_block_item(obj, 0));" " lens[i] = C_unfix(C_block_item(obj, 1));" " vals[i] = C_c_string(C_block_item(obj, 2));" " }" "}" "if (is_prepped)" " C_return(PQsendQueryPrepared((PGconn *)conn, query, num," " vals, lens, fmts, rfmt));" "else" " C_return(PQsendQueryParams((PGconn *)conn, query, num, NULL," " vals, lens, fmts, rfmt));")) (query-as-string (if (symbol? query) (symbol->string query) query))) ;; XXX: What if we're using a newer protocol version? Then this error is ;; well-meaning but completely wrong... Unfortunately the error message ;; returned by the server if we exceed this limit is even more confusing. (cond ((> len 65535) (postgresql-error 'domain 'query* (sprintf "Too many bind parameters (PG protocol supports up to 65535, but got ~A). Try using the COPY support, or escaping the data and sending it as a big string." len) conn query params format)) ((send-query (pg-connection-ptr conn) query-as-string (symbol? query) len params ;; We allocate here instead of in C to keep things simple (make-blob (* len (foreign-value "sizeof(char *)" int))) (make-blob (* len (foreign-value "sizeof(int)" int))) (make-blob (* len (foreign-value "sizeof(int)" int))) (symbol->format format)) (get-last-result conn raw: raw)) (else (postgresql-error 'i/o 'query* (conc "Unable to send query to server: " (PQerrorMessage (pg-connection-ptr conn))) conn query params format))))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;; Transaction management ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; (define (with-transaction conn thunk #!key isolation access) (let* ((old-depth (pg-connection-transaction-depth conn)) (isolation (and isolation (case isolation ((read-committed) "ISOLATION LEVEL READ COMMITTED") ((serializable) "ISOLATION LEVEL SERIALIZABLE") (else (postgresql-error 'type 'with-transaction "Unknown isolation level" isolation))))) (access (and access (case access ((read/write) "READ WRITE") ((read-only) "READ ONLY") (else (postgresql-error 'type 'with-transaction "Unknown access mode" access))))) (characteristics (conc (or isolation "") " " (or access ""))) (rollback! (lambda () (if (zero? old-depth) (query conn "ROLLBACK") ;; We do not *need* to give savepoints unique names, ;; but it aids debugging and we know the depth anyway. (query conn (conc "ROLLBACK TO SAVEPOINT s_" old-depth))))) (commit! (lambda () (if (zero? old-depth) (query conn "COMMIT") (query conn (conc "RELEASE SAVEPOINT s_" old-depth)))))) (when (and isolation (not (zero? old-depth))) (postgresql-error 'domain 'with-transaction "Can't set isolation level in nested transactions" isolation)) (if (zero? old-depth) (query conn (conc "BEGIN " characteristics)) (begin (query conn (conc "SAVEPOINT s_" old-depth)) ;; XXX: This should probably be SET LOCAL instead of SET ;; (which is implicitly the same as SET SESSION), but I ;; can't come up with a testcase that fails with this and ;; succeeds with SET LOCAL, so keep it around for now. (when access (query conn (conc "SET TRANSACTION " characteristics))))) (pg-connection-transaction-depth-set! conn (add1 old-depth)) ;; TODO: Add a warning mechanism (using dynamic-wind) for when the ;; user tries to jump into/out of transactions with continuations? (handle-exceptions exn (begin (pg-connection-transaction-depth-set! conn old-depth) (rollback!) (signal exn)) (let ((res (thunk))) (pg-connection-transaction-depth-set! conn old-depth) (if res (commit!) (rollback!)) res)))) (define (in-transaction? conn) (> (pg-connection-transaction-depth conn) 0)) ;;;;;;;;;;;;;;;;;;;; ;;;; COPY support ;;;;;;;;;;;;;;;;;;;; (define (put-copy-data conn data) (let* ((data (cond ((or (string? data) (blob? data)) data) ((srfi-4-vector? data) (##sys#slot data 1)) (else (postgresql-error 'type 'put-copy-data "Expected a blob, string or srfi-4 vector" conn data)))) (len (##sys#size data)) (conn-ptr (pg-connection-ptr conn))) (let loop ((res (PQputCopyData conn-ptr data len))) (cond ((= res 0) (pq-thread-wait-for-i/o! conn #:output) (loop (PQputCopyData conn-ptr data len))) ((= res 1) (void)) ((= res -1) (postgresql-error 'i/o 'put-copy-data (conc "Error putting COPY data: " (PQerrorMessage conn-ptr)) conn)) (else (postgresql-error 'internal 'put-copy-data (conc "Internal error! Unexpected return value: " res) conn)))))) (define (put-copy-end conn #!optional error-message) (let ((conn-ptr (pg-connection-ptr conn))) (let loop ((res (PQputCopyEnd conn-ptr error-message))) (cond ((= res 0) (pq-thread-wait-for-i/o! conn #:output) (loop (PQputCopyEnd conn-ptr error-message))) ((= res 1) (get-last-result conn)) ((= res -1) (postgresql-error 'i/o 'put-copy-end (conc "Error ending put COPY data: " (PQerrorMessage conn-ptr)) conn error-message)) (else (postgresql-error 'internal 'put-copy-end (conc "Internal error! Unexpected return value: " res) conn)))))) (define (get-copy-data conn #!key (format 'text)) (let ((conn-ptr (pg-connection-ptr conn))) (let loop () (let-location ((buf c-pointer)) (let ((res (PQgetCopyData conn-ptr (location buf) #t))) (cond ((> res 0) (let ((value (case format ((text) (make-string res)) ((binary) (make-blob res)) (else (postgresql-error 'internal 'get-copy-data (conc "Unknown column format type: " format) conn))))) (memcpy value buf res) (free buf) value)) ((= res 0) (buffer-available-input! conn) (loop)) ((= res -1) (get-last-result conn)) ((= res -2) (postgresql-error 'i/o 'get-copy-data (conc "Error getting COPY data: " (PQerrorMessage conn-ptr)) conn)) (else (postgresql-error 'internal 'get-copy-data (conc "Internal error! Unexpected return value: " res) conn)))))))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;; Value escaping and quotation ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; (define (escape-string conn str) (define %escape-string-conn (foreign-lambda size_t PQescapeStringConn pgconn* scheme-pointer scheme-pointer size_t (c-pointer bool))) (let-location ((err bool)) (let* ((strlen (string-length str)) (buflen (add1 (* strlen 2))) (buffer (make-string buflen)) (conn-ptr (pg-connection-ptr conn)) (size (%escape-string-conn conn-ptr buffer str strlen (location err)))) (cond (err (postgresql-error 'internal 'escape-string (conc "String escaping failed. " (PQerrorMessage conn-ptr)) conn str)) ((= size buflen) buffer) (else (##sys#substring buffer 0 size)))))) (define (quote-identifier conn str) (cond-expand (has-escape-identifier (define %escape-ident (foreign-lambda c-string* PQescapeIdentifier pgconn* scheme-pointer size_t)) (or (%escape-ident (pg-connection-ptr conn) str (string-length str)) (postgresql-error 'internal 'quote-identifier (conc "Identifier escaping failed: " (PQerrorMessage (pg-connection-ptr conn))) conn str))) (else (postgresql-error 'unsupported-version 'quote-identifier (conc "Please upgrade your PostgreSQL to 9.0 or later " "in order to be able to use quote-identifier!") conn str)))) (define (escape-bytea conn obj) (define %escape-bytea-conn (foreign-lambda (c-pointer unsigned-char) PQescapeByteaConn pgconn* scheme-pointer size_t (c-pointer size_t))) (let-location ((allocated size_t)) (let* ((data (cond ((or (string? obj) (blob? obj)) obj) ((srfi-4-vector? obj) (##sys#slot obj 1)) (else (postgresql-error 'type 'escape-bytea "Expected string, blob or srfi-4 vector" obj)))) (conn-ptr (pg-connection-ptr conn)) (buf (%escape-bytea-conn conn-ptr data (##sys#size data) (location allocated)))) (if buf (let* ((string-length (sub1 allocated)) (result-string (make-string string-length))) (memcpy result-string buf string-length) (free buf) result-string) (postgresql-error 'internal 'escape-bytea (conc "Byte array escaping failed: " (PQerrorMessage conn-ptr)) conn obj))))) (define (unescape-bytea str) (define %unescape-bytea (foreign-lambda (c-pointer unsigned-char) PQunescapeBytea c-string (c-pointer size_t))) (let-location ((blob-length size_t)) (let ((buf (%unescape-bytea str (location blob-length)))) (if buf (let ((result-blob (make-blob blob-length))) (memcpy result-blob buf blob-length) (free buf) (blob->u8vector/shared result-blob)) (postgresql-error 'internal 'unescape-bytea "Byte array unescaping failed (out of memory?)" str))))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;; High-level interface ;;;;;;;;;;;;;;;;;;;;;;;;;;;;; (define (make-result-fold item-count sub-item-count extract-item) (lambda (kons knil result) (let ((items (item-count result)) (sub-items (sub-item-count result))) (let loop ((seed knil) (item 0)) (if (= item items) seed (loop (kons (extract-item result item sub-items #f) seed) (add1 item))))))) (define row-fold (make-result-fold row-count column-count row-values*)) (define (row-fold* kons knil result) (row-fold (lambda (values seed) (apply kons (append values (list seed)))) knil result)) (define column-fold (make-result-fold column-count row-count column-values*)) (define (column-fold* kons knil result) (column-fold (lambda (values seed) (apply kons (append values (list seed)))) knil result)) (define (make-result-fold-right item-count sub-item-count extract-item) (lambda (kons knil result) (let ((sub-items (sub-item-count result))) (let loop ((seed knil) (item (item-count result))) (if (= item 0) seed (loop (kons (extract-item result (sub1 item) sub-items #f) seed) (sub1 item))))))) (define row-fold-right (make-result-fold-right row-count column-count row-values*)) (define (row-fold-right* kons knil result) (row-fold-right (lambda (values seed) (apply kons (append values (list seed)))) knil result)) (define column-fold-right (make-result-fold-right column-count row-count column-values*)) (define (column-fold-right* kons knil result) (column-fold-right (lambda (values seed) (apply kons (append values (list seed)))) knil result)) (define (row-for-each proc result) (row-fold (lambda (values seed) (proc values)) #f result) (void)) (define (row-for-each* proc result) (row-fold (lambda (values seed) (apply proc values)) #f result) (void)) (define (column-for-each proc result) (column-fold (lambda (values seed) (proc values)) #f result) (void)) (define (column-for-each* proc result) (column-fold (lambda (values seed) (apply proc values)) #f result) (void)) ;; Like regular Scheme map, the order in which the procedure is applied is ;; undefined. We make good use of that by traversing the resultset from ;; the end back to the beginning, thereby avoiding a reverse! on the result. (define (row-map proc res) (row-fold-right (lambda (row lst) (cons (proc row) lst)) '() res)) (define (row-map* proc res) (row-fold-right (lambda (row lst) (cons (apply proc row) lst)) '() res)) (define (column-map proc res) (column-fold-right (lambda (col lst) (cons (proc col) lst)) '() res)) (define (column-map* proc res) (column-fold-right (lambda (col lst) (cons (apply proc col) lst)) '() res)) (define (result-format result) (if (and result ((foreign-lambda bool PQbinaryTuples pgresult*) (pg-result-ptr result))) 'binary 'text)) (define (copy-query*-fold kons knil conn query #!optional (params '()) #!key (format 'text) raw) (let* ((result (query* conn query params format: format raw: raw)) (data-format (result-format result))) (handle-exceptions exn (let cleanup () (if (result? (get-copy-data conn)) (signal exn) (cleanup))) (let loop ((data (get-copy-data conn format: data-format)) (seed knil)) (if (result? data) seed ;; Explicit ordering; data could be _very_ big, allow one to be GCed (let ((next (kons data seed))) (loop (get-copy-data conn format: data-format) next))))))) (define (copy-query-fold kons knil conn query . params) (copy-query*-fold kons knil conn query params)) ;; This is slow and memory-intensive if data is big. Provided for completeness (define (copy-query*-fold-right kons knil conn query #!optional (params '()) #!key (format 'text) raw) (let* ((result (query* conn query params format: format raw: raw)) (data-format (result-format result))) (handle-exceptions exn (let cleanup () (if (result? (get-copy-data conn)) (signal exn) (cleanup))) (let loop ((data (get-copy-data conn format: data-format))) (if (result? data) knil (kons data (loop (get-copy-data conn format: data-format)))))))) (define (copy-query-fold-right kons knil conn query . params) (copy-query*-fold-right kons knil conn query params)) (define (copy-query*-map proc conn query #!optional (params '()) #!key (format 'text) raw) (reverse! (copy-query*-fold (lambda (data seed) (cons (proc data) seed)) '() conn query params format: format raw: raw))) (define (copy-query-map proc conn query . params) (copy-query*-map proc conn query params)) (define (copy-query*-for-each proc conn query #!optional (params '()) #!key (format 'text) raw) (copy-query*-fold (lambda (data seed) (proc data)) #f conn query params format: format raw: raw) (void)) (define (copy-query-for-each proc conn query . params) (copy-query*-for-each proc conn query params)) ;; A bit of a weird name but consistent (define (call-with-output-copy-query* proc conn query #!optional (params '()) #!key (format 'text) raw) (query* conn query params format: format raw: raw) (let* ((closed? #f) (output-port (make-output-port (lambda (data) (put-copy-data conn data)) (lambda () (put-copy-end conn) (set! closed? #t))))) (handle-exceptions exn (if closed? (signal exn) (handle-exceptions _ (signal exn) ;; Previously written data will be discarded to guarantee atomicity (put-copy-end conn "Chicken PostgreSQL egg -- forcing error"))) (call-with-values (lambda () (proc output-port)) (lambda args (unless closed? (put-copy-end conn)) (apply values args)))))) (define (call-with-output-copy-query proc conn query . params) (call-with-output-copy-query* proc conn query params)) (define (with-output-to-copy-query* thunk conn query #!optional (params '()) #!key (format 'text) raw) (call-with-output-copy-query* (lambda (x) (with-output-to-port x thunk)) conn query params format: format raw: raw)) (define (with-output-to-copy-query thunk conn query . params) (with-output-to-copy-query* thunk conn query params)) )