// v3_test.dart — forked from isoos/postgresql-dart.
import 'dart:async';
import 'package:async/async.dart';
import 'package:postgres/messages.dart';
import 'package:postgres/postgres.dart';
import 'package:stream_channel/stream_channel.dart';
import 'package:test/test.dart';
import 'docker.dart';
// Settings used for the connection opened before each `PgConnection` test.
//
// The application name set here is the value the 'runtime parameters' and
// 'simple queries' tests expect the server to report back.
// NOTE(review): `loggingTransformer` is not defined in this file; presumably
// it comes from `docker.dart` — confirm.
final _connectionSettings = ConnectionSettings(
  applicationName: 'test_app',
  transformer: loggingTransformer('conn'),
);
void main() {
withPostgresServer('PgConnection', (server) {
// Connection under test: opened fresh in setUp and closed again in tearDown,
// so every test in this suite starts with its own session.
late Connection connection;

setUp(() async {
  final endpoint = await server.endpoint();
  connection = await Connection.open(endpoint, settings: _connectionSettings);
});

tearDown(() async {
  await connection.close();
});
// The application name passed in the connection settings must show up in the
// parameters exposed through `connection.info`.
test('runtime parameters', () async {
  expect(connection.info.parameters.applicationName, 'test_app');
});
// Runs two plain-string queries and checks both the row values and the
// column-map view of a row.
test('simple queries', () async {
  final literals = await connection.execute("SELECT 'dart', 42, NULL");
  final expectedRow = ['dart', 42, null];
  expect(literals, [expectedRow]);

  // The column map is expected to hold a single '?column?' entry.
  // NOTE(review): presumably all three unnamed columns share that name and
  // the last value (NULL) wins — confirm against `toColumnMap`.
  final byColumnName = literals.single.toColumnMap();
  expect(byColumnName, {'?column?': null});

  final appName = await connection
      .execute("SELECT current_setting('application_name');");
  expect(appName.single.single, 'test_app');
});
// With `ignoreRows: true` the result carries no rows, but the schema of the
// statement is still expected to be reported.
test('statement without rows', () async {
  final notify = Sql('''SELECT pg_notify('VIRTUAL','Payload 2');''');
  final outcome = await connection.execute(notify, ignoreRows: true);
  expect(outcome, isEmpty);

  // Exactly one column, named after the function and typed as void.
  final pgNotifyColumn = isA<ResultSchemaColumn>()
      .having((e) => e.columnName, 'columnName', 'pg_notify')
      .having((e) => e.type, 'type', Type.voidType);
  expect(outcome.schema.columns, [pgNotifyColumn]);
});
// A DDL statement is expected to report zero affected rows and no columns.
test('queries without a schema message', () async {
  const createTable = 'CREATE TEMPORARY TABLE foo (bar INTEGER);';
  final outcome = await connection.execute(createTable);
  expect(outcome.affectedRows, isZero);
  expect(outcome.schema.columns, isEmpty);
});
// Two INSERTs sent in one string must be counted together in `affectedRows`.
test('can run multiple statements at once', () async {
  await connection.execute('CREATE TEMPORARY TABLE foo (bar INTEGER);');

  // Postgres refuses to prepare several statements at once, so this only
  // works in simple query mode — where every response must still be handled.
  const twoInserts = 'INSERT INTO foo VALUES (1); INSERT INTO foo VALUES (2);';
  final outcome = await connection.execute(
    twoInserts,
    queryMode: QueryMode.simple,
  );
  expect(outcome.affectedRows, 2);
});
// Three queries are issued without awaiting in between. If the connection
// runs them one after another, the total wall time is at least the sum of
// the individual `pg_sleep` durations (1 + 2 + 3 seconds).
test('queue multiple queries concurrently, execute sequentially', () async {
  final sw = Stopwatch()..start();
  await Future.wait([
    connection.execute('SELECT 1, pg_sleep(1)'),
    connection.execute('SELECT 1, pg_sleep(2)'),
    connection.execute('SELECT 1, pg_sleep(3)'),
  ]);
  // Match on the measured value (not a pre-computed bool) so that a failure
  // reports how many seconds actually elapsed.
  expect(sw.elapsed.inSeconds, greaterThanOrEqualTo(6));
});
// Sends two notifications on one channel — one with a payload, one without —
// and expects them to arrive in order, the second with an empty payload.
test('listen and notify', () async {
  const channel = 'test_channel';

  // Register the expectation on the channel stream before notifying.
  final notifications = connection.channels[channel];
  expect(notifications, emitsInOrder(['my notification', isEmpty]));

  await connection.channels.notify(channel, 'my notification');
  await connection.channels.notify(channel);
});
// A prepared statement may reference the same positional parameter twice.
test('can use same variable multiple times', () async {
  final sql = Sql(r'SELECT $1 AS a, $1 + 2 AS b', types: [Type.integer]);
  final statement = await connection.prepare(sql);

  final result = await statement.run([10]);
  final expectedRow = [10, 12];
  expect(result, [expectedRow]);
  expect(result.single.toColumnMap(), {'a': 10, 'b': 12});
});
// Named, typed parameters must not be confused by the `@>` operator that
// appears in the same SQL text.
test('can use mapped queries with json-contains operator', () async {
  final sql = Sql.named('SELECT @a:jsonb @> @b:jsonb');
  final container = {'foo': 'bar', 'another': 'as well'};
  final contained = {'foo': 'bar'};

  final result = await connection.execute(
    sql,
    parameters: {'a': container, 'b': contained},
  );
  expect(result, [
    [true]
  ]);
});
// Named parameters combined with the `@@` operator and an explicit cast to
// jsonpath; the predicate is expected to evaluate to false for this input.
test('can use json path predicate check operator', () async {
  final sql = Sql.named('SELECT @a:jsonb @@ @b:text::jsonpath');
  final document = [1, 2, 3, 4, 5];
  const predicate = r'$.a[*] > 2';

  final result = await connection.execute(
    sql,
    parameters: {'a': document, 'b': predicate},
  );
  expect(result, [
    [false]
  ]);
});
// Each test tries to insert a primary key that already exists and expects a
// PgException, using a different way of running the statement each time.
group('throws error', () {
  // Seed the table so that id 1 is already taken.
  setUp(() async {
    const createTable = 'CREATE TEMPORARY TABLE foo (id INTEGER PRIMARY KEY);';
    await connection.execute(createTable);
    await connection.execute('INSERT INTO foo VALUES (1);');
  });

  test('for duplicate with simple query', () async {
    const duplicateInsert = 'INSERT INTO foo VALUES (1);';
    await expectLater(
      () => connection.execute(duplicateInsert),
      _throwsPostgresException,
    );

    // The failed statement must leave the connection usable.
    await connection.execute('SELECT 1');
  });

  test('for duplicate with extended query', () async {
    final duplicateInsert = Sql(r'INSERT INTO foo VALUES ($1);');
    final duplicateKey = [TypedValue(Type.integer, 1)];
    await expectLater(
      () => connection.execute(duplicateInsert, parameters: duplicateKey),
      _throwsPostgresException,
    );

    // The failed statement must leave the connection usable.
    await connection.execute('SELECT 1');
  });

  test('for duplicate in prepared statement', () async {
    final insert = Sql(r'INSERT INTO foo VALUES ($1);', types: [Type.integer]);
    final statement = await connection.prepare(insert);

    // Binding yields a stream; the failure is expected as a stream error.
    final rows = statement.bind([1]);
    await expectLater(rows, emitsError(_isPostgresException));
  });
});
// `connection.run` must invoke the callback with an open session, complete
// with the callback's return value, and close the session afterwards while
// keeping the work done inside it visible on the connection.
test('run', () async {
  const returnValue = 'returned from run()';
  late Session leakedSession;

  // expectAsync1 additionally requires the callback to be called.
  final outcome = connection.run(expectAsync1((session) async {
    expect(session.isOpen, isTrue);
    leakedSession = session;

    const createTable = 'CREATE TEMPORARY TABLE foo (id INTEGER PRIMARY KEY);';
    await session.execute(createTable);
    await session.execute('INSERT INTO foo VALUES (3);');
    return returnValue;
  }));
  await expectLater(outcome, completion(returnValue));

  // The session handed to the callback is no longer open once run() is done.
  expect(leakedSession.isOpen, isFalse);

  final stored = await connection.execute('SELECT * FROM foo');
  expect(stored, [
    [3]
  ]);
});
// A session forced into simple query mode is expected to fail with a
// PgException when a query with parameters is executed in it.
test('run with bad query mode', () async {
  final simpleMode = SessionSettings(queryMode: QueryMode.simple);

  // NOTE(review): the SQL text contains a doubled `WHERE where`; presumably
  // irrelevant because the call is expected to fail either way — confirm.
  final outcome = connection.run(settings: simpleMode, (session) async {
    await session.execute(
      r'SELECT * FROM foo WHERE where id = $1',
      parameters: ['id#1'],
    );
  });

  await expectLater(outcome, throwsA(isA<PgException>()));
});
group('runTx', () {
// Every test in this group starts with an empty temporary table `t`.
setUp(() async {
  const createTable = 'CREATE TEMPORARY TABLE t (id INT UNIQUE)';
  await connection.execute(createTable);
});

// The value returned from the transaction callback is handed back by
// `runTx`, and each row compares equal to a list of its column values.
test('Rows are Lists of column values', () async {
  await connection.execute('INSERT INTO t (id) VALUES (1)');

  final lookup = Sql(r'SELECT * FROM t WHERE id = $1 LIMIT 1');
  final outValue = await connection.runTx(
    (tx) => tx.execute(lookup, parameters: [TypedValue(Type.integer, 1)]),
  );

  expect(outValue, [
    [1]
  ]);
});
// A transaction that completes normally returns its callback's value, and
// its insert is visible to later queries on the connection.
test('Send successful transaction succeeds, returns returned value',
    () async {
  final insideTx = await connection.runTx((tx) async {
    await tx.execute('INSERT INTO t (id) VALUES (1)');
    return tx.execute('SELECT id FROM t');
  });
  final expectedRows = [
    [1]
  ];
  expect(insideTx, expectedRows);

  final afterTx = await connection.execute('SELECT id FROM t');
  expect(afterTx, expectedRows);
});
// Starts two transactions back to back without awaiting the first one and
// checks that the second body only begins after the first has finished.
test('Make sure two simultaneous transactions cannot be interwoven',
    () async {
  // Checkpoints appended by both transaction bodies in execution order:
  // 1x belongs to the first transaction, 2x to the second. Typed explicitly
  // so the list is not inferred as List<dynamic>.
  final orderEnsurer = <int>[];

  final firstTransactionFuture = connection.runTx((c) async {
    orderEnsurer.add(11);
    await c.execute('INSERT INTO t (id) VALUES (1)');
    orderEnsurer.add(12);
    final result = await c.execute('SELECT id FROM t');
    orderEnsurer.add(13);
    return result;
  });

  final secondTransactionFuture = connection.runTx((c) async {
    orderEnsurer.add(21);
    await c.execute('INSERT INTO t (id) VALUES (2)');
    orderEnsurer.add(22);
    final result = await c.execute('SELECT id FROM t');
    orderEnsurer.add(23);
    return result;
  });

  final firstResults = await firstTransactionFuture;
  final secondResults = await secondTransactionFuture;

  // No checkpoint of the second transaction may precede one of the first.
  expect(orderEnsurer, [11, 12, 13, 21, 22, 23]);

  // The first transaction sees only its own row; the second sees both.
  expect(firstResults, [
    [1]
  ]);
  expect(secondResults, [
    [1],
    [2]
  ]);
});
// Queries that were issued before the transaction — even if nobody awaited
// them — must run first, so the transaction sees all of their rows.
test('A transaction does not preempt pending queries', () async {
  // Fire the inserts without awaiting them.
  for (final id in [1, 2, 3]) {
    unawaited(connection.execute(
      'INSERT INTO t (id) VALUES ($id)',
      ignoreRows: true,
    ));
  }

  final seenByTx =
      await connection.runTx((tx) => tx.execute('SELECT id FROM t'));

  expect(seenByTx, [
    [1],
    [2],
    [3]
  ]);
});
// Throwing from the transaction callback must surface the same exception to
// the caller and leave no trace of the insert behind.
test('can be rolled back by throwing', () async {
  final failure = Exception('for test');

  await expectLater(
    () => connection.runTx((tx) async {
      await tx.execute('INSERT INTO t (id) VALUES (1);');
      throw failure;
    }),
    throwsA(failure),
  );

  final remaining = await connection.execute('SELECT id FROM t');
  expect(remaining, isEmpty);
});
// Values of several literal types must come back correctly when the query
// is run in simple query mode.
// NOTE(review): this test sits inside the `runTx` group but does not use a
// transaction — confirm whether it belongs in a group of its own.
test('single simple query', () async {
  const literals = "SELECT 'dart', 42, true, false, NULL";
  final result = await connection.execute(
    literals,
    queryMode: QueryMode.simple,
  );

  final expectedRow = ['dart', 42, true, false, null];
  expect(result, [expectedRow]);
});
// Passing parameters together with `QueryMode.simple` is expected to fail
// with a PgException.
// NOTE(review): this test sits inside the `runTx` group but does not use a
// transaction — confirm whether it belongs in a group of its own.
test('parameterized query throws', () async {
  final sql = Sql('SELECT 1');
  final unexpectedParameters = [TypedValue(Type.integer, 1)];

  await expectLater(
    () => connection.execute(
      sql,
      parameters: unexpectedParameters,
      queryMode: QueryMode.simple,
    ),
    _throwsPostgresException,
  );
});
});
// Opens a separate connection whose channel transformer records every
// message in both directions, then checks that one query produced a
// QueryMessage on the way out and a DataRowMessage on the way in.
test('can inject transformer into connection', () async {
  final received = <ServerMessage>[];
  final sent = <ClientMessage>[];

  // Both handlers record the message and pass it through unchanged.
  final recorder = StreamChannelTransformer<Message, Message>(
    StreamTransformer.fromHandlers(handleData: (msg, sink) {
      received.add(msg as ServerMessage);
      sink.add(msg);
    }),
    StreamSinkTransformer.fromHandlers(handleData: (msg, sink) {
      sent.add(msg as ClientMessage);
      sink.add(msg);
    }),
  );

  final endpoint = await server.endpoint();
  final recorded = await Connection.open(
    endpoint,
    settings: ConnectionSettings(transformer: recorder),
  );
  addTearDown(recorded.close);

  await recorded.execute("SELECT 'foo'", ignoreRows: true);

  expect(received, contains(isA<DataRowMessage>()));
  expect(sent, contains(isA<QueryMessage>()));
});
// Verifies the closing lifecycle: the connection is open and usable first,
// `closed` completes no later than `close()` returns, and afterwards the
// connection reports itself as closed and rejects further queries.
test('can close connection', () async {
expect(connection.isOpen, isTrue);
await connection.execute('SELECT 1');
// Flag flipped only after `close()` has returned (see below).
var didFinishClose = false;
// The flag is read at the moment `closed` completes. Expecting `false`
// means `closed` must complete before the statement following
// `await connection.close()` gets to run.
expect(
connection.closed.then((_) => didFinishClose), completion(isFalse));
await connection.close();
didFinishClose = true;
expect(connection.isOpen, isFalse);
// A closed connection must refuse new work with a PgException.
expect(
() => connection.execute('SELECT 1'), throwsA(_isPostgresException));
});
});
withPostgresServer('can close connection after error conditions', (server) {
// Two independent connections per test: conn1 is the one that gets
// terminated, conn2 is used to issue the termination.
late Connection conn1;
late Connection conn2;

setUp(() async {
  // Opens a connection whose logging transformer is tagged with [tag].
  Future<Connection> open(String tag) async {
    return Connection.open(
      await server.endpoint(),
      settings: ConnectionSettings(transformer: loggingTransformer(tag)),
    );
  }

  conn1 = await open('c1');
  conn2 = await open('c2');
});

// Closing must succeed even though conn1 was terminated during the test.
tearDown(() async {
  await conn1.close();
  await conn2.close();
});
// Terminates conn1's server backend from conn2, once while conn1 is idle and
// once while conn1 has a query in flight. The group's tearDown then has to
// close both connections.
for (final concurrentQuery in [false, true]) {
  test(
    'with concurrent query: $concurrentQuery',
    () async {
      // Ask conn1 for its own backend PID. Taking the first row of
      // pg_stat_activity for the user is not guaranteed to be conn1 — it
      // could just as well be conn2, which would then terminate itself.
      final res = await conn1.execute('SELECT pg_backend_pid();');
      final conn1PID = res.single.single as int;

      // Simulate issue by terminating a connection during a query
      if (concurrentQuery) {
        // We expect that terminating the connection will throw. Use
        // pg_sleep to avoid flaky race conditions between the conditions.
        expect(conn1.execute('select pg_sleep(1) from pg_stat_activity;'),
            _throwsPostgresException);
      }

      // Terminate the conn1 while the query is running
      await conn2.execute('select pg_terminate_backend($conn1PID);');
    },
  );
}
// Same scenario as the concurrent-query case, but the in-flight query on
// conn1 is issued with `ignoreRows: true`.
test('with simple query protocol', () async {
  // Get the PID for conn1 from conn1 itself. Taking the first row of
  // pg_stat_activity for the user is not guaranteed to be conn1 — it could
  // just as well be conn2, which would then terminate itself.
  final res = await conn1.execute('SELECT pg_backend_pid();');
  final conn1PID = res.single.single as int;

  // Not awaited: the query is expected to fail once the backend is gone.
  expect(
    conn1.execute('select pg_sleep(1) from pg_stat_activity;',
        ignoreRows: true),
    _throwsPostgresException,
  );

  await conn2.execute(
      'select pg_terminate_backend($conn1PID) from pg_stat_activity;');
});
});
// Killing the server must make the connection's `closed` future complete,
// while normal use beforehand must not.
withPostgresServer('reacts to socket being closed', (server) {
  test('when killing server', () async {
    final conn = await server.newConnection();

    // Set once `conn.closed` completes.
    var closedFired = false;
    unawaited(conn.closed.whenComplete(() => closedFired = true));
    expect(closedFired, isFalse);

    // Running a query must not close the connection.
    await conn.execute('select 1');
    expect(closedFired, isFalse);

    await server.kill();
    await conn.closed;
    expect(closedFired, isTrue);
  });
});
}
// Matches any value that is a [PgException].
final _isPostgresException = isA<PgException>();
// `throwsA` wrapper around [_isPostgresException]; the tests above apply it
// to callbacks and to not-yet-awaited futures that should fail with one.
final _throwsPostgresException = throwsA(_isPostgresException);