Consuming Async Commands

Listening for Async Requests and Third-Party Events

When consuming Event-Driven messages we need to differentiate between:

  • Consuming Third-Party Async Domain Events
  • Listening for Async Commands/Requests for which we are the owner

Because the semantics of a Command and an Event are quite different, we need to handle them differently.

  • A Command is a request to start an action. It may be rejected so the action may not start at all.
  • An Event is a notification that something has happened. It is not a request but a notification of a fact that has already happened on a given system.

This distinction is paramount because a Command can be requested by many different applications but should be listened for by only one application. A Domain Event, on the other hand, can be produced by only one application but can be listened to by an unlimited number of applications.

So we need to differentiate between the provider and the client of a given functionality, which is not to be confused with the producer and the consumer of a given message.

👉 A provider produces Domain Events and consumes Command Requests. A client consumes Domain Events and produces Command Requests.
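
The provider/client rule above can be written down as a small lookup. The following is a minimal sketch in plain Java, not part of ZenWave or AsyncAPI: the names `Role`, `MessageKind`, `Action` and `MessageDirection` are made up for this illustration.

```java
// Hypothetical illustration only: none of these types exist in ZenWave or AsyncAPI.
enum Role { PROVIDER, CLIENT }
enum MessageKind { DOMAIN_EVENT, COMMAND }
enum Action { SEND, RECEIVE }

final class MessageDirection {
    private MessageDirection() {}

    /** Returns what an application in the given role does with the given kind of message. */
    static Action actionFor(Role role, MessageKind kind) {
        if (role == Role.PROVIDER) {
            // The provider owns the functionality: it emits events and accepts commands.
            return kind == MessageKind.DOMAIN_EVENT ? Action.SEND : Action.RECEIVE;
        }
        // A client mirrors the provider: it listens to events and issues commands.
        return kind == MessageKind.DOMAIN_EVENT ? Action.RECEIVE : Action.SEND;
    }

    public static void main(String[] args) {
        // Print the full role/kind matrix.
        for (Role role : Role.values()) {
            for (MessageKind kind : MessageKind.values()) {
                System.out.println(role + " + " + kind + " -> " + actionFor(role, kind));
            }
        }
    }
}
```

Note that the same application is usually a provider for its own API and a client of several others, so the role belongs to the pair (application, API) rather than to the application alone.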

We recommend not mixing, in the same AsyncAPI definition, functionality that you provide with functionality that you only consume from third parties.

With ZenWave AsyncAPI API-First Plugin you can write each AsyncAPI definition just once, from the perspective of the provider, and use the provider's AsyncAPI definition to generate the client's AsyncAPI definition using the client role.

Alternatively, you can create two different AsyncAPI definitions, one for the provider and one for the client. Just don't mix them together in the same AsyncAPI definition file.

Using ZenWave ZDL as Definition Language for AsyncAPI

You can use ZDL To AsyncAPI Generator to generate AsyncAPI definitions from ZDL Models with a command like this:

jbang zw -p io.zenwave360.sdk.plugins.ZDLToAsyncAPIPlugin \
specFile=orders-model.zdl \
idType=integer \
idTypeFormat=int64 \
targetFile=src/main/resources/apis/asyncapi.yml

Before that, you need to define the following in your ZDL Model:

  1. The APIs you are using, and whether you are the provider or the client of each API.
  2. A Service for your Domain Aggregate holding inbound commands and outbound events.
  3. An @asyncapi annotation on each inbound command, documenting the API and the channel you will be listening to.
    • If you specify an api, it will be used to determine whether you are the provider (listening for a Command) or the client of the API (listening for third-party Events). No api means you are the provider.
  4. An @asyncapi annotation on each outbound event, documenting the channel you will be publishing to.
  5. Optionally, the topic for messages of the APIs you are the provider of. It will be added as the channel's address in your AsyncAPI definition (only for version 3).

// apis section at the beginning of your ZDL Model
apis {
    asyncapi(provider) default {
        uri "orders/src/main/resources/apis/asyncapi.yml"
    }
    asyncapi(client) RestaurantsAsyncAPI {
        uri "restaurants/src/main/resources/apis/asyncapi.yml"
    }
    asyncapi(client) DeliveryAsyncAPI {
        uri "delivery/src/main/resources/apis/asyncapi.yml"
    }
}

Notice how we specify third-party events (1), async commands (2) and domain events (3):

service OrdersService for (CustomerOrder) {
    // 1) we are listening to a third-party event, because we are a client of 'api: RestaurantsAsyncAPI'
    @asyncapi({api: RestaurantsAsyncAPI, channel: "KitchenOrdersStatusChannel"})
    updateKitchenStatus(id, KitchenStatusInput) CustomerOrder withEvents OrderStatusUpdated

    // 2) we are listening for a command, because we didn't specify an api (or if the api specified was of type 'provider')
    @asyncapi({channel: "CancelOrdersChannel", topic: "orders.cancel_orders"})
    cancelOrder(id, CancelOrderInput) CustomerOrder withEvents OrderStatusUpdated
}

// 3) here we are producing events, informing of a fact that happened on our system
@asyncapi({channel: "OrderUpdatesChannel", topic: "orders.order_updates"})
event OrderStatusUpdated {}

From the above ZDL definition, ZenWave ZDLToAsyncAPI Plugin will generate operations only for those messages for which 'OrdersService' is the provider:

operations:
  doCancelOrder:
    action: receive
    channel:
      $ref: '#/channels/CancelOrdersChannel'
  onOrderStatusUpdated:
    action: send
    channel:
      $ref: '#/channels/OrderUpdatesChannel'

It will not generate operations for messages where OrdersService acts as a client, such as the third-party events it listens to with { api: RestaurantsAsyncAPI, channel: "KitchenOrdersStatusChannel" }.
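
The decision described here (an omitted api or a provider api yields an operation, a client api does not) can be sketched as follows. This is a hedged illustration in plain Java, not the plugin's actual code: `OperationRule`, `ApiRole` and `generatesOperation` are hypothetical names, and treating an undeclared api name as "not provider" is an assumption of this sketch.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of the rule described above; not the plugin's real implementation.
final class OperationRule {
    enum ApiRole { PROVIDER, CLIENT }

    private OperationRule() {}

    /**
     * @param apiRoles roles declared in the `apis` section, keyed by api name
     * @param api      the `api` value of an @asyncapi annotation, or null when it was omitted
     * @return true when an operation would be generated for the annotated message
     */
    static boolean generatesOperation(Map<String, ApiRole> apiRoles, String api) {
        if (api == null) {
            return true; // no api specified means we are the provider
        }
        // Assumption: an api name missing from the map is treated as "not provider".
        return apiRoles.get(api) == ApiRole.PROVIDER;
    }

    public static void main(String[] args) {
        // Mirrors the `apis` section of the ZDL example.
        Map<String, ApiRole> apis = new LinkedHashMap<>();
        apis.put("default", ApiRole.PROVIDER);
        apis.put("RestaurantsAsyncAPI", ApiRole.CLIENT);
        apis.put("DeliveryAsyncAPI", ApiRole.CLIENT);

        System.out.println("cancelOrder: " + generatesOperation(apis, null));
        System.out.println("updateKitchenStatus: " + generatesOperation(apis, "RestaurantsAsyncAPI"));
    }
}
```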

API-First Code Generator from AsyncAPI

You can use API-First AsyncAPI Maven Plugin to generate models (DTOs) and a Listener implementation with error handling and other features.

ZenWave360 AsyncAPI Spring Cloud Streams

By default, ZenWave generates a Spring Cloud Streams implementation, which comes with binders for virtually any message broker.

With this approach you just need to:

  • Provide an implementation for the generated IOnOrderEventConsumer interface.
  • Add OnOrderEventConsumer to your Spring Boot context and set any required configuration for spring.cloud.stream.bindings in application.yml.

📦 target/generated-sources/zenwave
 📦 src/main/java
  └─ <modelPackage> models (DTOs)
     └─ OrderDTO.java
  └─ <consumerPackage>
     └─ IOnOrderEventConsumer (interface you need to implement)
     └─ OnOrderEventConsumer (spring-cloud-streams consumer)

IOnOrderEventConsumer is just a business interface without dependencies on any framework or library and retaining the nomenclature of your AsyncAPI definition (and Domain):

@jakarta.annotation.Generated(value = "io.zenwave360.sdk.plugins.SpringCloudStreams3Plugin")
public interface IOnOrderEventConsumerService {
    void onOrderEvent(OrderEvent payload, OrderEventHeaders headers);
}
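
An implementation of such an interface can be as small as the sketch below. To keep it compilable on its own, it declares placeholder versions of `OrderEvent`, `OrderEventHeaders` and the interface itself; in a real project these come from the generated sources. The class `RecordingOrderEventConsumer` and the `orderId` field are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Placeholders standing in for the generated DTOs and interface, so the sketch compiles on its own.
class OrderEvent {
    String orderId; // hypothetical field; real fields come from your AsyncAPI schemas
}

class OrderEventHeaders {}

interface IOnOrderEventConsumerService {
    void onOrderEvent(OrderEvent payload, OrderEventHeaders headers);
}

// In a Spring Boot application this class would typically be registered as a bean,
// for example by annotating it with @Service.
class RecordingOrderEventConsumer implements IOnOrderEventConsumerService {
    private final List<String> receivedOrderIds = new ArrayList<>();

    @Override
    public void onOrderEvent(OrderEvent payload, OrderEventHeaders headers) {
        // Business logic goes here; this sketch only records the id of each event.
        receivedOrderIds.add(payload.orderId);
    }

    List<String> receivedOrderIds() {
        return receivedOrderIds;
    }
}
```

Because the interface has no framework dependencies, an implementation like this can be unit tested by calling `onOrderEvent` directly, without starting a broker or a Spring context.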

Once you provide an implementation of this interface (in your Spring Boot context), you just need to set any required configuration for spring.cloud.stream.bindings in application.yml:

spring:
  cloud:
    stream:
      bindings:
        do-cancel-order-in-0: ## you can find this name in OnOrderEventConsumer
          destination: orders.cancel_orders
          group: orders.consumer
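
The binding name in this example is consistent with Spring Cloud Stream's functional naming convention, `<functionName>-in-<index>` for inputs. The sketch below reproduces the name from the `doCancelOrder` operationId; the camelCase to kebab-case step is only an inference from this single example, not a documented rule, and `BindingNames` is a hypothetical helper. The generated consumer class remains the authoritative source for the name.

```java
import java.util.Locale;

// Hypothetical helper for illustration; always check the generated consumer class for the real name.
final class BindingNames {
    private BindingNames() {}

    /** Spring Cloud Stream names functional input bindings "<functionName>-in-<index>". */
    static String inputBinding(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    /** Assumption: the function name is the operationId converted from camelCase to kebab-case. */
    static String toKebabCase(String camelCase) {
        // Insert a hyphen between a lowercase letter or digit and the uppercase letter that follows it.
        return camelCase.replaceAll("([a-z0-9])([A-Z])", "$1-$2").toLowerCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        System.out.println(inputBinding(toKebabCase("doCancelOrder"), 0)); // prints do-cancel-order-in-0
    }
}
```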

You can find how to configure the API-First AsyncAPI Maven Plugin in the Producing Domain Events section.

Regarding the distinction between commands and events, and between providers and clients, note that you can configure as many plugin <execution/> elements as you need, each in either the provider or the client role.

Generating Async Listeners and Tests

You can use AsyncAPI Spring Cloud Streams 3 Adapter Generator to generate Async Adapters (stubs) and Tests from AsyncAPI definitions with a command like this:

jbang zw -p io.zenwave360.sdk.plugins.SpringCloudStreams3AdaptersPlugin --help
jbang zw -p io.zenwave360.sdk.plugins.SpringCloudStreams3AdaptersPlugin \
specFile=src/main/resources/apis/asyncapi.yml \
zdlFile=src/main/resources/model/orders-model.jdl \
role=provider \
style=imperative \
basePackage=io.zenwave360.example \
consumerApiPackage=io.zenwave360.example.adapters.events \
modelPackage=io.zenwave360.example.core.domain.events \
targetFolder=.

This will generate Command Listeners and a Test for each operation in your AsyncAPI definition. Depending on the configured role, it will generate Command Listeners for subscribe/receive operations (role provider) or for publish/send operations (role client).

@Slf4j
@Service
public class DoCancelOrderConsumerService implements IDoCancelOrderConsumerService {

    @Override
    public void doCancelOrder(CancelOrderInput payload, CancelOrderInputHeaders headers) {
        log.debug("Received CancelOrderInput: {}", payload);
        // TODO: implement this functionality
    }
}

/**
 * Integration tests for {@link IOnOrderEventConsumerService}.
 */
public class OnOrderEventConsumerService extends BaseConsumerTest {

    @Autowired
    public IOnOrderEventConsumerService consumerService;

    /**
     * Test for onOrderEvent:
     */
    @Test
    public void onOrderEventTest() {
        OrderEvent payload = new OrderEvent();
        OrderEventHeaders headers = new OrderEventHeaders();
        // invoke the method under test
        consumerService.onOrderEvent(payload, headers);
        // perform your assertions here
    }
}