diff options
| -rw-r--r-- | TODO | 2 | ||||
| -rw-r--r-- | postgresql.egg | 6 | ||||
| -rw-r--r-- | postgresql.scm | 114 | ||||
| -rw-r--r-- | tests/run.scm | 78 | 
4 files changed, 164 insertions, 36 deletions
| @@ -1,6 +1,6 @@  - Add Batch (pipeline) API support (http://blog.2ndquadrant.com/postgresql-latency-pipelining-batching/) -- Add LISTEN/NOTIFY support  - Add support for range types +- Use PQsetnonblocking on the connection to ensure large sends are nonblocking too  - Add large object support  - Add namespace support for types?  - Add support for multiple hosts as list in connection args? (new in PG 10) diff --git a/postgresql.egg b/postgresql.egg index 3848b3e..2155a0a 100644 --- a/postgresql.egg +++ b/postgresql.egg @@ -2,12 +2,12 @@  ((synopsis "Bindings for PostgreSQL's C-api")   (category db) - (version 4.0.0) + (version 4.1.0)   (author "Johannes Groedem")   (maintainer "Peter Bex")   (license "BSD") - (dependencies sql-null srfi-1 srfi-18 srfi-13 srfi-69) - (test-dependencies test) + (dependencies sql-null srfi-1 srfi-13 srfi-69) + (test-dependencies test srfi-18)   (foreign-dependencies libpq) ;; Or libpq-dev?  Highly OS-dependent!   (components (extension postgresql                          (custom-build "build-postgresql") diff --git a/postgresql.scm b/postgresql.scm index cb2ba6a..235856c 100644 --- a/postgresql.scm +++ b/postgresql.scm @@ -1,6 +1,6 @@  ;;; Bindings to the PostgreSQL C library  ;; -;; Copyright (C) 2008-2014 Peter Bex +;; Copyright (C) 2008-2019 Peter Bex  ;; Copyright (C) 2004 Johannes Grødem <johs@copyleft.no>  ;; Redistribution and use in source and binary forms, with or without  ;; modification, is permitted. @@ -26,6 +26,7 @@    default-type-unparsers bool-unparser vector-unparser list-unparser    connect reset-connection disconnect connection? connected? +  set-notify-handler!    query query* with-transaction in-transaction? @@ -47,7 +48,8 @@    copy-query-fold copy-query*-fold copy-query-fold-right copy-query*-fold-right    copy-query-for-each copy-query*-for-each copy-query-map copy-query*-map    call-with-output-copy-query call-with-output-copy-query* -  with-output-to-copy-query with-output-to-copy-query*) +  with-output-to-copy-query with-output-to-copy-query* +  wait-for-notifications!)   (import scheme           (chicken base) @@ -59,10 +61,10 @@           (chicken format)           (chicken gc)           (chicken blob) +         (chicken time)           srfi-1           srfi-4           srfi-13 -         srfi-18           srfi-69           sql-null) @@ -104,6 +106,15 @@  (define PQisBusy (foreign-lambda bool PQisBusy pgconn*))  (define PQconsumeInput (foreign-lambda bool PQconsumeInput pgconn*)) +(define-foreign-type pgnotify* (c-pointer "PGnotify")) +(define PQnotifies (foreign-lambda pgnotify* PQnotifies pgconn*)) +(define PGnotify-channel +  (foreign-lambda* c-string ((pgnotify* n)) "C_return(n->relname);")) +(define PGnotify-be-pid +  (foreign-lambda* int ((pgnotify* n)) "C_return(n->be_pid);")) +(define PGnotify-payload +  (foreign-lambda* c-string ((pgnotify* n)) "C_return(n->extra);")) +  (define-foreign-type pgresult* (c-pointer "PGresult"))  (define PQgetResult (foreign-lambda pgresult* PQgetResult pgconn*)) @@ -315,6 +326,8 @@                    (else (consume-string (cdr chars)                                          (cons (car chars) consumed)))))))))) +(define default-notify-handler (make-parameter #f)) +  ;; Array parsers and composite parsers are automatically cached when such  ;; a value is requested.  (define default-type-parsers @@ -426,19 +439,32 @@  ;;;;;;;;;;;;;;;;;;;;  (define-record pg-connection -  ptr type-parsers oid-parsers type-unparsers transaction-depth) +  ptr type-parsers oid-parsers type-unparsers notify-handler +  transaction-depth)  (define connection? pg-connection?)  (define type-parsers pg-connection-type-parsers)  (define type-unparsers pg-connection-type-unparsers) +(define notify-handler pg-connection-notify-handler) +(define (set-notify-handler! conn h) +  (when (and h (not (procedure? h))) +    (make-pg-condition 'type 'set-notify-handler! "Not a procedure")) +  (pg-connection-notify-handler-set! conn h))  (define (connected? conn) (not (not (pg-connection-ptr conn))))  (define (pgsql-connection->fd conn)    ((foreign-lambda int PQsocket pgconn*) (pg-connection-ptr conn))) -(define (wait-for-connection! conn poll-function) +(define (pq-thread-wait-for-i/o! conn in/out #!optional delay)    (let ((conn-fd (pgsql-connection->fd conn)) -        (conn-ptr (pg-connection-ptr conn))) +        (t ##sys#current-thread)) +    (when delay (##sys#thread-block-for-timeout! +                 t (+ (current-milliseconds) delay))) +    (##sys#thread-block-for-i/o! t conn-fd in/out) +    (##sys#thread-yield!))) + +(define (wait-for-connection! conn poll-function) +  (let ((conn-ptr (pg-connection-ptr conn)))      (let loop ((result (poll-function conn-ptr)))        (cond ((= result PGRES_POLLING_OK) (void))              ((= result PGRES_POLLING_FAILED) @@ -448,7 +474,7 @@                  'connect 'connect                  (conc "Polling Postgres database failed: " message) conn)))              ((member result (list PGRES_POLLING_WRITING PGRES_POLLING_READING)) -             (thread-wait-for-i/o! conn-fd (if (= PGRES_POLLING_READING result) +             (pq-thread-wait-for-i/o! conn (if (= PGRES_POLLING_READING result)                                                 #:input                                                 #:output))               (loop (poll-function conn-ptr))) @@ -497,7 +523,8 @@  (define (connect #!optional (conn-spec '())                   (type-parsers (default-type-parsers)) -                 (type-unparsers (default-type-unparsers))) +                 (type-unparsers (default-type-unparsers)) +                 (notify-handler (default-notify-handler)))    (let ((conn-ptr (connect-start conn-spec)))      (cond       ((not conn-ptr) @@ -511,8 +538,9 @@           'connect 'connect           (conc "Connection to Postgres database failed: " message) conn-spec)))       (else -      (let ((conn (make-pg-connection conn-ptr type-parsers -                                      (make-hash-table) type-unparsers 0))) +      (let ((conn (make-pg-connection +                   conn-ptr type-parsers (make-hash-table) +                   type-unparsers notify-handler 0)))          ;; We don't want libpq to piss in our stderr stream          ((foreign-lambda* void ((pgconn* conn))             "PQsetNoticeReceiver(conn, nullNoticeReceiver, NULL);") conn-ptr) @@ -705,19 +733,47 @@  ;;;; Query procedures  ;;;;;;;;;;;;;;;;;;;;;;;; +(define (wait-for-notifications! conn #!optional delay) +  (let ((conn-ptr (pg-connection-ptr conn))) +    (let loop ((seen-notifies #f) +               (waited #f)) +      (if (PQconsumeInput conn-ptr) +          (cond ((handle-notifies! conn) (loop #t waited)) +                ((and (not seen-notifies) (not waited)) +                 (pq-thread-wait-for-i/o! conn #:input delay) +                 (loop seen-notifies #t))) +          (postgresql-error 'i/o 'wait-for-notifications! +                            (conc "Error reading reply from server: " +                                  (PQerrorMessage conn-ptr)) +                            conn))))) +  ;; Buffer all available input, yielding if nothing is available:  (define (buffer-available-input! conn) -  (let ((conn-ptr (pg-connection-ptr conn)) -        (conn-fd (pgsql-connection->fd conn))) +  (let ((conn-ptr (pg-connection-ptr conn)))      (let loop ()        (if (PQconsumeInput conn-ptr) -          (when (PQisBusy conn-ptr) -            (thread-wait-for-i/o! conn-fd #:input) -            (loop)) -          (postgresql-error -           'i/o 'buffer-available-input! -           (conc "Error reading reply from server: " (PQerrorMessage conn-ptr)) -           conn))))) +          (cond ((handle-notifies! conn) (loop)) +                ((PQisBusy conn-ptr) +                 (pq-thread-wait-for-i/o! conn #:input) +                 (loop))) +          (postgresql-error 'i/o 'buffer-available-input! +                            (conc "Error reading reply from server: " +                                  (PQerrorMessage conn-ptr)) +                            conn))))) + +(define (handle-notifies! conn) +  (let ((conn-ptr (pg-connection-ptr conn)) +        (handler (pg-connection-notify-handler conn))) +    (let lp ((notify (PQnotifies conn-ptr)) +             (seen-notifications? #f)) +      (cond (notify +             (let ((channel (PGnotify-channel notify)) +                   (pid (PGnotify-be-pid notify)) +                   (payload (PGnotify-payload notify))) +               (free notify) +               (and handler (handler channel pid payload)) +               (lp (PQnotifies conn-ptr) #t))) +            (else seen-notifications?)))))  ;; Here be more dragons  (define (resolve-unknown-types! conn oids) @@ -1038,7 +1094,7 @@          (begin            (pg-connection-transaction-depth-set! conn old-depth)            (rollback!) -          (raise exn)) +          (signal exn))        (let ((res (thunk)))          (pg-connection-transaction-depth-set! conn old-depth)          (if res (commit!) (rollback!)) @@ -1059,12 +1115,11 @@                         'type 'put-copy-data                         "Expected a blob, string or srfi-4 vector" conn data))))           (len (##sys#size data)) -         (conn-ptr (pg-connection-ptr conn)) -         (conn-fd (pgsql-connection->fd conn))) +         (conn-ptr (pg-connection-ptr conn)))      (let loop ((res (PQputCopyData conn-ptr data len)))        (cond         ((= res 0) -        (thread-wait-for-i/o! conn-fd #:output) +        (pq-thread-wait-for-i/o! conn #:output)          (loop (PQputCopyData conn-ptr data len)))         ((= res 1) (void))         ((= res -1) @@ -1076,12 +1131,11 @@                (conc "Internal error! Unexpected return value: " res) conn))))))  (define (put-copy-end conn #!optional error-message) -  (let ((conn-ptr (pg-connection-ptr conn)) -        (conn-fd (pgsql-connection->fd conn))) +  (let ((conn-ptr (pg-connection-ptr conn)))      (let loop ((res (PQputCopyEnd conn-ptr error-message)))        (cond         ((= res 0) -        (thread-wait-for-i/o! conn-fd #:output) +        (pq-thread-wait-for-i/o! conn #:output)          (loop (PQputCopyEnd conn-ptr error-message)))         ((= res 1) (get-last-result conn))         ((= res -1) @@ -1279,7 +1333,7 @@    (let* ((result (query* conn query params format: format raw: raw))           (data-format (result-format result)))      (handle-exceptions exn -        (let cleanup () (if (result? (get-copy-data conn)) (raise exn) (cleanup))) +        (let cleanup () (if (result? (get-copy-data conn)) (signal exn) (cleanup)))        (let loop ((data (get-copy-data conn format: data-format))                   (seed knil))          (if (result? data) @@ -1298,7 +1352,7 @@    (let* ((result (query* conn query params format: format raw: raw))           (data-format (result-format result)))      (handle-exceptions exn -        (let cleanup () (if (result? (get-copy-data conn)) (raise exn) (cleanup))) +        (let cleanup () (if (result? (get-copy-data conn)) (signal exn) (cleanup)))        (let loop ((data (get-copy-data conn format: data-format)))          (if (result? data)              knil @@ -1336,9 +1390,9 @@                         (lambda () (put-copy-end conn) (set! closed? #t)))))      (handle-exceptions exn          (if closed? -            (raise exn) +            (signal exn)              (handle-exceptions _ -                (raise exn) +                (signal exn)                ;; Previously written data will be discarded to guarantee atomicity                (put-copy-end conn "Chicken PostgreSQL egg -- forcing error")))        (call-with-values (lambda () (proc output-port)) diff --git a/tests/run.scm b/tests/run.scm index 0469929..80cf799 100644 --- a/tests/run.scm +++ b/tests/run.scm @@ -4,7 +4,8 @@          test          postgresql          sql-null -        srfi-4) +        srfi-4 +        srfi-18)  (define-syntax test-error*    (syntax-rules () @@ -942,7 +943,80 @@                       (query conn "SELECT * FROM chicken_pgsql_test")))               isolation: 'serializable))        (disconnect conn2))) -  ) +) + + +;; This testing code is pretty hairy +(test-group "LISTEN/NOTIFY" +  (let ((received-channel #f) +        (received-pid #f) +        (received-message #f) +        (pid1 (value-at (query conn "SELECT pg_backend_pid()")))) + +    (define (reset-received-values!) +      (set! received-channel #f) +      (set! received-pid #f) +      (set! received-message #f)) + +    (query conn "LISTEN \"testing channel\"") +    (query conn "LISTEN \"unused channel\"") + +    (set-notify-handler! conn (lambda (channel pid message) +                                (set! received-channel channel) +                                (set! received-pid pid) +                                (set! received-message message))) + +    (query conn "NOTIFY \"testing channel\", 'this is a test'") + +    (test "Notification handler is immediately invoked for own connection" +          `("testing channel" ,pid1 "this is a test") +          (list received-channel pid1 received-message)) + +    (reset-received-values!) + +    (let* ((conn2 (connect '((dbname . test)))) +           (pid2 (value-at (query conn2 "SELECT pg_backend_pid()")))) +      (query conn2 "NOTIFY \"testing channel\", 'another test'") + +      (test "Notification handler for connection 1 is not invoked when notifying from connection 2" +            `(#f #f #f) +            (list received-channel received-pid received-message)) + +      (query conn "SELECT 1") +      (test "Notification handler is invoked after performing next query" +            `("testing channel" ,pid2 "another test") +            (list received-channel received-pid received-message)) + +      (reset-received-values!) + +      ;; This sucks, we have to do this from another thread +      (thread-start! +       (lambda () +         (thread-sleep! 0.1) +         (query conn2 "NOTIFY \"testing channel\", 'hi'"))) + +      (test "Waiting manually for a short while does nothing yet" +            `(#f #f #f) +            (begin (wait-for-notifications! conn 1) +                   (list received-channel received-pid received-message))) + +      (test "Waiting long enough returns the notification" +            `("testing channel" ,pid2 "hi") +            (begin (wait-for-notifications! conn 5000) +                   (list received-channel received-pid received-message))) + +      (reset-received-values!) + +      ;; And once more +      (thread-start! +       (lambda () +         (thread-sleep! 0.01) +         (query conn2 "NOTIFY \"testing channel\", 'also hi'"))) + +      (test "Waiting without timeout returns the notification" +            `("testing channel" ,pid2 "also hi") +            (begin (wait-for-notifications! conn #f) +                   (list received-channel received-pid received-message))))))  (test-end) | 
