summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPeter Bex <peter@more-magic.net>2019-05-12 10:34:10 +0200
committerPeter Bex <peter@more-magic.net>2019-05-12 10:34:10 +0200
commitf67ab295d088f8cb1a3407d1b8c187a7187c7e95 (patch)
tree496789828afac0a9ac0d74aa3a59ab1f69eea8f8
parent13eaf04669aabaa0aa88eed768f83dabc8076f0a (diff)
downloadchicken-postgresql-4.1.0.tar.gz
Add LISTEN/NOTIFY support4.1.0
This really is just a matter of reading out notifications when they're available. Also, we offer a way to explicitly wait for notifications.
-rw-r--r--TODO2
-rw-r--r--postgresql.egg6
-rw-r--r--postgresql.scm114
-rw-r--r--tests/run.scm78
4 files changed, 164 insertions, 36 deletions
diff --git a/TODO b/TODO
index e6a3b16..7d0ab9d 100644
--- a/TODO
+++ b/TODO
@@ -1,6 +1,6 @@
- Add Batch (pipeline) API support (http://blog.2ndquadrant.com/postgresql-latency-pipelining-batching/)
-- Add LISTEN/NOTIFY support
- Add support for range types
+- Use PQsetnonblocking on the connection to ensure large sends are nonblocking too
- Add large object support
- Add namespace support for types?
- Add support for multiple hosts as list in connection args? (new in PG 10)
diff --git a/postgresql.egg b/postgresql.egg
index 3848b3e..2155a0a 100644
--- a/postgresql.egg
+++ b/postgresql.egg
@@ -2,12 +2,12 @@
((synopsis "Bindings for PostgreSQL's C-api")
(category db)
- (version 4.0.0)
+ (version 4.1.0)
(author "Johannes Groedem")
(maintainer "Peter Bex")
(license "BSD")
- (dependencies sql-null srfi-1 srfi-18 srfi-13 srfi-69)
- (test-dependencies test)
+ (dependencies sql-null srfi-1 srfi-13 srfi-69)
+ (test-dependencies test srfi-18)
(foreign-dependencies libpq) ;; Or libpq-dev? Highly OS-dependent!
(components (extension postgresql
(custom-build "build-postgresql")
diff --git a/postgresql.scm b/postgresql.scm
index cb2ba6a..235856c 100644
--- a/postgresql.scm
+++ b/postgresql.scm
@@ -1,6 +1,6 @@
;;; Bindings to the PostgreSQL C library
;;
-;; Copyright (C) 2008-2014 Peter Bex
+;; Copyright (C) 2008-2019 Peter Bex
;; Copyright (C) 2004 Johannes Grødem <johs@copyleft.no>
;; Redistribution and use in source and binary forms, with or without
;; modification, is permitted.
@@ -26,6 +26,7 @@
default-type-unparsers bool-unparser vector-unparser list-unparser
connect reset-connection disconnect connection? connected?
+ set-notify-handler!
query query* with-transaction in-transaction?
@@ -47,7 +48,8 @@
copy-query-fold copy-query*-fold copy-query-fold-right copy-query*-fold-right
copy-query-for-each copy-query*-for-each copy-query-map copy-query*-map
call-with-output-copy-query call-with-output-copy-query*
- with-output-to-copy-query with-output-to-copy-query*)
+ with-output-to-copy-query with-output-to-copy-query*
+ wait-for-notifications!)
(import scheme
(chicken base)
@@ -59,10 +61,10 @@
(chicken format)
(chicken gc)
(chicken blob)
+ (chicken time)
srfi-1
srfi-4
srfi-13
- srfi-18
srfi-69
sql-null)
@@ -104,6 +106,15 @@
(define PQisBusy (foreign-lambda bool PQisBusy pgconn*))
(define PQconsumeInput (foreign-lambda bool PQconsumeInput pgconn*))
+(define-foreign-type pgnotify* (c-pointer "PGnotify"))
+(define PQnotifies (foreign-lambda pgnotify* PQnotifies pgconn*))
+(define PGnotify-channel
+ (foreign-lambda* c-string ((pgnotify* n)) "C_return(n->relname);"))
+(define PGnotify-be-pid
+ (foreign-lambda* int ((pgnotify* n)) "C_return(n->be_pid);"))
+(define PGnotify-payload
+ (foreign-lambda* c-string ((pgnotify* n)) "C_return(n->extra);"))
+
(define-foreign-type pgresult* (c-pointer "PGresult"))
(define PQgetResult (foreign-lambda pgresult* PQgetResult pgconn*))
@@ -315,6 +326,8 @@
(else (consume-string (cdr chars)
(cons (car chars) consumed))))))))))
+(define default-notify-handler (make-parameter #f))
+
;; Array parsers and composite parsers are automatically cached when such
;; a value is requested.
(define default-type-parsers
@@ -426,19 +439,32 @@
;;;;;;;;;;;;;;;;;;;;
(define-record pg-connection
- ptr type-parsers oid-parsers type-unparsers transaction-depth)
+ ptr type-parsers oid-parsers type-unparsers notify-handler
+ transaction-depth)
(define connection? pg-connection?)
(define type-parsers pg-connection-type-parsers)
(define type-unparsers pg-connection-type-unparsers)
+(define notify-handler pg-connection-notify-handler)
+(define (set-notify-handler! conn h)
+ (when (and h (not (procedure? h)))
+ (make-pg-condition 'type 'set-notify-handler! "Not a procedure"))
+ (pg-connection-notify-handler-set! conn h))
(define (connected? conn) (not (not (pg-connection-ptr conn))))
(define (pgsql-connection->fd conn)
((foreign-lambda int PQsocket pgconn*) (pg-connection-ptr conn)))
-(define (wait-for-connection! conn poll-function)
+(define (pq-thread-wait-for-i/o! conn in/out #!optional delay)
(let ((conn-fd (pgsql-connection->fd conn))
- (conn-ptr (pg-connection-ptr conn)))
+ (t ##sys#current-thread))
+ (when delay (##sys#thread-block-for-timeout!
+ t (+ (current-milliseconds) delay)))
+ (##sys#thread-block-for-i/o! t conn-fd in/out)
+ (##sys#thread-yield!)))
+
+(define (wait-for-connection! conn poll-function)
+ (let ((conn-ptr (pg-connection-ptr conn)))
(let loop ((result (poll-function conn-ptr)))
(cond ((= result PGRES_POLLING_OK) (void))
((= result PGRES_POLLING_FAILED)
@@ -448,7 +474,7 @@
'connect 'connect
(conc "Polling Postgres database failed: " message) conn)))
((member result (list PGRES_POLLING_WRITING PGRES_POLLING_READING))
- (thread-wait-for-i/o! conn-fd (if (= PGRES_POLLING_READING result)
+ (pq-thread-wait-for-i/o! conn (if (= PGRES_POLLING_READING result)
#:input
#:output))
(loop (poll-function conn-ptr)))
@@ -497,7 +523,8 @@
(define (connect #!optional (conn-spec '())
(type-parsers (default-type-parsers))
- (type-unparsers (default-type-unparsers)))
+ (type-unparsers (default-type-unparsers))
+ (notify-handler (default-notify-handler)))
(let ((conn-ptr (connect-start conn-spec)))
(cond
((not conn-ptr)
@@ -511,8 +538,9 @@
'connect 'connect
(conc "Connection to Postgres database failed: " message) conn-spec)))
(else
- (let ((conn (make-pg-connection conn-ptr type-parsers
- (make-hash-table) type-unparsers 0)))
+ (let ((conn (make-pg-connection
+ conn-ptr type-parsers (make-hash-table)
+ type-unparsers notify-handler 0)))
;; We don't want libpq to piss in our stderr stream
((foreign-lambda* void ((pgconn* conn))
"PQsetNoticeReceiver(conn, nullNoticeReceiver, NULL);") conn-ptr)
@@ -705,19 +733,47 @@
;;;; Query procedures
;;;;;;;;;;;;;;;;;;;;;;;;
+(define (wait-for-notifications! conn #!optional delay)
+ (let ((conn-ptr (pg-connection-ptr conn)))
+ (let loop ((seen-notifies #f)
+ (waited #f))
+ (if (PQconsumeInput conn-ptr)
+ (cond ((handle-notifies! conn) (loop #t waited))
+ ((and (not seen-notifies) (not waited))
+ (pq-thread-wait-for-i/o! conn #:input delay)
+ (loop seen-notifies #t)))
+ (postgresql-error 'i/o 'wait-for-notifications!
+ (conc "Error reading reply from server: "
+ (PQerrorMessage conn-ptr))
+ conn)))))
+
;; Buffer all available input, yielding if nothing is available:
(define (buffer-available-input! conn)
- (let ((conn-ptr (pg-connection-ptr conn))
- (conn-fd (pgsql-connection->fd conn)))
+ (let ((conn-ptr (pg-connection-ptr conn)))
(let loop ()
(if (PQconsumeInput conn-ptr)
- (when (PQisBusy conn-ptr)
- (thread-wait-for-i/o! conn-fd #:input)
- (loop))
- (postgresql-error
- 'i/o 'buffer-available-input!
- (conc "Error reading reply from server: " (PQerrorMessage conn-ptr))
- conn)))))
+ (cond ((handle-notifies! conn) (loop))
+ ((PQisBusy conn-ptr)
+ (pq-thread-wait-for-i/o! conn #:input)
+ (loop)))
+ (postgresql-error 'i/o 'buffer-available-input!
+ (conc "Error reading reply from server: "
+ (PQerrorMessage conn-ptr))
+ conn)))))
+
+(define (handle-notifies! conn)
+ (let ((conn-ptr (pg-connection-ptr conn))
+ (handler (pg-connection-notify-handler conn)))
+ (let lp ((notify (PQnotifies conn-ptr))
+ (seen-notifications? #f))
+ (cond (notify
+ (let ((channel (PGnotify-channel notify))
+ (pid (PGnotify-be-pid notify))
+ (payload (PGnotify-payload notify)))
+ (free notify)
+ (and handler (handler channel pid payload))
+ (lp (PQnotifies conn-ptr) #t)))
+ (else seen-notifications?)))))
;; Here be more dragons
(define (resolve-unknown-types! conn oids)
@@ -1038,7 +1094,7 @@
(begin
(pg-connection-transaction-depth-set! conn old-depth)
(rollback!)
- (raise exn))
+ (signal exn))
(let ((res (thunk)))
(pg-connection-transaction-depth-set! conn old-depth)
(if res (commit!) (rollback!))
@@ -1059,12 +1115,11 @@
'type 'put-copy-data
"Expected a blob, string or srfi-4 vector" conn data))))
(len (##sys#size data))
- (conn-ptr (pg-connection-ptr conn))
- (conn-fd (pgsql-connection->fd conn)))
+ (conn-ptr (pg-connection-ptr conn)))
(let loop ((res (PQputCopyData conn-ptr data len)))
(cond
((= res 0)
- (thread-wait-for-i/o! conn-fd #:output)
+ (pq-thread-wait-for-i/o! conn #:output)
(loop (PQputCopyData conn-ptr data len)))
((= res 1) (void))
((= res -1)
@@ -1076,12 +1131,11 @@
(conc "Internal error! Unexpected return value: " res) conn))))))
(define (put-copy-end conn #!optional error-message)
- (let ((conn-ptr (pg-connection-ptr conn))
- (conn-fd (pgsql-connection->fd conn)))
+ (let ((conn-ptr (pg-connection-ptr conn)))
(let loop ((res (PQputCopyEnd conn-ptr error-message)))
(cond
((= res 0)
- (thread-wait-for-i/o! conn-fd #:output)
+ (pq-thread-wait-for-i/o! conn #:output)
(loop (PQputCopyEnd conn-ptr error-message)))
((= res 1) (get-last-result conn))
((= res -1)
@@ -1279,7 +1333,7 @@
(let* ((result (query* conn query params format: format raw: raw))
(data-format (result-format result)))
(handle-exceptions exn
- (let cleanup () (if (result? (get-copy-data conn)) (raise exn) (cleanup)))
+ (let cleanup () (if (result? (get-copy-data conn)) (signal exn) (cleanup)))
(let loop ((data (get-copy-data conn format: data-format))
(seed knil))
(if (result? data)
@@ -1298,7 +1352,7 @@
(let* ((result (query* conn query params format: format raw: raw))
(data-format (result-format result)))
(handle-exceptions exn
- (let cleanup () (if (result? (get-copy-data conn)) (raise exn) (cleanup)))
+ (let cleanup () (if (result? (get-copy-data conn)) (signal exn) (cleanup)))
(let loop ((data (get-copy-data conn format: data-format)))
(if (result? data)
knil
@@ -1336,9 +1390,9 @@
(lambda () (put-copy-end conn) (set! closed? #t)))))
(handle-exceptions exn
(if closed?
- (raise exn)
+ (signal exn)
(handle-exceptions _
- (raise exn)
+ (signal exn)
;; Previously written data will be discarded to guarantee atomicity
(put-copy-end conn "Chicken PostgreSQL egg -- forcing error")))
(call-with-values (lambda () (proc output-port))
diff --git a/tests/run.scm b/tests/run.scm
index 0469929..80cf799 100644
--- a/tests/run.scm
+++ b/tests/run.scm
@@ -4,7 +4,8 @@
test
postgresql
sql-null
- srfi-4)
+ srfi-4
+ srfi-18)
(define-syntax test-error*
(syntax-rules ()
@@ -942,7 +943,80 @@
(query conn "SELECT * FROM chicken_pgsql_test")))
isolation: 'serializable))
(disconnect conn2)))
- )
+)
+
+
+;; This testing code is pretty hairy
+(test-group "LISTEN/NOTIFY"
+ (let ((received-channel #f)
+ (received-pid #f)
+ (received-message #f)
+ (pid1 (value-at (query conn "SELECT pg_backend_pid()"))))
+
+ (define (reset-received-values!)
+ (set! received-channel #f)
+ (set! received-pid #f)
+ (set! received-message #f))
+
+ (query conn "LISTEN \"testing channel\"")
+ (query conn "LISTEN \"unused channel\"")
+
+ (set-notify-handler! conn (lambda (channel pid message)
+ (set! received-channel channel)
+ (set! received-pid pid)
+ (set! received-message message)))
+
+ (query conn "NOTIFY \"testing channel\", 'this is a test'")
+
+ (test "Notification handler is immediately invoked for own connection"
+ `("testing channel" ,pid1 "this is a test")
+ (list received-channel pid1 received-message))
+
+ (reset-received-values!)
+
+ (let* ((conn2 (connect '((dbname . test))))
+ (pid2 (value-at (query conn2 "SELECT pg_backend_pid()"))))
+ (query conn2 "NOTIFY \"testing channel\", 'another test'")
+
+ (test "Notification handler for connection 1 is not invoked when notifying from connection 2"
+ `(#f #f #f)
+ (list received-channel received-pid received-message))
+
+ (query conn "SELECT 1")
+ (test "Notification handler is invoked after performing next query"
+ `("testing channel" ,pid2 "another test")
+ (list received-channel received-pid received-message))
+
+ (reset-received-values!)
+
+ ;; This sucks, we have to do this from another thread
+ (thread-start!
+ (lambda ()
+ (thread-sleep! 0.1)
+ (query conn2 "NOTIFY \"testing channel\", 'hi'")))
+
+ (test "Waiting manually for a short while does nothing yet"
+ `(#f #f #f)
+ (begin (wait-for-notifications! conn 1)
+ (list received-channel received-pid received-message)))
+
+ (test "Waiting long enough returns the notification"
+ `("testing channel" ,pid2 "hi")
+ (begin (wait-for-notifications! conn 5000)
+ (list received-channel received-pid received-message)))
+
+ (reset-received-values!)
+
+ ;; And once more
+ (thread-start!
+ (lambda ()
+ (thread-sleep! 0.01)
+ (query conn2 "NOTIFY \"testing channel\", 'also hi'")))
+
+ (test "Waiting without timeout returns the notification"
+ `("testing channel" ,pid2 "also hi")
+ (begin (wait-for-notifications! conn #f)
+ (list received-channel received-pid received-message))))))
(test-end)