-
Notifications
You must be signed in to change notification settings - Fork 420
/
Copy pathtest_idempotency_redis.py
199 lines (161 loc) · 7.18 KB
/
test_idempotency_redis.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
import copy
import pytest
from testcontainers.redis import RedisContainer
from aws_lambda_powertools.utilities.idempotency.exceptions import (
IdempotencyAlreadyInProgressError,
IdempotencyItemAlreadyExistsError,
IdempotencyItemNotFoundError,
IdempotencyPersistenceLayerError,
)
from aws_lambda_powertools.utilities.idempotency.idempotency import (
idempotent,
idempotent_function,
)
from aws_lambda_powertools.utilities.idempotency.persistence.redis import (
RedisCachePersistenceLayer,
)
# Module-level skip: with allow_module_level=True pytest stops processing this
# module here, so none of the tests defined below are run.
pytest.skip(reason="Integration tests disabled for Redis Idempotency.", allow_module_level=True)
@pytest.fixture
def redis_container_image():
    """Provide the Redis image reference that the tests pass to ``RedisContainer``."""
    image = "public.ecr.aws/docker/library/redis:7.2-alpine"
    return image
class LambdaContext:
    """Stand-in context object with fixed attribute values, passed to the handlers under test."""

    def __init__(self):
        # Every instance carries the same hard-coded values.
        self.function_name: str = "test-func"
        self.memory_limit_in_mb: int = 128
        self.invoked_function_arn: str = "arn:aws:lambda:eu-west-1:809313241234:function:test-func"
        self.aws_request_id: str = "52fdfc07-2182-154f-163f-5f0f9a621d72"

    def get_remaining_time_in_millis(self) -> int:
        """Return a constant remaining time of 1000 milliseconds."""
        remaining_ms = 1000
        return remaining_ms
@pytest.fixture
def lambda_context() -> LambdaContext:
    """Provide a newly built ``LambdaContext`` stub to the requesting test."""
    context = LambdaContext()
    return context
# test basic
def test_idempotent_function_and_lambda_handler_redis_basic(
    lambda_context: LambdaContext,
    redis_container_image,
):
    """A first call through either decorator returns the wrapped callable's result."""
    # GIVEN a running Redis container and a persistence layer bound to its client
    with RedisContainer(image=redis_container_image) as container:
        client = container.get_client()
        payload = {"data": "value"}
        store = RedisCachePersistenceLayer(client=client)
        expected = {"message": "Foo"}

        @idempotent_function(persistence_store=store, data_keyword_argument="record")
        def record_handler(record):
            return expected

        @idempotent(persistence_store=store)
        def lambda_handler(event, context):
            return expected

        # WHEN the decorated function is called, followed by the decorated handler
        function_output = record_handler(record=payload)
        handler_output = lambda_handler(payload, lambda_context)

        # THEN both return the expected payload
        assert function_output == expected
        assert handler_output == expected
def test_idempotent_function_and_lambda_handler_redis_cache(
    lambda_context: LambdaContext,
    redis_container_image,
):
    """Repeated calls with the same event return the stored result; a new event does not."""
    with RedisContainer(image=redis_container_image) as container:
        client = container.get_client()
        payload = {"data": "value2"}
        store = RedisCachePersistenceLayer(client=client)

        # ``current_response`` is rebound later on; both handlers read it through
        # their closure, so they always return whatever it points to at call time.
        current_response = {"message": "Foo"}
        first_response = copy.deepcopy(current_response)

        @idempotent_function(persistence_store=store, data_keyword_argument="record")
        def record_handler(record):
            return current_response

        @idempotent(persistence_store=store)
        def lambda_handler(event, context):
            return current_response

        # First round: function, then handler, both return the initial response
        function_output = record_handler(record=payload)
        handler_output = lambda_handler(payload, lambda_context)
        assert function_output == first_response
        assert handler_output == first_response

        # Second round: the underlying return value changes, but the same event
        # must still yield the initial response
        current_response = {"message": "Bar"}
        repeat_function_output = record_handler(record=payload)
        repeat_handler_output = lambda_handler(payload, lambda_context)
        assert repeat_function_output == first_response
        assert repeat_handler_output == first_response

        # Third round: a different event must yield the updated response
        payload = {"data": "value3"}
        fresh_function_output = record_handler(record=payload)
        fresh_handler_output = lambda_handler(payload, lambda_context)
        assert fresh_function_output == current_response
        assert fresh_handler_output == current_response
# test idempotency in-progress
def test_idempotent_lambda_redis_in_progress(
    lambda_context: LambdaContext,
    redis_container_image,
):
    """
    The idempotent handler raises when the event it receives was already saved as in progress.
    """
    with RedisContainer(image=redis_container_image) as container:
        client = container.get_client()
        first_payload = {"data": "value4"}
        store = RedisCachePersistenceLayer(client=client)
        response = {"foo": "bar"}

        @idempotent(persistence_store=store)
        def lambda_handler(event, context):
            return response

        # Run the handler once before using the store directly.
        # NOTE(review): presumably this first call is what sets up the persistence
        # layer for direct use - confirm.
        lambda_handler(first_payload, lambda_context)

        # Save a second, different event as in progress through the store itself
        in_progress_payload = {"data": "value7"}
        try:
            store.save_inprogress(in_progress_payload, 10000)
        except IdempotencyItemAlreadyExistsError:
            # A record that already exists for this event is tolerated here.
            pass

        # The handler must refuse to process the event saved as in progress
        with pytest.raises(IdempotencyAlreadyInProgressError):
            lambda_handler(in_progress_payload, lambda_context)
# test delete
def test_idempotent_lambda_redis_delete(
    lambda_context: LambdaContext,
    redis_container_image,
):
    """After its record is deleted, the same event makes the handler return a new result."""
    with RedisContainer(image=redis_container_image) as container:
        client = container.get_client()
        payload = {"data": "test_delete"}
        store = RedisCachePersistenceLayer(client=client)

        # Rebound further down; the handler reads it through its closure.
        current_response = {"message": "Foo"}

        @idempotent(persistence_store=store)
        def lambda_handler(event, context):
            return current_response

        # Warm-up call: the original author notes that delete_record does not
        # work until the handler has run once (possibly a bug).
        lambda_handler(payload, lambda_context)

        # Remove any record left behind so the next call starts clean
        store.delete_record(payload, IdempotencyItemNotFoundError)

        # Second call: expect the current response
        handler_output = lambda_handler(payload, lambda_context)
        assert handler_output == current_response

        # Delete the record again, change the response, and expect the handler
        # to return the new value for the same event
        store.delete_record(payload, IdempotencyItemNotFoundError)
        current_response = {"message": "Foo2"}
        handler_output_after_delete = lambda_handler(payload, lambda_context)
        assert handler_output_after_delete == current_response
def test_idempotent_lambda_redis_credential(lambda_context: LambdaContext, redis_container_image):
    """The handler raises a persistence-layer error when the Redis user is denied ``set``."""
    with RedisContainer(image=redis_container_image) as container:
        client = container.get_client()
        password = "terriblePassword"
        username = "test_acl_denial"

        # Create an ACL user that is granted hgetall but denied set
        client.acl_setuser(
            username=username,
            enabled=True,
            passwords="+" + password,
            keys="*",
            commands=["+hgetall", "-set"],
        )
        # Re-authenticate the same client as that restricted user
        client.auth(password=password, username=username)

        store = RedisCachePersistenceLayer(client=client)

        @idempotent(persistence_store=store)
        def lambda_handler(event, _):
            return True

        with pytest.raises(IdempotencyPersistenceLayerError):
            lambda_handler("test_Acl", lambda_context)