How to use Postgres LISTEN/Notify functionality? #1566

Open
malteneuss opened this issue Feb 5, 2025 · 0 comments

Comments

@malteneuss
Contributor

I'm using Persistent with Postgres and would like to use it as a lightweight publish/subscribe message queue, using the Postgres LISTEN/NOTIFY and SKIP LOCKED functionality described in https://www.youtube.com/watch?v=WIRy1Ws47ic.

I managed to get it to work with postgresql-simple, the lower-level library underlying Persistent, but I would like to stay with the Persistent API. Is that even possible? If not, is it possible to get a connection out of a Persistent connection pool to pass to postgresql-simple functions?
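
For the second option, here is a rough sketch of what I have in mind. It assumes a recent persistent-postgresql that exports `getSimpleConn` (I have not checked from which version on) and a resource-pool where `withResource` runs in `IO`. `withRawConn` and `waitForTask` are hypothetical helper names, not part of either library. It bypasses `runSqlPool` on purpose: Postgres only delivers notifications between transactions, so waiting inside the transaction that `runSqlPool` opens would presumably never return.

```haskell
{-# LANGUAGE OverloadedStrings #-}

import Data.Pool (Pool, withResource)
import Database.Persist.Postgresql (SqlBackend, getSimpleConn)
import qualified Database.PostgreSQL.Simple as PG
import Database.PostgreSQL.Simple.Notification (Notification, getNotification)

-- | Hypothetical helper: borrow one pooled connection and hand the raw
-- postgresql-simple handle to the callback. Yields Nothing if the backend
-- is not backed by postgresql-simple.
withRawConn :: Pool SqlBackend -> (PG.Connection -> IO a) -> IO (Maybe a)
withRawConn pool action =
  withResource pool $ \backend ->
    traverse action (getSimpleConn backend)

-- | Hypothetical helper: block until one notification arrives on the
-- task_listener channel, then stop listening so the connection goes
-- back to the pool without a leftover subscription.
waitForTask :: Pool SqlBackend -> IO (Maybe Notification)
waitForTask pool = withRawConn pool $ \conn -> do
  _ <- PG.execute_ conn "LISTEN task_listener"
  note <- getNotification conn
  _ <- PG.execute_ conn "UNLISTEN task_listener"
  pure note
```

One caveat with this shape: a notification sent between two `waitForTask` calls is lost, so the caller would still have to check the task table before blocking. It also ties up one pooled connection for as long as it waits.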

Here's the postgresql-simple solution:

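{-# LANGUAGE DeriveAnyClass, DeriveGeneric, DerivingStrategies, OverloadedRecordDot, OverloadedStrings, TypeApplications #-}

import Control.Concurrent (forkIO, threadDelay)
import Control.Monad (forever, void)
import qualified Data.ByteString.Char8 as BS8
import Data.Text (Text)
import Data.Time (UTCTime)
import Database.PostgreSQL.Simple (FromRow, Only (..), connectPostgreSQL, execute, execute_, query_)
import Database.PostgreSQL.Simple.Notification (getNotification)
import GHC.Generics (Generic)
import System.Environment (getEnv)

-- One row of the task table; the generic FromRow instance reads columns
-- by position, so the table is assumed to be (id, payload, created_at).
data MyTask = MyTask {
    id :: Int,
    payload :: Text,
    createdAt :: UTCTime
}
    deriving stock (Show, Generic)
    deriving anyclass (FromRow)
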
main :: IO ()
main = do
    -- we have to use postgresql-simple to listen for notifications
    -- https://hackage.haskell.org/package/postgresql-simple/docs/Database-PostgreSQL-Simple-Notification.html#v:getNotification
    connectionStr <- getEnv "DATABASE_URL"
    void $ forkIO $ do
        -- receive notifications on a separate thread, https://www.postgresql.org/docs/current/sql-listen.html
        conn <- connectPostgreSQL (BS8.pack connectionStr)
        void $ execute_ conn "LISTEN task_listener"
        forever $ do
            -- open a transaction and lock at most one task that no other worker holds
            tasks <- query_ @MyTask conn "BEGIN; SELECT * FROM task FOR UPDATE SKIP LOCKED LIMIT 1;"
            case tasks of
                [] -> do
                    putStrLn "No tasks available. Wait for new entries via NOTIFY."
                    void $ execute_ conn "COMMIT;"
                    -- blocks until another session sends NOTIFY task_listener
                    notification <- getNotification conn
                    putStrLn $ "Received notification: " ++ show notification
                task : _ -> do
                    putStrLn $ "Received task: " ++ show task
                    -- delete the claimed task and release its row lock
                    void $ execute conn "DELETE FROM task WHERE id = ?" (Only (task.id))
                    void $ execute_ conn "COMMIT;"
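    conn <- connectPostgreSQL (BS8.pack connectionStr)
    void $ execute conn "INSERT INTO task (payload) VALUES (?)" (Only @String "new task payload")
    -- trigger a notification, https://www.postgresql.org/docs/current/sql-notify.html
    -- (redundant if an insert trigger on task already sends one)
    void $ execute_ conn "NOTIFY task_listener"
    -- keep the main thread alive for 10 seconds so the listener thread can run
    threadDelay 10000000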
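
The SKIP LOCKED half of the loop above does not seem to need postgresql-simple at all. A minimal sketch of that step with Persistent's `rawSql`, assuming the same `task` table with `id` and `payload` columns; `claimTask` is a hypothetical name, and the single `DELETE ... RETURNING` statement is swapped in for the separate `SELECT` and `DELETE` above:

```haskell
{-# LANGUAGE OverloadedStrings #-}

import Data.Maybe (listToMaybe)
import Data.Text (Text)
import Database.Persist.Sql (Single (..), SqlPersistT, rawSql)

-- | Hypothetical helper: claim and remove at most one task that no other
-- worker currently has locked. Returns Nothing when the queue is empty
-- or every remaining row is locked by someone else.
claimTask :: SqlPersistT IO (Maybe (Int, Text))
claimTask = do
  rows <-
    rawSql
      "DELETE FROM task \
      \WHERE id = (SELECT id FROM task ORDER BY id FOR UPDATE SKIP LOCKED LIMIT 1) \
      \RETURNING id, payload"
      []
  -- unwrap the Single columns of the first (and only) returned row
  pure $ fmap (\(Single taskId, Single body) -> (taskId, body)) (listToMaybe rows)
```

This would be run as `runSqlPool claimTask pool`. Since `runSqlPool` wraps the action in a transaction, the deleted row should reappear for other workers if the action throws before commit. That still leaves the blocking wait for NOTIFY, which is the part I could not express through the Persistent API.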