From f67ab295d088f8cb1a3407d1b8c187a7187c7e95 Mon Sep 17 00:00:00 2001 From: Peter Bex Date: Sun, 12 May 2019 10:34:10 +0200 Subject: Add LISTEN/NOTIFY support This really is just a matter of reading out notifications when they're available. Also, we offer a way to explicitly wait for notifications. --- postgresql.scm | 114 ++++++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 84 insertions(+), 30 deletions(-) (limited to 'postgresql.scm') diff --git a/postgresql.scm b/postgresql.scm index cb2ba6a..235856c 100644 --- a/postgresql.scm +++ b/postgresql.scm @@ -1,6 +1,6 @@ ;;; Bindings to the PostgreSQL C library ;; -;; Copyright (C) 2008-2014 Peter Bex +;; Copyright (C) 2008-2019 Peter Bex ;; Copyright (C) 2004 Johannes Grødem ;; Redistribution and use in source and binary forms, with or without ;; modification, is permitted. @@ -26,6 +26,7 @@ default-type-unparsers bool-unparser vector-unparser list-unparser connect reset-connection disconnect connection? connected? + set-notify-handler! query query* with-transaction in-transaction? @@ -47,7 +48,8 @@ copy-query-fold copy-query*-fold copy-query-fold-right copy-query*-fold-right copy-query-for-each copy-query*-for-each copy-query-map copy-query*-map call-with-output-copy-query call-with-output-copy-query* - with-output-to-copy-query with-output-to-copy-query*) + with-output-to-copy-query with-output-to-copy-query* + wait-for-notifications!) (import scheme (chicken base) @@ -59,10 +61,10 @@ (chicken format) (chicken gc) (chicken blob) + (chicken time) srfi-1 srfi-4 srfi-13 - srfi-18 srfi-69 sql-null) @@ -104,6 +106,15 @@ (define PQisBusy (foreign-lambda bool PQisBusy pgconn*)) (define PQconsumeInput (foreign-lambda bool PQconsumeInput pgconn*)) +(define-foreign-type pgnotify* (c-pointer "PGnotify")) +(define PQnotifies (foreign-lambda pgnotify* PQnotifies pgconn*)) +(define PGnotify-channel + (foreign-lambda* c-string ((pgnotify* n)) "C_return(n->relname);")) +(define PGnotify-be-pid + (foreign-lambda* int ((pgnotify* n)) "C_return(n->be_pid);")) +(define PGnotify-payload + (foreign-lambda* c-string ((pgnotify* n)) "C_return(n->extra);")) + (define-foreign-type pgresult* (c-pointer "PGresult")) (define PQgetResult (foreign-lambda pgresult* PQgetResult pgconn*)) @@ -315,6 +326,8 @@ (else (consume-string (cdr chars) (cons (car chars) consumed)))))))))) +(define default-notify-handler (make-parameter #f)) + ;; Array parsers and composite parsers are automatically cached when such ;; a value is requested. (define default-type-parsers @@ -426,19 +439,32 @@ ;;;;;;;;;;;;;;;;;;;; (define-record pg-connection - ptr type-parsers oid-parsers type-unparsers transaction-depth) + ptr type-parsers oid-parsers type-unparsers notify-handler + transaction-depth) (define connection? pg-connection?) (define type-parsers pg-connection-type-parsers) (define type-unparsers pg-connection-type-unparsers) +(define notify-handler pg-connection-notify-handler) +(define (set-notify-handler! conn h) + (when (and h (not (procedure? h))) + (make-pg-condition 'type 'set-notify-handler! "Not a procedure")) + (pg-connection-notify-handler-set! conn h)) (define (connected? conn) (not (not (pg-connection-ptr conn)))) (define (pgsql-connection->fd conn) ((foreign-lambda int PQsocket pgconn*) (pg-connection-ptr conn))) -(define (wait-for-connection! conn poll-function) +(define (pq-thread-wait-for-i/o! conn in/out #!optional delay) (let ((conn-fd (pgsql-connection->fd conn)) - (conn-ptr (pg-connection-ptr conn))) + (t ##sys#current-thread)) + (when delay (##sys#thread-block-for-timeout! + t (+ (current-milliseconds) delay))) + (##sys#thread-block-for-i/o! t conn-fd in/out) + (##sys#thread-yield!))) + +(define (wait-for-connection! conn poll-function) + (let ((conn-ptr (pg-connection-ptr conn))) (let loop ((result (poll-function conn-ptr))) (cond ((= result PGRES_POLLING_OK) (void)) ((= result PGRES_POLLING_FAILED) @@ -448,7 +474,7 @@ 'connect 'connect (conc "Polling Postgres database failed: " message) conn))) ((member result (list PGRES_POLLING_WRITING PGRES_POLLING_READING)) - (thread-wait-for-i/o! conn-fd (if (= PGRES_POLLING_READING result) + (pq-thread-wait-for-i/o! conn (if (= PGRES_POLLING_READING result) #:input #:output)) (loop (poll-function conn-ptr))) @@ -497,7 +523,8 @@ (define (connect #!optional (conn-spec '()) (type-parsers (default-type-parsers)) - (type-unparsers (default-type-unparsers))) + (type-unparsers (default-type-unparsers)) + (notify-handler (default-notify-handler))) (let ((conn-ptr (connect-start conn-spec))) (cond ((not conn-ptr) @@ -511,8 +538,9 @@ 'connect 'connect (conc "Connection to Postgres database failed: " message) conn-spec))) (else - (let ((conn (make-pg-connection conn-ptr type-parsers - (make-hash-table) type-unparsers 0))) + (let ((conn (make-pg-connection + conn-ptr type-parsers (make-hash-table) + type-unparsers notify-handler 0))) ;; We don't want libpq to piss in our stderr stream ((foreign-lambda* void ((pgconn* conn)) "PQsetNoticeReceiver(conn, nullNoticeReceiver, NULL);") conn-ptr) @@ -705,19 +733,47 @@ ;;;; Query procedures ;;;;;;;;;;;;;;;;;;;;;;;; +(define (wait-for-notifications! conn #!optional delay) + (let ((conn-ptr (pg-connection-ptr conn))) + (let loop ((seen-notifies #f) + (waited #f)) + (if (PQconsumeInput conn-ptr) + (cond ((handle-notifies! conn) (loop #t waited)) + ((and (not seen-notifies) (not waited)) + (pq-thread-wait-for-i/o! conn #:input delay) + (loop seen-notifies #t))) + (postgresql-error 'i/o 'wait-for-notifications! + (conc "Error reading reply from server: " + (PQerrorMessage conn-ptr)) + conn))))) + ;; Buffer all available input, yielding if nothing is available: (define (buffer-available-input! conn) - (let ((conn-ptr (pg-connection-ptr conn)) - (conn-fd (pgsql-connection->fd conn))) + (let ((conn-ptr (pg-connection-ptr conn))) (let loop () (if (PQconsumeInput conn-ptr) - (when (PQisBusy conn-ptr) - (thread-wait-for-i/o! conn-fd #:input) - (loop)) - (postgresql-error - 'i/o 'buffer-available-input! - (conc "Error reading reply from server: " (PQerrorMessage conn-ptr)) - conn))))) + (cond ((handle-notifies! conn) (loop)) + ((PQisBusy conn-ptr) + (pq-thread-wait-for-i/o! conn #:input) + (loop))) + (postgresql-error 'i/o 'buffer-available-input! + (conc "Error reading reply from server: " + (PQerrorMessage conn-ptr)) + conn))))) + +(define (handle-notifies! conn) + (let ((conn-ptr (pg-connection-ptr conn)) + (handler (pg-connection-notify-handler conn))) + (let lp ((notify (PQnotifies conn-ptr)) + (seen-notifications? #f)) + (cond (notify + (let ((channel (PGnotify-channel notify)) + (pid (PGnotify-be-pid notify)) + (payload (PGnotify-payload notify))) + (free notify) + (and handler (handler channel pid payload)) + (lp (PQnotifies conn-ptr) #t))) + (else seen-notifications?))))) ;; Here be more dragons (define (resolve-unknown-types! conn oids) @@ -1038,7 +1094,7 @@ (begin (pg-connection-transaction-depth-set! conn old-depth) (rollback!) - (raise exn)) + (signal exn)) (let ((res (thunk))) (pg-connection-transaction-depth-set! conn old-depth) (if res (commit!) (rollback!)) @@ -1059,12 +1115,11 @@ 'type 'put-copy-data "Expected a blob, string or srfi-4 vector" conn data)))) (len (##sys#size data)) - (conn-ptr (pg-connection-ptr conn)) - (conn-fd (pgsql-connection->fd conn))) + (conn-ptr (pg-connection-ptr conn))) (let loop ((res (PQputCopyData conn-ptr data len))) (cond ((= res 0) - (thread-wait-for-i/o! conn-fd #:output) + (pq-thread-wait-for-i/o! conn #:output) (loop (PQputCopyData conn-ptr data len))) ((= res 1) (void)) ((= res -1) @@ -1076,12 +1131,11 @@ (conc "Internal error! Unexpected return value: " res) conn)))))) (define (put-copy-end conn #!optional error-message) - (let ((conn-ptr (pg-connection-ptr conn)) - (conn-fd (pgsql-connection->fd conn))) + (let ((conn-ptr (pg-connection-ptr conn))) (let loop ((res (PQputCopyEnd conn-ptr error-message))) (cond ((= res 0) - (thread-wait-for-i/o! conn-fd #:output) + (pq-thread-wait-for-i/o! conn #:output) (loop (PQputCopyEnd conn-ptr error-message))) ((= res 1) (get-last-result conn)) ((= res -1) @@ -1279,7 +1333,7 @@ (let* ((result (query* conn query params format: format raw: raw)) (data-format (result-format result))) (handle-exceptions exn - (let cleanup () (if (result? (get-copy-data conn)) (raise exn) (cleanup))) + (let cleanup () (if (result? (get-copy-data conn)) (signal exn) (cleanup))) (let loop ((data (get-copy-data conn format: data-format)) (seed knil)) (if (result? data) @@ -1298,7 +1352,7 @@ (let* ((result (query* conn query params format: format raw: raw)) (data-format (result-format result))) (handle-exceptions exn - (let cleanup () (if (result? (get-copy-data conn)) (raise exn) (cleanup))) + (let cleanup () (if (result? (get-copy-data conn)) (signal exn) (cleanup))) (let loop ((data (get-copy-data conn format: data-format))) (if (result? data) knil @@ -1336,9 +1390,9 @@ (lambda () (put-copy-end conn) (set! closed? #t))))) (handle-exceptions exn (if closed? - (raise exn) + (signal exn) (handle-exceptions _ - (raise exn) + (signal exn) ;; Previously written data will be discarded to guarantee atomicity (put-copy-end conn "Chicken PostgreSQL egg -- forcing error"))) (call-with-values (lambda () (proc output-port)) -- cgit v1.2.3