AsyncAPI and Spring Cloud Stream 3
👉 ZenWave360 Helps You Create Software Easy to Understand
Note: Starting with version 2.0.0, the Maven
groupId
has changed toio.zenwave360
. The code remains fully compatible.
- AsyncAPI and Spring Cloud Stream 3
Generating Consumer & Producer APIs
With ZenWave’s spring-cloud-streams3
and jsonschema2pojo
generator plugins you can generate:
- Strongly typed business interfaces
- Payload DTOs and
- Header objects from AsyncAPI definitions.
It uses Spring Cloud Streams as default implementation, so it can connect to many different brokers via provided binders.
And because everything is hidden behind interfaces we can encapsulate many Enterprise Integration Patterns:
- Transactional Outbox: with Spring Modulith, MongoDB ChangeStreams, Plain SQL or a custom solution
- Business DeadLetter Queues: allowing you to route different business Exceptions to different DeadLetter queues for non-retrayable errors.
- Enterprise Envelope: when your organization uses a common Envelope for messages, you can still express your AsyncAPI definition in terms of your business payload.
It supports AsyncAPI v2 (publish/subscribe) and AsyncAPI v3 (send/receive) styles.
It also lets you reverse how the API is generated using the client
role, so you don’t need to define a new API definition just to consume an existing API.
Maven Plugin Configuration (API-First)
Configure ZenWave Maven Plugin to generate code during your build process:
- Add generator dependencies to
zenwave-sdk-maven-plugin
- Configure
<execution>
blocks for each generator (jsonschema2pojo
,spring-cloud-streams3
, etc) - Reference AsyncAPI files from dependencies using
classpath:
prefix - Set generator options using
<configOptions>
(see SpringCloudStreams3Plugin Options )
Maven Plugin Base Configuration
Use this as base configuration:
<plugin>
<groupId>io.zenwave360.sdk</groupId>
<artifactId>zenwave-sdk-maven-plugin</artifactId>
<version>${zenwave.version}</version>
<configuration>
<addCompileSourceRoot>true</addCompileSourceRoot><!-- default is true -->
<addTestCompileSourceRoot>true</addTestCompileSourceRoot><!-- default is true -->
</configuration>
<executions>
<!-- Add executions for each generation here: -->
<execution>
<id>generate-asyncapi-xxx</id>
<phase>generate-sources</phase>
<goals>
<goal>generate</goal>
</goals>
<configuration>
<generatorName>spring-cloud-streams3</generatorName>
<inputSpec>classpath:model/asyncapi.yml</inputSpec>
<configOptions>
<!-- ... -->
</configOptions>
</configuration>
</execution>
</executions>
<!-- add any sdk plugin (custom or standard) as dependency here -->
<dependencies>
<dependency><!-- optional dependency containing AsyncAPI definition files -->
<groupId>com.example.apis</groupId>
<artifactId>asyncapis</artifactId>
<version>${apis.version}</version>
</dependency>
<dependency>
<groupId>io.zenwave360.sdk.plugins</groupId>
<artifactId>asyncapi-spring-cloud-streams3</artifactId>
<version>${zenwave.version}</version>
</dependency>
<dependency>
<groupId>io.zenwave360.sdk.plugins</groupId>
<artifactId>asyncapi-jsonschema2pojo</artifactId>
<version>${zenwave.version}</version>
</dependency>
</dependencies>
</plugin>
Generate Model DTOs using jsonschema2pojo
Add this execution to generate model DTOs from AsyncAPI definitions:
<execution>
<id>generate-asyncapi-producer-dtos</id>
<phase>generate-sources</phase>
<goals>
<goal>generate</goal>
</goals>
<configuration>
<generatorName>jsonschema2pojo</generatorName>
<inputSpec>${pom.basedir}/src/main/resources/model/asyncapi.yml</inputSpec>
<configOptions>
<modelPackage>io.zenwave360.example.api.events.model</modelPackage>
</configOptions>
</configuration>
</execution>
Generate Spring Cloud Streams Provider Implementation with spring-cloud-streams3
Add this execution to generate Spring Cloud Streams producer/consumer classes from AsyncAPI definitions:
<execution>
<id>generate-asyncapi-producer</id>
<phase>generate-sources</phase>
<goals>
<goal>generate</goal>
</goals>
<configuration>
<generatorName>spring-cloud-streams3</generatorName>
<inputSpec>classpath:model/asyncapi.yml</inputSpec>
<configOptions>
<role>provider</role><!-- use `client` to reverse code generation -->
<transactionalOutbox>none</transactionalOutbox> <!-- `modulith` (preferred), `mongodb`, `jdbc` or `none` -->
<modelPackage>io.zenwave360.example.api.events.model</modelPackage>
<apiPackage>io.zenwave360.example.api.events</apiPackage>
<!-- use <producerApiPackage></producerApiPackage> if you want to differentiate producer/consumer packages, it overrides apiPackage -->
<!-- use <consumerApiPackage></consumerApiPackage> if you want to differentiate producer/consumer packages, it overrides apiPackage -->
</configOptions>
</configuration>
</execution>
Reverse Code Generation using Client
role.
If you want to generate the consumer side of an existing API, you can use the client
role to reverse how the API is generated:
<execution>
<id>generate-asyncapi-client-imperative</id>
<phase>generate-sources</phase>
<goals>
<goal>generate</goal>
</goals>
<configuration>
<generatorName>spring-cloud-streams3</generatorName>
<inputSpec>${pom.basedir}/src/main/resources/model/asyncapi.yml</inputSpec>
<configOptions>
<role>client</role>
<modelPackage>io.zenwave360.example.api.events.model</modelPackage>
<apiPackage>io.zenwave360.example.api.events</apiPackage>
</configOptions>
</configuration>
</execution>
Because APIs mediated by a broker are inherently reciprocal it’s difficult to establish the roles of client/server: what represents a publish
operation from one side will be a subscribe
operation seen from the other side. Also, a given service can act as a publisher and subscriber on the same API.
For these reasons, to avoid defining the same API operations multiple times from each perspective, we propose to define de API only once from the perspective of the provider of the functionality, which may be a producer, a consumer or both.
Some definitions:
- SERVICE: An independent piece of software, typically a microservice, that provides a set of capabilities to other services.
- PROVIDER: The service that implements the functionality of the API. It may be accepting asynchronous command request or publishing business domain events.
- CLIENT/s: The service/s that makes use of the functionality of the API. It may be requesting asynchronous commands or subscribing to business domain events.
- PRODUCER: A service that writes a given message.
- CONSUMER: A service that reads a given message.
Use the table to understand which section of AsyncAPI (publish or subscribe) to use for each topic, and which role (provider or client) to use on the plugin configuration.
Events | Commands | |
---|---|---|
Provider | Produces (publish) | Consumes (subscribe) |
Client | Consumes (subscribe) | Produces (publish) |
OperationId Suggested Prefix | on<Event Name> | do<Command Name> |
Getting Help
jbang zw -p io.zenwave360.sdk.plugins.SpringCloudStreams3Plugin --help
SpringCloudStreams3Plugin Options
Option | Description | Type | Default | Values |
---|---|---|---|---|
apiFile | API Specification File | URI | ||
apiFiles | API Spec files to parse (comma separated) | List | ||
role | Project role: provider/client | AsyncapiRoleType | provider | provider, client |
style | Programming style | ProgrammingStyle | imperative | imperative, reactive |
modelPackage | Java Models package name | String | ||
producerApiPackage | Java API package name for outbound (producer) services. It can override apiPackage for producers. | String | ||
consumerApiPackage | Java API package name for inbound (consumer) services. It can override apiPackage for consumer. | String | ||
apiPackage | Java API package name for producerApiPackage and consumerApiPackage if not specified. | String | ||
transactionalOutbox | Transactional outbox type for message producers. | TransactionalOutboxType | none | none, modulith, mongodb, jdbc |
bindingPrefix | SC Streams Binding Name Prefix (used in @Component name) | String | ||
bindingSuffix | Spring-Boot binding suffix. It will be appended to the operation name kebab-cased. E.g. | String | -0 | |
generatedAnnotationClass | Annotation class to mark generated code (e.g. org.springframework.aot.generate.Generated ). When retained at runtime, this prevents code coverage tools like Jacoco from including generated classes in coverage reports. | String | ||
targetFolder | Target folder to generate code to. If left empty, it will print to stdout. | File | ||
modelNamePrefix | Sets the prefix for model classes and enums | String | ||
modelNameSuffix | Sets the suffix for model classes and enums | String | ||
runtimeHeadersProperty | AsyncAPI extension property name for runtime auto-configuration of headers. | String | x-runtime-expression | |
includeApplicationEventListener | Include ApplicationEvent listener for consuming messages within the modulith. | boolean | false | |
skipProducerImplementation | Generate only the producer interface and skip the implementation. | boolean | false | |
exposeMessage | Whether to expose underlying spring Message to consumers or not. | boolean | false | |
useEnterpriseEnvelope | Include support for enterprise envelop wrapping/unwrapping. | boolean | false | |
envelopeJavaTypeExtensionName | AsyncAPI Message extension name for the envelop java type for wrapping/unwrapping. | String | x-envelope-java-type | |
methodAndMessageSeparator | To avoid method erasure conflicts, when exposeMessage or reactive style this character will be used as separator to append message payload type to method names in consumer interfaces. | String | $ | |
consumerPrefix | SC Streams Binder class prefix | String | ||
consumerSuffix | SC Streams Binder class suffix | String | Consumer | |
consumerServicePrefix | Business/Service interface prefix | String | I | |
consumerServiceSuffix | Business/Service interface suffix | String | ConsumerService | |
includeKafkaCommonHeaders | Include Kafka common headers ‘kafka_messageKey’ as x-runtime-header | boolean | false | |
bindingTypes | Binding names to include in code generation. Generates code for ALL bindings if left empty | List | ||
operationIds | Operation ids to include in code generation. Generates code for ALL if left empty | List | [] | |
excludeOperationIds | Operation ids to exclude in code generation. Skips code generation if is not included or is excluded. | List | [] | |
formatter | Code formatter implementation | Formatters | palantir | palantir, spring, google |
skipFormatting | Skip java sources output formatting | boolean | false | |
haltOnFailFormatting | Halt on formatting errors | boolean | true |
Advanced Features
Transactional Outbox Pattern
ZenWave SDK supports sending messages transactionaly using the Transactional Outbox Pattern.
See Implementing a Transactional OutBox With AsyncAPI, SpringModulith and ZenWaveSDK for complete details.
Populating Headers at Runtime Automatically
ZenWave SDK provides x-runtime-expression
for automatic header population at runtime. Values for this extension property are:
$message.payload#/<json pointer fragment>
: follows the same format as AsyncAPI Correlation ID object.$message.payload#{ <SpEL expression> }
: will use the SpEL expression to populate the header value.$supplierBeanName
: will use a bean namedsupplierBeanName
(you can use any other name) of typejava.function.Supplier
configured in your Spring context.
CustomerEventMessage:
name: CustomerEventMessage
// [...] other properties omitted for brevity
headers:
type: object
properties:
kafka_messageKey:
type: string
description: This one will be populated automatically at runtime
x-runtime-expression: $message.payload#/customer/id
tracingId:
type: string
description: This one will be populated automatically at runtime
x-runtime-expression: $supplierBeanName
# CloudEvents Attributes:
# these examples showcase how you can use SpEL expressions to populate runtime headers
ce-id:
type: string
description: Unique identifier for the event
x-runtime-expression: $message.payload#{#this.id}
ce-source:
type: string
description: URI identifying the context where event happened
x-runtime-expression: $message.payload#{"CustomersService"}
ce-specversion:
type: string
description: CloudEvents specification version
x-runtime-expression: $message.payload#{"1.0"}
ce-type:
type: string
description: Event type
x-runtime-expression: $message.payload#{#this.getClass().getSimpleName()}
ce-time:
type: string
description: Timestamp of when the event happened
x-runtime-expression: $message.payload#{T(java.time.Instant).now().toString()}
You can also override the runtimeHeadersProperty
extension property name (in the rare case you need to):
<configOption>
<runtimeHeadersProperty>x-custom-runtime-expression</runtimeHeadersProperty><!-- you can also override this extension property name -->
</configOption>
And provide a bean of type java.function.Supplier
in your Spring context:
@Bean("supplierBeanName")
public Supplier supplierBeanName() {
return () -> "some-value";
}
InMemory Events Producer for Tests (Mocks)
// autogenerate in: target/generated-sources/zenwave/src/test/java/.../InMemoryCustomerOrderEventsProducer.java
public class InMemoryCustomerOrderEventsProducer implements ICustomerOrderEventsProducer {
protected Map<String, List<Message>> capturedMessages = new HashMap<>();
public Map<String, List<Message>> getCapturedMessages() {
return capturedMessages;
}
// other details omitted for brevity
/**
* CustomerOrder Domain Events
*/
public boolean onCustomerOrderEvent(CustomerOrderEventPayload payload, CustomerOrderEventPayloadHeaders headers) {
log.debug("Capturing message to topic: {}", onCustomerOrderEventBindingName);
Message message = MessageBuilder.createMessage(payload, new MessageHeaders(headers));
return appendCapturedMessage(onCustomerOrderEventBindingName, message);
}
}
// autogenerated in: target/generated-sources/zenwave/src/test/java/.../ProducerInMemoryContext.java
public class ProducerInMemoryContext {
public static final ProducerInMemoryContext INSTANCE = new ProducerInMemoryContext();
private CustomerEventsProducerCaptor customerEventsProducerCaptor = new CustomerEventsProducerCaptor();
public <T extends ICustomerEventsProducer> T customerEventsProducer() {
return (T) customerEventsProducerCaptor;
}
}
Routing Business Exceptions to Dead Letter Queues with Configuration
When consuming Events you can route different business exceptions to different Dead Letter Queues bindings using the dead-letter-queue-error-map
.
This mechanism is useful when you know an exception is not retrayable, and you want to route it to a different DLQ.
When no matching exception is found in dead-letter-queue-error-map
, the exception will propagate up the call stack, allowing standard retry and error handling mechanisms to take effect.
# application.yml
spring:
cloud:
stream:
bindings:
do-create-customer-in-0:
destination: customer.requests
content-type: application/json
dead-letter-queue-error-map: >
{
'jakarta.validation.ValidationException': 'do-create-customer-validation-error-out-0',
'java.lang.Exception': 'do-create-customer-error-out-0'
}
Generating Consumer Adapters (Skeletons)
jbang zw -p io.zenwave360.sdk.plugins.SpringCloudStreams3AdaptersPlugin --help
jbang zw -p io.zenwave360.sdk.plugins.SpringCloudStreams3AdaptersPlugin \
specFile=src/main/resources/model/asyncapi.yml \
zdlFile=src/main/resources/model/orders-model.jdl \
role=provider \
style=imperative \
basePackage=io.zenwave360.example \
consumerApiPackage=io.zenwave360.example.adapters.events \
modelPackage=io.zenwave360.example.core.domain.events \
targetFolder=.
@Component
public class DoCustomerRequestConsumerServiceAdapter implements IDoCustomerRequestConsumerService {
private EventEntityMapper mapper;
// TODO: private EntityUseCases service;
@Autowired
public void setEventEntityMapper(EventEntityMapper mapper) {
this.mapper = mapper;
}
/** Customer Async Requests */
public void doCustomerRequest(CustomerRequestPayload payload, CustomerRequestPayloadHeaders headers) {
// TODO: service.doCustomerRequest(mapper.asEntity(payload));
};
}
Consumer Adapters API Tests
// generated and editable in: src/test/java/.../adapters/events/DoCustomerRequestConsumerServiceIT.java
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.MOCK)
@org.springframework.transaction.annotation.Transactional
public class DoCustomerRequestConsumerServiceIT extends BaseConsumerTest {
@Autowired public IDoCustomerRequestConsumerService consumerService;
/** Test for doCustomerRequest: */
@Test
public void doCustomerRequestTest() {
CustomerRequestPayload payload = new CustomerRequestPayload();
payload.setCustomerId(null);
payload.setRequestType(null);
payload.setCustomer(null);
CustomerRequestPayloadHeaders headers = new CustomerRequestPayloadHeaders();
// invoke the method under test
consumerService.doCustomerRequest(payload, headers);
// perform your assertions here
}
}
Options for Consumer Adapters
Option | Description | Type | Default | Values |
---|---|---|---|---|
specFile | API Specification File | URI | ||
specFiles | ZDL files to parse | String[] | [] | |
targetFolder | Target folder to generate code to. If left empty, it will print to stdout. | File | ||
style | Programming style | ProgrammingStyle | imperative | imperative, reactive |
role | Project role: provider/client | AsyncapiRoleType | provider | provider, client |
exposeMessage | Whether to expose underlying spring Message to consumers or not. | boolean | false | |
apiId | Unique identifier of each AsyncAPI that you consume as a client or provider. It will become the last package token for generated adapters | String | commands | |
basePackage | Applications base package | String | ||
adaptersPackage | The package to generate Async Inbound Adapters in | String | .adapters.events. | |
inboundDtosPackage | Package where your inbound dtos are | String | .core.inbound.dtos | |
servicesPackage | Package where your domain services/usecases interfaces are | String | .core.inbound | |
apiPackage | Java API package name for producerApiPackage and consumerApiPackage if not specified. | String | ||
modelPackage | Java Models package name | String | ||
producerApiPackage | Java API package name for outbound (producer) services. It can override apiPackage for producers. | String | ||
consumerApiPackage | Java API package name for inbound (consumer) services. It can override apiPackage for consumer. | String | ||
bindingTypes | Binding names to include in code generation. Generates code for ALL bindings if left empty | List | ||
operationIds | Operation ids to include in code generation. Generates code for ALL if left empty | List | [] | |
runtimeHeadersProperty | AsyncAPI extension property name for runtime auto-configuration of headers. | String | x-runtime-expression | |
continueOnZdlError | Continue even when ZDL contains fatal errors | boolean | true | |
inputDTOSuffix | Should use same value configured in BackendApplicationDefaultPlugin. Whether to use an input DTO for entities used as command parameter. | String | ||
baseTestClassName | BaseConsumerTest class name | String | BaseConsumerTest | |
baseTestClassPackage | BaseConsumerTest package | String | .adapters.events | |
transactional | Annotate tests as @Transactional | boolean | true | |
transactionalAnnotationClass | @Transactional annotation class name | String | org.springframework.transaction.annotation.Transactional | |
methodAndMessageSeparator | To avoid method erasure conflicts, when exposeMessage or reactive style this character will be used as separator to append message payload type to method names in consumer interfaces. | String | $ | |
consumerPrefix | SC Streams Binder class prefix | String | ||
consumerSuffix | SC Streams Binder class suffix | String | Consumer | |
bindingPrefix | SC Streams Binding Name Prefix (used in @Component name) | String | ||
servicePrefix | Business/Service interface prefix | String | I | |
serviceSuffix | Business/Service interface suffix | String | ConsumerService | |
bindingSuffix | Spring-Boot binding suffix. It will be appended to the operation name kebab-cased. E.g. | String | -0 | |
formatter | Code formatter implementation | Formatters | spring | google, palantir, spring, eclipse |
skipFormatting | Skip java sources output formatting | boolean | false | |
haltOnFailFormatting | Halt on formatting errors | boolean | true |