Skip to content

Commit b322101

Browse files
committed
multi client configuration
1 parent d58622b commit b322101

File tree

11 files changed

+273
-151
lines changed

11 files changed

+273
-151
lines changed

‎pkg/async-command/DependencyInjection/AsyncCommandExtension.php

+9
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
namespace Enqueue\AsyncCommand\DependencyInjection;
44

5+
use Enqueue\AsyncCommand\RunCommandProcessor;
56
use Symfony\Component\Config\FileLocator;
67
use Symfony\Component\DependencyInjection\ContainerBuilder;
78
use Symfony\Component\DependencyInjection\Extension\Extension;
@@ -16,5 +17,13 @@ public function load(array $configs, ContainerBuilder $container)
1617
{
1718
$loader = new YamlFileLoader($container, new FileLocator(__DIR__.'/../Resources/config'));
1819
$loader->load('services.yml');
20+
21+
$service = $container->register('enqueue.async_command.run_command_processor', RunCommandProcessor::class)
22+
->addArgument('%kernel.project_dir%')
23+
;
24+
25+
foreach ($configs['clients'] as $client) {
26+
$service->addTag('enqueue.command_subscriber', ['client' => $client]);
27+
}
1928
}
2029
}

‎pkg/enqueue-bundle/DependencyInjection/Configuration.php

+11-48
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
use Enqueue\Monitoring\Symfony\DependencyInjection\MonitoringFactory;
66
use Enqueue\Symfony\Client\DependencyInjection\ClientFactory;
77
use Enqueue\Symfony\DependencyInjection\TransportFactory;
8-
use Symfony\Component\Config\Definition\Builder\ArrayNodeDefinition;
98
use Symfony\Component\Config\Definition\Builder\TreeBuilder;
109
use Symfony\Component\Config\Definition\ConfigurationInterface;
1110

@@ -69,64 +68,28 @@ public function getConfigTreeBuilder(): TreeBuilder
6968
->append($consumerConfig)
7069
->append($clientConfig)
7170
->append($monitoringConfig)
71+
->arrayNode('extensions')->addDefaultsIfNotSet()->children()
72+
->booleanNode('doctrine_ping_connection_extension')->defaultFalse()->end()
73+
->booleanNode('doctrine_clear_identity_map_extension')->defaultFalse()->end()
74+
->booleanNode('signal_extension')->defaultValue(function_exists('pcntl_signal_dispatch'))->end()
75+
->booleanNode('reply_extension')->defaultTrue()->end()
76+
->end()->end()
77+
->arrayNode('async_commands')
78+
->addDefaultsIfNotSet()
79+
->canBeEnabled()
80+
->end()
7281
->end()
7382
->end()
7483
;
7584

76-
77-
// $transportFactory = new TransportFactory('default');
78-
//
79-
// /** @var ArrayNodeDefinition $transportNode */
80-
// $transportNode = $rootNode->children()->arrayNode('transport');
81-
// $transportNode
82-
// ->beforeNormalization()
83-
// ->always(function ($value) {
84-
// if (empty($value)) {
85-
// return ['default' => ['dsn' => 'null:']];
86-
// }
87-
// if (is_string($value)) {
88-
// return ['default' => ['dsn' => $value]];
89-
// }
90-
//
91-
// if (is_array($value) && array_key_exists('dsn', $value)) {
92-
// return ['default' => $value];
93-
// }
94-
//
95-
// return $value;
96-
// });
97-
// $transportPrototypeNode = $transportNode
98-
// ->requiresAtLeastOneElement()
99-
// ->useAttributeAsKey('key')
100-
// ->prototype('array')
101-
// ;
102-
//
103-
// $transportFactory->addTransportConfiguration($transportPrototypeNode);
104-
//
105-
// $consumptionNode = $rootNode->children()->arrayNode('consumption');
106-
// $transportFactory->addQueueConsumerConfiguration($consumptionNode);
107-
//
108-
// $clientFactory = new ClientFactory('default');
109-
// $clientNode = $rootNode->children()->arrayNode('client');
110-
// $clientFactory->addClientConfiguration($clientNode, $this->debug);
111-
//
11285
// $rootNode->children()
11386
// ->booleanNode('job')->defaultFalse()->end()
11487
// ->arrayNode('async_events')
11588
// ->addDefaultsIfNotSet()
11689
// ->canBeEnabled()
11790
// ->end()
118-
// ->arrayNode('async_commands')
119-
// ->addDefaultsIfNotSet()
120-
// ->canBeEnabled()
121-
// ->end()
122-
// ->arrayNode('extensions')->addDefaultsIfNotSet()->children()
123-
// ->booleanNode('doctrine_ping_connection_extension')->defaultFalse()->end()
124-
// ->booleanNode('doctrine_clear_identity_map_extension')->defaultFalse()->end()
125-
// ->booleanNode('signal_extension')->defaultValue(function_exists('pcntl_signal_dispatch'))->end()
126-
// ->booleanNode('reply_extension')->defaultTrue()->end()
127-
// ->end()->end()
12891
// ;
129-
//
92+
13093
return $tb;
13194
}
13295
}

‎pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php

+157-51
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,22 @@
44

55
use Enqueue\AsyncCommand\DependencyInjection\AsyncCommandExtension;
66
use Enqueue\AsyncEventDispatcher\DependencyInjection\AsyncEventDispatcherExtension;
7+
use Enqueue\Bundle\Consumption\Extension\DoctrineClearIdentityMapExtension;
8+
use Enqueue\Bundle\Consumption\Extension\DoctrinePingConnectionExtension;
79
use Enqueue\Client\CommandSubscriberInterface;
810
use Enqueue\Client\TopicSubscriberInterface;
11+
use Enqueue\Consumption\Extension\ReplyExtension;
12+
use Enqueue\Consumption\Extension\SignalExtension;
913
use Enqueue\JobQueue\Job;
14+
use Enqueue\Monitoring\Symfony\DependencyInjection\MonitoringFactory;
1015
use Enqueue\Symfony\Client\DependencyInjection\ClientFactory;
1116
use Enqueue\Symfony\DependencyInjection\TransportFactory;
1217
use Symfony\Component\Config\FileLocator;
1318
use Symfony\Component\Config\Resource\FileResource;
1419
use Symfony\Component\DependencyInjection\ContainerBuilder;
1520
use Symfony\Component\DependencyInjection\Extension\PrependExtensionInterface;
1621
use Symfony\Component\DependencyInjection\Loader\YamlFileLoader;
22+
use Symfony\Component\DependencyInjection\Reference;
1723
use Symfony\Component\HttpKernel\DependencyInjection\Extension;
1824

1925
final class EnqueueExtension extends Extension implements PrependExtensionInterface
@@ -46,25 +52,36 @@ public function load(array $configs, ContainerBuilder $container): void
4652
$transportNames[] = $name;
4753

4854
$transportFactory = (new TransportFactory($name));
49-
$transportFactory->buildConnectionFactory($container, $config['transport']);
55+
$transportFactory->buildConnectionFactory($container, $configs['transport']);
5056
$transportFactory->buildContext($container, []);
51-
$transportFactory->buildQueueConsumer($container, $config['consumption']);
57+
$transportFactory->buildQueueConsumer($container, $configs['consumption']);
5258
$transportFactory->buildRpcClient($container, []);
5359

5460
// client
5561
if (isset($configs['client'])) {
5662
$clientNames[] = $name;
5763

58-
$clientConfig = $config['client'];
64+
$clientConfig = $configs['client'];
5965
// todo
60-
$clientConfig['transport'] = $config['transport'];
61-
$clientConfig['consumption'] = $config['consumption'];
66+
$clientConfig['transport'] = $configs['transport'];
67+
$clientConfig['consumption'] = $configs['consumption'];
6268

6369
$clientFactory = new ClientFactory($name);
6470
$clientFactory->build($container, $clientConfig, $defaultName === $name);
65-
$clientFactory->createDriver($container, $config['transport']);
71+
$clientFactory->createDriver($container, $configs['transport']);
6672
$clientFactory->createFlushSpoolProducerListener($container);
6773
}
74+
75+
// monitoring
76+
if (isset($configs['monitoring'])) {
77+
$monitoringFactory = new MonitoringFactory($name);
78+
$monitoringFactory->buildStorage($container, $configs['monitoring']);
79+
$monitoringFactory->buildConsumerExtension($container, $configs['monitoring']);
80+
81+
if (isset($configs['client'])) {
82+
$monitoringFactory->buildClientExtension($container, $configs['monitoring']);
83+
}
84+
}
6885
}
6986

7087
$container->setParameter('enqueue.transports', $transportNames);
@@ -74,54 +91,34 @@ public function load(array $configs, ContainerBuilder $container): void
7491
$this->setupAutowiringForProcessors($container, $clientNames);
7592
}
7693

77-
// @todo register MessageQueueCollector
78-
79-
return;
80-
81-
//
82-
if ($config['job']) {
83-
if (!class_exists(Job::class)) {
84-
throw new \LogicException('Seems "enqueue/job-queue" is not installed. Please fix this issue.');
85-
}
86-
87-
$loader->load('job.yml');
88-
}
89-
90-
if ($config['async_events']['enabled']) {
91-
if (false == class_exists(AsyncEventDispatcherExtension::class)) {
92-
throw new \LogicException('The "enqueue/async-event-dispatcher" package has to be installed.');
93-
}
94-
95-
$extension = new AsyncEventDispatcherExtension();
96-
$extension->load([[
97-
'context_service' => 'enqueue.transport.default.context',
98-
]], $container);
99-
}
100-
101-
if ($config['async_commands']['enabled']) {
102-
if (false == class_exists(AsyncCommandExtension::class)) {
103-
throw new \LogicException('The "enqueue/async-command" package has to be installed.');
104-
}
94+
$this->loadAsyncCommands($config, $container);
10595

106-
$extension = new AsyncCommandExtension();
107-
$extension->load([[]], $container);
108-
}
109-
110-
if ($config['extensions']['doctrine_ping_connection_extension']) {
111-
$loader->load('extensions/doctrine_ping_connection_extension.yml');
112-
}
113-
114-
if ($config['extensions']['doctrine_clear_identity_map_extension']) {
115-
$loader->load('extensions/doctrine_clear_identity_map_extension.yml');
116-
}
96+
// extensions
97+
$this->loadDoctrinePingConnectionExtension($config, $container);
98+
$this->loadDoctrineClearIdentityMapExtension($config, $container);
99+
$this->loadSignalExtension($config, $container);
100+
$this->loadReplyExtension($config, $container);
117101

118-
if ($config['extensions']['signal_extension']) {
119-
$loader->load('extensions/signal_extension.yml');
120-
}
102+
// @todo register MessageQueueCollector
121103

122-
if ($config['extensions']['reply_extension']) {
123-
$loader->load('extensions/reply_extension.yml');
124-
}
104+
// if ($config['job']) {
105+
// if (!class_exists(Job::class)) {
106+
// throw new \LogicException('Seems "enqueue/job-queue" is not installed. Please fix this issue.');
107+
// }
108+
//
109+
// $loader->load('job.yml');
110+
// }
111+
//
112+
// if ($config['async_events']['enabled']) {
113+
// if (false == class_exists(AsyncEventDispatcherExtension::class)) {
114+
// throw new \LogicException('The "enqueue/async-event-dispatcher" package has to be installed.');
115+
// }
116+
//
117+
// $extension = new AsyncEventDispatcherExtension();
118+
// $extension->load([[
119+
// 'context_service' => 'enqueue.transport.default.context',
120+
// ]], $container);
121+
// }
125122
}
126123

127124
public function getConfiguration(array $config, ContainerBuilder $container): Configuration
@@ -187,4 +184,113 @@ private function setupAutowiringForProcessors(ContainerBuilder $container, array
187184
$commandSubscriber->addTag('enqueue.command_subscriber', ['client' => $clientName]);
188185
}
189186
}
187+
188+
private function loadDoctrinePingConnectionExtension(array $config, ContainerBuilder $container): void
189+
{
190+
$configNames = [];
191+
foreach ($config as $name => $modules) {
192+
if ($modules['extensions']['doctrine_ping_connection_extension']) {
193+
$configNames[] = $name;
194+
}
195+
}
196+
197+
if (false == $configNames) {
198+
return;
199+
}
200+
201+
$extension = $container->register('enqueue.consumption.doctrine_ping_connection_extension', DoctrinePingConnectionExtension::class)
202+
->addArgument(new Reference('doctrine'))
203+
;
204+
205+
foreach ($configNames as $name) {
206+
$extension->addTag('enqueue.consumption_extension', ['client' => $name]);
207+
$extension->addTag('enqueue.transport.consumption_extension', ['transport' => $name]);
208+
}
209+
}
210+
211+
private function loadDoctrineClearIdentityMapExtension(array $config, ContainerBuilder $container): void
212+
{
213+
$configNames = [];
214+
foreach ($config as $name => $modules) {
215+
if ($modules['extensions']['doctrine_clear_identity_map_extension']) {
216+
$configNames[] = $name;
217+
}
218+
}
219+
220+
if (false == $configNames) {
221+
return;
222+
}
223+
224+
$extension = $container->register('enqueue.consumption.doctrine_clear_identity_map_extension', DoctrineClearIdentityMapExtension::class)
225+
->addArgument(new Reference('doctrine'))
226+
;
227+
228+
foreach ($configNames as $name) {
229+
$extension->addTag('enqueue.consumption_extension', ['client' => $name]);
230+
$extension->addTag('enqueue.transport.consumption_extension', ['transport' => $name]);
231+
}
232+
}
233+
234+
private function loadSignalExtension(array $config, ContainerBuilder $container): void
235+
{
236+
$configNames = [];
237+
foreach ($config as $name => $modules) {
238+
if ($modules['extensions']['signal_extension']) {
239+
$configNames[] = $name;
240+
}
241+
}
242+
243+
if (false == $configNames) {
244+
return;
245+
}
246+
247+
$extension = $container->register('enqueue.consumption.signal_extension', SignalExtension::class);
248+
249+
foreach ($configNames as $name) {
250+
$extension->addTag('enqueue.consumption_extension', ['client' => $name]);
251+
$extension->addTag('enqueue.transport.consumption_extension', ['transport' => $name]);
252+
}
253+
}
254+
255+
private function loadReplyExtension(array $config, ContainerBuilder $container): void
256+
{
257+
$configNames = [];
258+
foreach ($config as $name => $modules) {
259+
if ($modules['extensions']['reply_extension']) {
260+
$configNames[] = $name;
261+
}
262+
}
263+
264+
if (false == $configNames) {
265+
return;
266+
}
267+
268+
$extension = $container->register('enqueue.consumption.reply_extension', ReplyExtension::class);
269+
270+
foreach ($configNames as $name) {
271+
$extension->addTag('enqueue.consumption_extension', ['client' => $name]);
272+
$extension->addTag('enqueue.transport.consumption_extension', ['transport' => $name]);
273+
}
274+
}
275+
276+
private function loadAsyncCommands(array $config, ContainerBuilder $container): void
277+
{
278+
$configNames = [];
279+
foreach ($config as $name => $modules) {
280+
if ($modules['async_commands']['enabled']) {
281+
$configNames[] = $name;
282+
}
283+
}
284+
285+
if (false == $configNames) {
286+
return;
287+
}
288+
289+
if (false == class_exists(AsyncCommandExtension::class)) {
290+
throw new \LogicException('The "enqueue/async-command" package has to be installed.');
291+
}
292+
293+
$extension = new AsyncCommandExtension();
294+
$extension->load(['clients' => $configNames], $container);
295+
}
190296
}

‎pkg/enqueue-bundle/Resources/config/extensions/doctrine_clear_identity_map_extension.yml

-9
This file was deleted.

‎pkg/enqueue-bundle/Resources/config/extensions/doctrine_ping_connection_extension.yml

-8
This file was deleted.

‎pkg/enqueue-bundle/Resources/config/extensions/reply_extension.yml

-6
This file was deleted.

0 commit comments

Comments
 (0)