-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathexample_usage.ex
71 lines (61 loc) · 1.85 KB
/
example_usage.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
defmodule PostgresqlMessageQueue.ExampleUsage do
  @moduledoc """
  This module demonstrates durable asynchronous job processing using the internal message queue.

  It plays both roles: it publishes example messages to the global queue, and it implements the
  `PostgresqlMessageQueue.Messaging.MessageHandler` behaviour to handle them.

    * `send_greeting!/1` publishes one `"ExampleUsage.Events.Greeting"` message; the handler logs
      the greeting.
    * `send_cascade!/0` publishes a batch of `"ExampleUsage.Events.Cascade"` messages; handling
      each of them publishes a further batch one level deeper, until the maximum depth is reached.
  """

  alias PostgresqlMessageQueue.Messaging
  alias PostgresqlMessageQueue.Persistence.Repo

  require Logger

  @behaviour Messaging.MessageHandler

  # Number of messages published by every cascade step.
  @cascade_fan_out 10

  # Cascade messages at this depth (or beyond) do not publish any further messages.
  @max_cascade_depth 4

  @doc """
  Publishes a single greeting message to the global queue.

  The message is broadcast inside a `Repo.transaction/1` call. The result is matched against
  `{:ok, :ok}`, so a `MatchError` is raised if the transaction or the broadcast returns anything
  else. A non-binary `greeting` raises a `FunctionClauseError`.

  `greeting` defaults to `"Hello!"`.
  """
  @spec send_greeting!() :: :ok
  @spec send_greeting!(String.t()) :: :ok
  def send_greeting!(greeting \\ "Hello!") when is_binary(greeting) do
    message = %Messaging.Message{
      type: "ExampleUsage.Events.Greeting",
      schema_version: 1,
      payload: %{greeting: greeting}
    }

    {:ok, :ok} =
      Repo.transaction(fn ->
        Messaging.broadcast_messages!([message], to_queue: Messaging.global_queue())
      end)

    :ok
  end

  @doc """
  Starts a cascade by publishing the first batch of cascade messages (depth 1) to the global
  queue.

  Each handled cascade message below the maximum depth publishes another batch one level deeper
  (see `handle_message/1`). With a fan-out of #{@cascade_fan_out} and a maximum depth of
  #{@max_cascade_depth}, one call leads to 10 + 100 + 1_000 + 10_000 = 11_110 messages in total,
  provided every message is handled exactly once.

  The result of the transaction is matched against `{:ok, :ok}`, so a `MatchError` is raised if
  the transaction or the broadcast returns anything else.
  """
  @spec send_cascade!() :: :ok
  def send_cascade!() do
    {:ok, :ok} =
      Repo.transaction(fn ->
        send_cascade_at_depth!(1, [])
      end)

    :ok
  end

  # Publishes one batch of cascade messages at `depth`. Every message carries `path` extended
  # (at the head) with the message's own 1-based index within the batch.
  #
  # This function does not open a transaction itself; `send_cascade!/0` wraps it in one.
  # NOTE(review): when reached from `handle_message/1`, it presumably relies on the caller of the
  # handler having opened a transaction - confirm against the queue processor.
  @spec send_cascade_at_depth!(pos_integer(), [pos_integer()]) :: :ok
  defp send_cascade_at_depth!(depth, path)
       when is_integer(depth) and depth > 0 and is_list(path) do
    messages =
      for index <- 1..@cascade_fan_out do
        %Messaging.Message{
          type: "ExampleUsage.Events.Cascade",
          schema_version: 1,
          payload: %{depth: depth, path: [index | path]}
        }
      end

    Messaging.broadcast_messages!(messages, to_queue: Messaging.global_queue())
  end

  @doc """
  Handles the example messages published by this module.

    * `"ExampleUsage.Events.Greeting"` - logs the greeting at `info` level.
    * `"ExampleUsage.Events.Cascade"` - publishes the next batch of cascade messages when the
      message's depth is below the maximum depth; otherwise returns `:ok` without publishing.

  Payloads are matched with string keys, although the publishing functions build them with atom
  keys. NOTE(review): presumably the payload is serialised (e.g. as JSON) while stored - confirm.

  Any other message raises a `FunctionClauseError`.
  """
  @impl Messaging.MessageHandler
  def handle_message(%Messaging.Message{
        type: "ExampleUsage.Events.Greeting",
        payload: %{"greeting" => greeting}
      }) do
    Logger.info("ExampleUsage: received greeting: #{greeting}")
  end

  def handle_message(%Messaging.Message{
        type: "ExampleUsage.Events.Cascade",
        payload: %{"depth" => depth, "path" => path}
      }) do
    if depth < @max_cascade_depth do
      send_cascade_at_depth!(depth + 1, path)
    else
      :ok
    end
  end
end