# Processor

After configuring and creating an instance of [`Queue`](create_queue_instance), we can define the processors our program needs.

## Methods

There are four ways to implement a processor:

- [Command](https://www.enterpriseintegrationpatterns.com/patterns/messaging/RequestReply.html): Every client that calls a command-type processor expects an answer, like an RPC call.
- [Worker](https://www.enterpriseintegrationpatterns.com/patterns/messaging/CompetingConsumers.html): Sends a message to a queue for background processing.
- **Emit**: Sends a message to many consumers at once.
- **Topic**: Sends a message to many consumers at once, selectively.

Defining a processor means that we are the recipient of the message and execute the processor we defined.

### Processor Methods

Every processor must implement:

- `getQueueName()`: Returns the name of the queue the processor listens to for messages.

Every processor can optionally override:

- `getQueue()`: Returns the [`queue`](create_queue_instance) object that is executing the current processor.
- `getProcessorConsumer()`: Returns the [`processor-consumer`](processor-consumer.md) object that is executing the current processor.
- `beforeExecute(Response $request)`: Runs before the `execute()` method.
- `afterExecute(Response $request)`: Runs after the `execute()` method.
- `process(AmqpMessage $message, AmqpConsumer $consumer)`: Runs after the `afterExecute()` method to decide whether the received message is accepted, rejected, or requeued. The return value can be `ack`, `reject`, or `requeue`.
- `afterMessageAcknowledge(string $status)`: Runs after the message is acknowledged. The status can be `ack`, `reject`, or `requeue`.
- `resetAfterProcess()`: Returns a boolean. If it returns `true`, the processor object is recreated each time a message is received and the processor is executed.
- `processorFinished(?string $status)`: Runs before the processor completes. The status can be `ack`, `reject`, `requeue`, or `null`. It is `null` when the message was redelivered.
- `getQueueTtl()`: Returns the time to live, in milliseconds, of the queue created for the current processor.

### Command

A command sends a request to a queue and waits for a response. Commands are distributed among multiple consumers; after handling the message, the consumer sends the result back to the sender. The `UserGetInfoCommand` processor is an example.

```{code-block} php
---
name: command_definition
caption: Command definition
---
// ... (the start of this listing, up to the first array value returned by execute(), is not available)
            123,
            'name' => 'Test',
        ]);
    }

    // Job name this command answers to.
    public function getJobName(): string
    {
        return 'get_profile_info';
    }

    // Queue this command listens on.
    public function getQueueName(): string
    {
        return 'user_service_command';
    }
}
```

By defining the `UserGetInfoCommand` processor, we receive every client request sent to the queue `user_service_command` for the job `get_profile_info`, and we send a response back to the client. We can define several processors for the same queue with different jobs.

#### Command Methods

- `execute(Request $request)`: Mandatory; must be implemented in the command class. Runs when a new command is received; `$request` is the object received from the client. The response object returned from this method is sent back to the client.
- `getJobName()`: Mandatory; must be implemented in the command class. Returns the job name of the command.
- `afterMessageReplytoCommand(string $messageId, string $replyId, string $correlationId, string $status)`: Runs after we reply to the command.

### Worker

A worker distributes time-consuming tasks among multiple workers. Consumers get the message and pass it to the related processor to run.

```{code-block} php
---
name: worker_definition
caption: Worker definition
---
// ... (the start of this listing is not available)
getQueue()->getAppName());
    }
}
```

### Emit

#### Emit Methods

- `execute(Request $request)`: Mandatory; must be implemented in the emit class.
  Runs when a new emit is received; `$request` is the object holding the data received from the client.
- `getTopicName()`: Mandatory; must be implemented in the emit class. Returns the name of the topic the emit listens to.
- `durableQueue()`: Returns a boolean. If it returns `true`, the defined queue is durable.

### Topic

A topic sends a request to all consumers at once that listen to a specific topic and routing key.

```{code-block} php
---
name: topic_definition
caption: Topic definition
---
// ... (the start of this listing is not available)
getQueue()->getAppName());
    }
}
```

#### Topic Methods

- `execute(RequestTopic $request)`: Mandatory; must be implemented in the topic class. Runs when a new topic message is received; `$request` is the object holding the data received from the client.
- `getTopicName()`: Mandatory; must be implemented in the topic class. Returns the name of the topic the processor listens to.
- `getRoutingKeys()`: Mandatory; must be implemented in the topic class. Returns the routing keys the processor listens to. One topic class can listen to several routing keys in the same topic.
- `durableQueue()`: Returns a boolean. If it returns `true`, the defined queue is durable.

## Policies

The `Emit` and `Topic` methods use [Single Active Consumer](https://www.rabbitmq.com/consumers.html#single-active-consumer) by default. We may run multiple processes of the program on several nodes, and each processor only needs to receive a given message once, not once per process.

If several processors share the same queue, we cannot make the queue durable for one processor and non-durable for another. After a queue is created in RabbitMQ, properties such as durable or single active consumer cannot be changed. To update queue arguments, the queue must be deleted and created again.
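
The rule that processors sharing a queue must agree on durability can be checked before the consumers start. The following is a minimal sketch in plain PHP, not part of the library: `findDurabilityConflicts()` and the array shape it takes (`queue`, `durable`) are hypothetical, and the queue name `report_worker` is made up for illustration. In a real program the two values would presumably come from each processor's `getQueueName()` and `durableQueue()`.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not a library function): returns the names of queues
 * whose processors disagree on the durable flag.
 *
 * @param array<int, array{queue: string, durable: bool}> $processors
 * @return array<int, int|string>
 */
function findDurabilityConflicts(array $processors): array
{
    $seen = [];      // queue name => first durable flag seen for that queue
    $conflicts = []; // queue name => true, used as a set

    foreach ($processors as $processor) {
        $queue = $processor['queue'];

        if (!array_key_exists($queue, $seen)) {
            // First processor for this queue: remember its flag.
            $seen[$queue] = $processor['durable'];
        } elseif ($seen[$queue] !== $processor['durable']) {
            // A later processor disagrees with the first one.
            $conflicts[$queue] = true;
        }
    }

    return array_keys($conflicts);
}

// Illustrative data only: two processors share one queue but disagree.
$processors = [
    ['queue' => 'user_service_command', 'durable' => true],
    ['queue' => 'user_service_command', 'durable' => false],
    ['queue' => 'report_worker', 'durable' => true],
];

print_r(findDurabilityConflicts($processors));
```

Running such a check at startup surfaces the mismatch in our own code instead of as a broker error when the second processor tries to declare an existing queue with different properties.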