From f67ab295d088f8cb1a3407d1b8c187a7187c7e95 Mon Sep 17 00:00:00 2001 From: Peter Bex Date: Sun, 12 May 2019 10:34:10 +0200 Subject: Add LISTEN/NOTIFY support This really is just a matter of reading out notifications when they're available. Also, we offer a way to explicitly wait for notifications. --- TODO | 2 +- postgresql.egg | 6 +-- postgresql.scm | 114 ++++++++++++++++++++++++++++++++++++++++++--------------- tests/run.scm | 78 ++++++++++++++++++++++++++++++++++++++- 4 files changed, 164 insertions(+), 36 deletions(-) diff --git a/TODO b/TODO index e6a3b16..7d0ab9d 100644 --- a/TODO +++ b/TODO @@ -1,6 +1,6 @@ - Add Batch (pipeline) API support (http://blog.2ndquadrant.com/postgresql-latency-pipelining-batching/) -- Add LISTEN/NOTIFY support - Add support for range types +- Use PQsetnonblocking on the connection to ensure large sends are nonblocking too - Add large object support - Add namespace support for types? - Add support for multiple hosts as list in connection args? (new in PG 10) diff --git a/postgresql.egg b/postgresql.egg index 3848b3e..2155a0a 100644 --- a/postgresql.egg +++ b/postgresql.egg @@ -2,12 +2,12 @@ ((synopsis "Bindings for PostgreSQL's C-api") (category db) - (version 4.0.0) + (version 4.1.0) (author "Johannes Groedem") (maintainer "Peter Bex") (license "BSD") - (dependencies sql-null srfi-1 srfi-18 srfi-13 srfi-69) - (test-dependencies test) + (dependencies sql-null srfi-1 srfi-13 srfi-69) + (test-dependencies test srfi-18) (foreign-dependencies libpq) ;; Or libpq-dev? Highly OS-dependent! (components (extension postgresql (custom-build "build-postgresql") diff --git a/postgresql.scm b/postgresql.scm index cb2ba6a..235856c 100644 --- a/postgresql.scm +++ b/postgresql.scm @@ -1,6 +1,6 @@ ;;; Bindings to the PostgreSQL C library ;; -;; Copyright (C) 2008-2014 Peter Bex +;; Copyright (C) 2008-2019 Peter Bex ;; Copyright (C) 2004 Johannes Grødem ;; Redistribution and use in source and binary forms, with or without ;; modification, is permitted. @@ -26,6 +26,7 @@ default-type-unparsers bool-unparser vector-unparser list-unparser connect reset-connection disconnect connection? connected? + set-notify-handler! query query* with-transaction in-transaction? @@ -47,7 +48,8 @@ copy-query-fold copy-query*-fold copy-query-fold-right copy-query*-fold-right copy-query-for-each copy-query*-for-each copy-query-map copy-query*-map call-with-output-copy-query call-with-output-copy-query* - with-output-to-copy-query with-output-to-copy-query*) + with-output-to-copy-query with-output-to-copy-query* + wait-for-notifications!) (import scheme (chicken base) @@ -59,10 +61,10 @@ (chicken format) (chicken gc) (chicken blob) + (chicken time) srfi-1 srfi-4 srfi-13 - srfi-18 srfi-69 sql-null) @@ -104,6 +106,15 @@ (define PQisBusy (foreign-lambda bool PQisBusy pgconn*)) (define PQconsumeInput (foreign-lambda bool PQconsumeInput pgconn*)) +(define-foreign-type pgnotify* (c-pointer "PGnotify")) +(define PQnotifies (foreign-lambda pgnotify* PQnotifies pgconn*)) +(define PGnotify-channel + (foreign-lambda* c-string ((pgnotify* n)) "C_return(n->relname);")) +(define PGnotify-be-pid + (foreign-lambda* int ((pgnotify* n)) "C_return(n->be_pid);")) +(define PGnotify-payload + (foreign-lambda* c-string ((pgnotify* n)) "C_return(n->extra);")) + (define-foreign-type pgresult* (c-pointer "PGresult")) (define PQgetResult (foreign-lambda pgresult* PQgetResult pgconn*)) @@ -315,6 +326,8 @@ (else (consume-string (cdr chars) (cons (car chars) consumed)))))))))) +(define default-notify-handler (make-parameter #f)) + ;; Array parsers and composite parsers are automatically cached when such ;; a value is requested. (define default-type-parsers @@ -426,19 +439,32 @@ ;;;;;;;;;;;;;;;;;;;; (define-record pg-connection - ptr type-parsers oid-parsers type-unparsers transaction-depth) + ptr type-parsers oid-parsers type-unparsers notify-handler + transaction-depth) (define connection? pg-connection?) (define type-parsers pg-connection-type-parsers) (define type-unparsers pg-connection-type-unparsers) +(define notify-handler pg-connection-notify-handler) +(define (set-notify-handler! conn h) + (when (and h (not (procedure? h))) + (make-pg-condition 'type 'set-notify-handler! "Not a procedure")) + (pg-connection-notify-handler-set! conn h)) (define (connected? conn) (not (not (pg-connection-ptr conn)))) (define (pgsql-connection->fd conn) ((foreign-lambda int PQsocket pgconn*) (pg-connection-ptr conn))) -(define (wait-for-connection! conn poll-function) +(define (pq-thread-wait-for-i/o! conn in/out #!optional delay) (let ((conn-fd (pgsql-connection->fd conn)) - (conn-ptr (pg-connection-ptr conn))) + (t ##sys#current-thread)) + (when delay (##sys#thread-block-for-timeout! + t (+ (current-milliseconds) delay))) + (##sys#thread-block-for-i/o! t conn-fd in/out) + (##sys#thread-yield!))) + +(define (wait-for-connection! conn poll-function) + (let ((conn-ptr (pg-connection-ptr conn))) (let loop ((result (poll-function conn-ptr))) (cond ((= result PGRES_POLLING_OK) (void)) ((= result PGRES_POLLING_FAILED) @@ -448,7 +474,7 @@ 'connect 'connect (conc "Polling Postgres database failed: " message) conn))) ((member result (list PGRES_POLLING_WRITING PGRES_POLLING_READING)) - (thread-wait-for-i/o! conn-fd (if (= PGRES_POLLING_READING result) + (pq-thread-wait-for-i/o! conn (if (= PGRES_POLLING_READING result) #:input #:output)) (loop (poll-function conn-ptr))) @@ -497,7 +523,8 @@ (define (connect #!optional (conn-spec '()) (type-parsers (default-type-parsers)) - (type-unparsers (default-type-unparsers))) + (type-unparsers (default-type-unparsers)) + (notify-handler (default-notify-handler))) (let ((conn-ptr (connect-start conn-spec))) (cond ((not conn-ptr) @@ -511,8 +538,9 @@ 'connect 'connect (conc "Connection to Postgres database failed: " message) conn-spec))) (else - (let ((conn (make-pg-connection conn-ptr type-parsers - (make-hash-table) type-unparsers 0))) + (let ((conn (make-pg-connection + conn-ptr type-parsers (make-hash-table) + type-unparsers notify-handler 0))) ;; We don't want libpq to piss in our stderr stream ((foreign-lambda* void ((pgconn* conn)) "PQsetNoticeReceiver(conn, nullNoticeReceiver, NULL);") conn-ptr) @@ -705,19 +733,47 @@ ;;;; Query procedures ;;;;;;;;;;;;;;;;;;;;;;;; +(define (wait-for-notifications! conn #!optional delay) + (let ((conn-ptr (pg-connection-ptr conn))) + (let loop ((seen-notifies #f) + (waited #f)) + (if (PQconsumeInput conn-ptr) + (cond ((handle-notifies! conn) (loop #t waited)) + ((and (not seen-notifies) (not waited)) + (pq-thread-wait-for-i/o! conn #:input delay) + (loop seen-notifies #t))) + (postgresql-error 'i/o 'wait-for-notifications! + (conc "Error reading reply from server: " + (PQerrorMessage conn-ptr)) + conn))))) + ;; Buffer all available input, yielding if nothing is available: (define (buffer-available-input! conn) - (let ((conn-ptr (pg-connection-ptr conn)) - (conn-fd (pgsql-connection->fd conn))) + (let ((conn-ptr (pg-connection-ptr conn))) (let loop () (if (PQconsumeInput conn-ptr) - (when (PQisBusy conn-ptr) - (thread-wait-for-i/o! conn-fd #:input) - (loop)) - (postgresql-error - 'i/o 'buffer-available-input! - (conc "Error reading reply from server: " (PQerrorMessage conn-ptr)) - conn))))) + (cond ((handle-notifies! conn) (loop)) + ((PQisBusy conn-ptr) + (pq-thread-wait-for-i/o! conn #:input) + (loop))) + (postgresql-error 'i/o 'buffer-available-input! + (conc "Error reading reply from server: " + (PQerrorMessage conn-ptr)) + conn))))) + +(define (handle-notifies! conn) + (let ((conn-ptr (pg-connection-ptr conn)) + (handler (pg-connection-notify-handler conn))) + (let lp ((notify (PQnotifies conn-ptr)) + (seen-notifications? #f)) + (cond (notify + (let ((channel (PGnotify-channel notify)) + (pid (PGnotify-be-pid notify)) + (payload (PGnotify-payload notify))) + (free notify) + (and handler (handler channel pid payload)) + (lp (PQnotifies conn-ptr) #t))) + (else seen-notifications?))))) ;; Here be more dragons (define (resolve-unknown-types! conn oids) @@ -1038,7 +1094,7 @@ (begin (pg-connection-transaction-depth-set! conn old-depth) (rollback!) - (raise exn)) + (signal exn)) (let ((res (thunk))) (pg-connection-transaction-depth-set! conn old-depth) (if res (commit!) (rollback!)) @@ -1059,12 +1115,11 @@ 'type 'put-copy-data "Expected a blob, string or srfi-4 vector" conn data)))) (len (##sys#size data)) - (conn-ptr (pg-connection-ptr conn)) - (conn-fd (pgsql-connection->fd conn))) + (conn-ptr (pg-connection-ptr conn))) (let loop ((res (PQputCopyData conn-ptr data len))) (cond ((= res 0) - (thread-wait-for-i/o! conn-fd #:output) + (pq-thread-wait-for-i/o! conn #:output) (loop (PQputCopyData conn-ptr data len))) ((= res 1) (void)) ((= res -1) @@ -1076,12 +1131,11 @@ (conc "Internal error! Unexpected return value: " res) conn)))))) (define (put-copy-end conn #!optional error-message) - (let ((conn-ptr (pg-connection-ptr conn)) - (conn-fd (pgsql-connection->fd conn))) + (let ((conn-ptr (pg-connection-ptr conn))) (let loop ((res (PQputCopyEnd conn-ptr error-message))) (cond ((= res 0) - (thread-wait-for-i/o! conn-fd #:output) + (pq-thread-wait-for-i/o! conn #:output) (loop (PQputCopyEnd conn-ptr error-message))) ((= res 1) (get-last-result conn)) ((= res -1) @@ -1279,7 +1333,7 @@ (let* ((result (query* conn query params format: format raw: raw)) (data-format (result-format result))) (handle-exceptions exn - (let cleanup () (if (result? (get-copy-data conn)) (raise exn) (cleanup))) + (let cleanup () (if (result? (get-copy-data conn)) (signal exn) (cleanup))) (let loop ((data (get-copy-data conn format: data-format)) (seed knil)) (if (result? data) @@ -1298,7 +1352,7 @@ (let* ((result (query* conn query params format: format raw: raw)) (data-format (result-format result))) (handle-exceptions exn - (let cleanup () (if (result? (get-copy-data conn)) (raise exn) (cleanup))) + (let cleanup () (if (result? (get-copy-data conn)) (signal exn) (cleanup))) (let loop ((data (get-copy-data conn format: data-format))) (if (result? data) knil @@ -1336,9 +1390,9 @@ (lambda () (put-copy-end conn) (set! closed? #t))))) (handle-exceptions exn (if closed? - (raise exn) + (signal exn) (handle-exceptions _ - (raise exn) + (signal exn) ;; Previously written data will be discarded to guarantee atomicity (put-copy-end conn "Chicken PostgreSQL egg -- forcing error"))) (call-with-values (lambda () (proc output-port)) diff --git a/tests/run.scm b/tests/run.scm index 0469929..80cf799 100644 --- a/tests/run.scm +++ b/tests/run.scm @@ -4,7 +4,8 @@ test postgresql sql-null - srfi-4) + srfi-4 + srfi-18) (define-syntax test-error* (syntax-rules () @@ -942,7 +943,80 @@ (query conn "SELECT * FROM chicken_pgsql_test"))) isolation: 'serializable)) (disconnect conn2))) - ) +) + + +;; This testing code is pretty hairy +(test-group "LISTEN/NOTIFY" + (let ((received-channel #f) + (received-pid #f) + (received-message #f) + (pid1 (value-at (query conn "SELECT pg_backend_pid()")))) + + (define (reset-received-values!) + (set! received-channel #f) + (set! received-pid #f) + (set! received-message #f)) + + (query conn "LISTEN \"testing channel\"") + (query conn "LISTEN \"unused channel\"") + + (set-notify-handler! conn (lambda (channel pid message) + (set! received-channel channel) + (set! received-pid pid) + (set! received-message message))) + + (query conn "NOTIFY \"testing channel\", 'this is a test'") + + (test "Notification handler is immediately invoked for own connection" + `("testing channel" ,pid1 "this is a test") + (list received-channel pid1 received-message)) + + (reset-received-values!) + + (let* ((conn2 (connect '((dbname . test)))) + (pid2 (value-at (query conn2 "SELECT pg_backend_pid()")))) + (query conn2 "NOTIFY \"testing channel\", 'another test'") + + (test "Notification handler for connection 1 is not invoked when notifying from connection 2" + `(#f #f #f) + (list received-channel received-pid received-message)) + + (query conn "SELECT 1") + (test "Notification handler is invoked after performing next query" + `("testing channel" ,pid2 "another test") + (list received-channel received-pid received-message)) + + (reset-received-values!) + + ;; This sucks, we have to do this from another thread + (thread-start! + (lambda () + (thread-sleep! 0.1) + (query conn2 "NOTIFY \"testing channel\", 'hi'"))) + + (test "Waiting manually for a short while does nothing yet" + `(#f #f #f) + (begin (wait-for-notifications! conn 1) + (list received-channel received-pid received-message))) + + (test "Waiting long enough returns the notification" + `("testing channel" ,pid2 "hi") + (begin (wait-for-notifications! conn 5000) + (list received-channel received-pid received-message))) + + (reset-received-values!) + + ;; And once more + (thread-start! + (lambda () + (thread-sleep! 0.01) + (query conn2 "NOTIFY \"testing channel\", 'also hi'"))) + + (test "Waiting without timeout returns the notification" + `("testing channel" ,pid2 "also hi") + (begin (wait-for-notifications! conn #f) + (list received-channel received-pid received-message)))))) (test-end) -- cgit v1.2.3