You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
235 lines
6.1 KiB
235 lines
6.1 KiB
package pgxevent |
|
|
|
import (
	"context"
	"errors"
	"strings"
	"text/template"

	pgx "github.com/jackc/pgx/v4"
	pgxpool "github.com/jackc/pgx/v4/pgxpool"
)
|
|
|
// Listener binds a condition on a data table to a handler: when the |
|
// condition is (or becomes) true for a row, the row ID is passed to |
|
// Handler.Handle |
|
type Listener struct { |
|
Table string // Table name, may be double-quoted |
|
PrimaryKeyColumn string // Primary key column |
|
ConditionTemplate *template.Template // WHERE condition with {{.}} substituted for table alias |
|
Handler EventHandler // Interface |
|
} |
|
|
|
type EventConfig struct { |
|
DatabaseURL string |
|
ServiceName string |
|
ServiceVersion string |
|
Listeners map[string]Listener |
|
} |
|
|
|
// request asks the dispatch loop in Listen to start a handler.
//
// The field order is relied upon by positional composite literals.
type request struct {
	Channel string // listener key (map key in EventConfig.Listeners)
	Payload string // row ID, as text
}
|
|
|
// completion reports to the dispatch loop in Listen that a goroutine has
// finished.
//
// Handler goroutines fill in Channel and Payload with the request they
// served; the background goroutines leave both empty. The field order is
// relied upon by positional composite literals.
type completion struct {
	Channel string // listener key of the finished handler, "" otherwise
	Payload string // row ID the finished handler was working on
	Error   error  // nil on success
}
|
|
|
func Listen(ctx context.Context, cfg *EventConfig) error { |
|
ctx, cancel := context.WithCancel(ctx) |
|
_ = cancel |
|
pool, err := pgxpool.Connect(ctx, cfg.DatabaseURL) |
|
if err != nil { |
|
return err |
|
} |
|
defer pool.Close() |
|
conn, err := pool.Acquire(ctx) |
|
defer conn.Release() |
|
|
|
err = conn.BeginFunc(ctx, func(tx pgx.Tx) error { |
|
_, err = tx.Exec(ctx, `CREATE TABLE IF NOT EXISTS pgxevent |
|
(service TEXT PRIMARY KEY, version TEXT)`) |
|
if err != nil { |
|
return err |
|
} |
|
row := tx.QueryRow(ctx, `SELECT version FROM pgxevent |
|
WHERE service = $1`, cfg.ServiceName) |
|
needsUpdate := false |
|
ver := "" |
|
err := row.Scan(&ver) |
|
if err == pgx.ErrNoRows { |
|
_, err := tx.Exec(ctx, `INSERT INTO |
|
pgxevent(service, version) VALUES ($1,$2)`, |
|
cfg.ServiceName, cfg.ServiceVersion) |
|
if err != nil { |
|
return err |
|
} |
|
needsUpdate = true |
|
} else if err == nil { |
|
if err != nil { |
|
return err |
|
} |
|
needsUpdate = (ver != cfg.ServiceVersion) |
|
_, err := tx.Exec(ctx, `UPDATE pgxevent |
|
SET version=$1 WHERE service=$2`, |
|
cfg.ServiceVersion, cfg.ServiceName) |
|
if err != nil { |
|
return err |
|
} |
|
} else { |
|
return err |
|
} |
|
if needsUpdate { |
|
for key, value := range cfg.Listeners { |
|
triggerName := "pgxevent:" + cfg.ServiceName + ":" + key |
|
condBuffer := strings.Builder{} |
|
err = value.ConditionTemplate.Execute(&condBuffer, "new") |
|
if err != nil { |
|
return err |
|
} |
|
conditionNew := condBuffer.String() |
|
condBuffer = strings.Builder{} |
|
err = value.ConditionTemplate.Execute(&condBuffer, "old") |
|
if err != nil { |
|
return err |
|
} |
|
conditionOld := condBuffer.String() |
|
|
|
condition := "(" + conditionNew + ") and (old is null or not (" + conditionOld + "))" |
|
|
|
_, err := tx.Exec(ctx, |
|
`CREATE OR REPLACE FUNCTION "`+triggerName+`"() |
|
RETURNS TRIGGER AS $$ |
|
DECLARE |
|
BEGIN |
|
IF (`+condition+`) THEN |
|
PERFORM pg_notify('`+triggerName+`', CAST(NEW.`+value.PrimaryKeyColumn+` AS TEXT)); |
|
END IF; |
|
RETURN NEW; |
|
END $$ LANGUAGE PLPGSQL |
|
`) |
|
if err != nil { |
|
return err |
|
} |
|
|
|
_, err = tx.Exec(ctx, `DROP TRIGGER IF EXISTS "`+ |
|
triggerName+`" ON `+value.Table) |
|
if err != nil { |
|
return err |
|
} |
|
|
|
_, err = tx.Exec(ctx, `CREATE TRIGGER "`+triggerName+`"`+` |
|
AFTER INSERT OR UPDATE ON `+value.Table+` |
|
FOR EACH ROW |
|
EXECUTE PROCEDURE "`+triggerName+`"() |
|
`) |
|
if err != nil { |
|
return err |
|
} |
|
|
|
} |
|
} |
|
return nil |
|
}) |
|
if err != nil { |
|
return err |
|
} |
|
|
|
running := make(map[string]map[string]bool) |
|
for key, _ := range cfg.Listeners { |
|
triggerName := "pgxevent:" + cfg.ServiceName + ":" + key |
|
_, err := conn.Exec(ctx, `LISTEN "`+triggerName+`"`) |
|
if err != nil { |
|
return err |
|
} |
|
running[key] = make(map[string]bool) |
|
} |
|
var cascadingError error |
|
startChannel := make(chan request) |
|
completionChannel := make(chan completion) |
|
counter := 2 |
|
teardown := false |
|
|
|
go func() { |
|
prefix := "pgxevent:" + cfg.ServiceName + ":" |
|
for { |
|
notification, err := conn.Conn().WaitForNotification(ctx) |
|
if err != nil { |
|
completionChannel <- completion{"", "", err} |
|
return |
|
} |
|
startChannel <- request{notification.Channel[len(prefix):], |
|
notification.Payload} |
|
} |
|
}() |
|
|
|
go func() { |
|
for channel, notification := range cfg.Listeners { |
|
condBuffer := strings.Builder{} |
|
err := notification.ConditionTemplate.Execute(&condBuffer, "Tab") |
|
if err != nil { |
|
completionChannel <- completion{"", "", err} |
|
return |
|
} |
|
rows, err := pool.Query(ctx, `SELECT CAST ( Tab.`+ |
|
notification.PrimaryKeyColumn+` AS text) as id FROM "`+ |
|
notification.Table+`" AS Tab WHERE`+ |
|
`(`+condBuffer.String()+`)`) |
|
if err != nil { |
|
completionChannel <- completion{"", "", err} |
|
return |
|
} |
|
for rows.Next() { |
|
var payload string |
|
err = rows.Scan(&payload) |
|
if err != nil { |
|
completionChannel <- completion{"", "", err} |
|
return |
|
} |
|
startChannel <- request{channel, payload} |
|
} |
|
|
|
} |
|
completionChannel <- completion{} |
|
return |
|
}() |
|
|
|
for { |
|
select { |
|
case start := <-startChannel: |
|
if !teardown { |
|
if !running[start.Channel][start.Payload] { |
|
counter++ |
|
running[start.Channel][start.Payload] = true |
|
go func(req request) { |
|
err := cfg.Listeners[start.Channel]. |
|
Handler.Handle(ctx, pool, req.Payload) |
|
completionChannel <- completion{req.Channel, req.Payload, err} |
|
}(start) |
|
|
|
} |
|
} |
|
case complete := <-completionChannel: |
|
if complete.Error != nil { |
|
if !teardown { |
|
teardown = true |
|
cancel() |
|
cascadingError = complete.Error |
|
} |
|
} |
|
if complete.Channel != "" { |
|
delete(running[complete.Channel], complete.Payload) |
|
} |
|
counter-- |
|
if counter == 0 { |
|
return cascadingError |
|
} |
|
} |
|
} |
|
} |
|
|
|
type EventHandler interface { |
|
Handle(ctx context.Context, pool *pgxpool.Pool, id string) error |
|
}
|
|
|