12.1.4. 资源服务器访问其他资源服务器
系统示意图
在 Azure 中创建所需的资源
阅读有关向 Microsoft 标识平台注册应用程序的 MS 文档。
创建应用注册。得到,和。AZURE_TENANT_ID
AZURE_CLIENT_ID
AZURE_CLIENT_SECRET
添加所需的依赖项
<dependencies> <dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-starter-active-directory</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-oauth2-resource-server</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-oauth2-client</artifactId> </dependency> </dependencies>
添加必需属性
spring: cloud: azure: active-directory: profile: tenant-id: ${AZURE_TENANT_ID} credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} authorization-clients: graph: scopes: - https://graph.microsoft.com/User.Read
在应用程序中使用 OAuth2AuthorizedClient
public class SampleController { @GetMapping("call-graph") public String callGraph(@RegisteredOAuth2AuthorizedClient("graph") OAuth2AuthorizedClient graph) { return callMicrosoftGraphMeEndpoint(graph); } }
样品
示例项目:aad-resource-server-obo.
12.1.5. 一个应用程序中的 Web 应用程序和资源服务器
在 Azure 中创建所需的资源
阅读有关向 Microsoft 标识平台注册应用程序的 MS 文档。
创建应用注册。得到,和。AZURE_TENANT_ID
AZURE_CLIENT_ID
AZURE_CLIENT_SECRET
添加所需的依赖项
<dependencies> <dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-starter-active-directory</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-oauth2-resource-server</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-oauth2-client</artifactId> </dependency> </dependencies>
添加必需属性
将属性设置为,并指定每个授权客户端的授权类型。spring.cloud.azure.active-directory.application-typeweb_application_and_resource_server
spring: cloud: azure: active-directory: profile: tenant-id: ${AZURE_TENANT_ID} credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} app-id-uri: ${WEB_API_ID_URI} application-type: web_application_and_resource_server # This is required. authorization-clients: graph: authorizationGrantType: authorization_code # This is required. scopes: - https://graph.microsoft.com/User.Read - https://graph.microsoft.com/Directory.Read.All
定义安全性配置适配器
配置多个 HttpSecurity 实例,包含资源服务器和 Web 应用程序两种安全配置。AadWebApplicationAndResourceServerConfig
@EnableWebSecurity @EnableGlobalMethodSecurity(prePostEnabled = true) public class AadWebApplicationAndResourceServerConfig { @Order(1) @Configuration public static class ApiWebSecurityConfigurationAdapter extends AadResourceServerWebSecurityConfigurerAdapter { protected void configure(HttpSecurity http) throws Exception { super.configure(http); // All the paths that match `/api/**`(configurable) work as `Resource Server`, other paths work as `Web application`. http.antMatcher("/api/**") .authorizeRequests().anyRequest().authenticated(); } } @Configuration public static class HtmlWebSecurityConfigurerAdapter extends AadWebSecurityConfigurerAdapter { @Override protected void configure(HttpSecurity http) throws Exception { super.configure(http); // @formatter:off http.authorizeRequests() .antMatchers("/login").permitAll() .anyRequest().authenticated(); // @formatter:on } } }
12.1.6. 配置
Table 17. Configurable properties of spring-cloud-azure-starter-active-directory
名字
违约
描述
spring.cloud.azure.active-directory .app-id-uri
可能在id_token的“aud”声明中使用的应用 ID URI。
spring.cloud.azure.active-directory.application-type
AAD 应用程序的类型。
spring.cloud.azure.active-directory.authenticate-additional-parameters
向授权 URL 添加其他参数。
spring.cloud.azure.active-directory.authorization-clients
OAuth2 授权客户端。
spring.cloud.azure.active-directory.credential.client-id
使用 Azure 执行服务主体身份验证时要使用的客户端 ID。
spring.cloud.azure.active-directory.credential.client-secret
使用 Azure 执行服务主体身份验证时要使用的客户端密码。
spring.cloud.azure.active-directory.credential.client-certificate-path
使用 Azure 执行服务主体身份验证时要使用的客户端密码。
spring.cloud.azure.active-directory.credential.client-certificate-password
证书文件的密码。
spring.cloud.azure.active-directory.jwk-set-cache-lifespan
5
缓存的 JWK 集在过期之前的生存期,默认值为 5 分钟。
spring.cloud.azure.active-directory.jwk-set-cache-refresh-time
5
缓存的 JWK 集在过期之前的刷新时间,默认值为 5 分钟。
spring.cloud.azure.active-directory.jwt-connect-timeout
JWKSet 远程 URL 调用的连接超时。
spring.cloud.azure.active-directory.jwt-read-timeout
读取 JWKSet 远程 URL 调用的超时。
spring.cloud.azure.active-directory.jwt-size-limit
JWKSet 远程 URL 调用的大小限制(以字节为单位)。
spring.cloud.azure.active-directory.post-logout-redirect-uri
注销后的重定向 URI。
spring.cloud.azure.active-directory.profile.cloud-type
要连接到的 Azure 云的名称。支持的类型包括:AZURE、AZURE_CHINA、AZURE_GERMANY、AZURE_US_GOVERNMENT、其他。
spring.cloud.azure.active-directory.profile.environment
Azure Active Directory 终结点的属性。
spring.cloud.azure.active-directory.profile.tenant-id
Azure 租户 ID。
spring.cloud.azure.active-directory.redirect-uri-template
{baseUrl}/login/oauth2/code/
重定向终结点:由授权服务器用于通过资源所有者用户代理将包含授权凭据的响应返回给客户端。
spring.cloud.azure.active-directory.resource-server.claim-to-authority-prefix-map
配置将用于生成授予权限的声明,以及授予权限的字符串值的前缀。默认值为:“scp”→“SCOPE_”、“角色”→“APPROLE_”。
spring.cloud.azure.active-directory.resource-server.principal-claim-name
配置在身份验证主体#getName 中返回的访问令牌中的声明。默认值为“sub”。
spring.cloud.azure.active-directory.session-stateless
false
如果为 true,则激活无状态身份验证筛选器 AadAppRoleStatelessAuthenticationFilter。默认值为 false,这将激活 AadAuthenticationFilter。
spring.cloud.azure.active-directory.user-group.allowed-group-ids
组 ID 可用于构造授予权限。
spring.cloud.azure.active-directory.user-group.allowed-group-names
组名可用于构造授予权限。
spring.cloud.azure.active-directory.user-group.use-transitive-members
false
如果为“true”,则使用“v1.0/me/transitiveMemberOf”获取成员。否则,请使用“v1.0/me/memberOf”。
spring.cloud.azure.active-directory.user-name-attribute
确定哪个声明是主体的名称。
下面是有关如何使用这些属性的一些示例:
应用程序类型
应用程序类型可以从依赖项中推断出来:spring-security-oauth2-client 或 spring-security-oauth2-resource-server。如果推断值不是所需的值,则可以指定应用程序类型。下表是关于有效值和推断值的表:
Table 18. Application type of spring-cloud-azure-starter-active-directory
具有依赖关系:spring-security-oauth2-client
具有依赖关系:spring-security-oauth2-resource-server
应用程序类型的有效值
推断值
是的
不
web_application
web_application
不
是的
resource_server
resource_server
是的
是的
web_application
, , , resource_server
resource_server_with_obo
web_application_and_resource_server
resource_server_with_obo
12.2. Spring Security with Azure Active Directory B2C
Azure Active Directory (Azure AD) B2C 是一项标识管理服务,使你能够自定义和控制客户在使用应用程序时注册、登录和管理其配置文件的方式。Azure AD B2C 启用这些操作,同时保护客户的标识。
12.2.1. 依赖设置
<dependencies> <dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-starter-active-directory-b2c</artifactId> </dependency> </dependencies>
12.2.2. 配置
Table 19. Configurable properties of spring-cloud-azure-starter-active-directory-b2c
名字
违约
描述
spring.cloud.azure.active-directory.b2c.app-id-uri
应用ID URI,可用于令牌的“aud”声明。
spring.cloud.azure.active-directory.b2c.authenticate-additional-parameters
用于身份验证的其他参数。
spring.cloud.azure.active-directory.b2c.authorization-clients
指定客户端配置。
spring.cloud.azure.active-directory.b2c.base-uri
AAD B2C 终结点基 URI。
spring.cloud.azure.active-directory.b2c.credential
AAD B2C 凭据信息。
spring.cloud.azure.active-directory.b2c.jwt-connect-timeout
JWKSet 远程 URL 调用的连接超时。
spring.cloud.azure.active-directory.b2c.jwt-read-timeout
读取 JWKSet 远程 URL 调用的超时。
spring.cloud.azure.active-directory.b2c.jwt-size-limit
JWKSet 远程 URL 调用的大小限制(以字节为单位)。
spring.cloud.azure.active-directory.b2c.login-flow
sign-up-or-sign-in
指定主登录流密钥。
spring.cloud.azure.active-directory.b2c.logout-success-url
localhost:8080/login
注销后重定向网址。
spring.cloud.azure.active-directory.b2c.profile
AAD B2C 配置文件信息。
spring.cloud.azure.active-directory.b2c.reply-url
{baseUrl}/login/oauth2/code/
获取授权码后回复网址。
spring.cloud.azure.active-directory.b2c.user-flows
用户流。
spring.cloud.azure.active-directory.b2c.user-name-attribute-name
用户名属性名称。
有关完整配置,请查看附录页面。
12.2.3. 基本用法
A是允许用户使用 Azure AD 登录的任何基于 Web 的应用程序,而在验证从 Azure AD 获取access_token后,将接受或拒绝访问。我们将在本指南中介绍 4 种方案:web application
resource server
访问 Web 应用程序。
访问资源服务器的 Web 应用程序。
访问资源服务器。
访问其他资源服务器的资源服务器。
用法 1:访问 Web 应用程序
此方案使用OAuth 2.0 授权代码授予流使用 Azure AD B2C 用户登录用户。
步骤 1:从门户菜单中选择“Azure AD B2C ”,单击“应用程序 ”,然后单击“添加 ”。
第 2 步:指定您的应用程序名称 ,我们称之为,添加回复 URL ,将应用程序 ID 记录为您的,然后单击保存 。webapp
localhost:8080/login/oauth2/code/
WEB_APP_AZURE_CLIENT_ID
步骤 3:从应用程序中选择“密钥”,单击“生成要生成的密钥 ”,然后单击“保存 ”。WEB_APP_AZURE_CLIENT_SECRET
步骤 4:选择 左侧的“用户流”,然后单击“新建用户流 ”。
步骤 5:选择“注册或登录 ”、“配置文件编辑 ”和“密码重置 ”以创建用户流 分别。指定用户流名称和用户属性和声明 ,单击“创建 ”。
步骤6:选择API权限 >添加权限 >微软API ,选择微软图 , 选择“委派权限 ”,选中“offline_access 和openid 权限”,选择“添加权限 ”以完成该过程。
步骤 7:授予对Graph 权限的管理员同意。
<dependencies> <dependency> <groupId>com.azure.spring</groupId> <artifactId>azure-spring-boot-starter-active-directory-b2c</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-thymeleaf</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-security</artifactId> </dependency> <dependency> <groupId>org.thymeleaf.extras</groupId> <artifactId>thymeleaf-extras-springsecurity5</artifactId> </dependency> </dependencies>
步骤 9:使用之前创建的值在application.yml 中添加属性,例如:
spring: cloud: azure: active-directory: b2c: authenticate-additional-parameters: domain_hint: xxxxxxxxx # optional login_hint: xxxxxxxxx # optional prompt: [login,none,consent] # optional base-uri: ${BASE_URI} credential: client-id: ${WEBAPP_AZURE_CLIENT_ID} client-secret: ${WEBAPP_AZURE_CLIENT_SECRET} login-flow: ${LOGIN_USER_FLOW_KEY} # default to sign-up-or-sign-in, will look up the user-flows map with provided key. logout-success-url: ${LOGOUT_SUCCESS_URL} user-flows: ${YOUR_USER_FLOW_KEY}: ${USER_FLOW_NAME} user-name-attribute-name: ${USER_NAME_ATTRIBUTE_NAME}
控制器代码可以参考以下内容:
@Controller public class WebController { private void initializeModel(Model model, OAuth2AuthenticationToken token) { if (token != null) { final OAuth2User user = token.getPrincipal(); model.addAllAttributes(user.getAttributes()); model.addAttribute("grant_type", user.getAuthorities()); model.addAttribute("name", user.getName()); } } @GetMapping(value = { "/", "/home" }) public String index(Model model, OAuth2AuthenticationToken token) { initializeModel(model, token); return "home"; } }
安全配置代码可以参考以下内容:
@EnableWebSecurity public class WebSecurityConfiguration extends WebSecurityConfigurerAdapter { private final AadB2cOidcLoginConfigurer configurer; public WebSecurityConfiguration(AadB2cOidcLoginConfigurer configurer) { this.configurer == configurer; } @Override protected void configure(HttpSecurity http) throws Exception { // @formatter:off http.authorizeRequests() .anyRequest().authenticated() .and() .apply(configurer); // @formatter:off } }
从aad-b2c-Web 应用程序示例中复制home.html ,并将 and 分别替换为前面完成的用户流名称。PROFILE_EDIT_USER_FLOW
PASSWORD_RESET_USER_FLOW
让我们在端口8080 上运行。Webapp
由 Maven 构建并启动应用程序后,在 Web 浏览器中打开;您应该被重定向到登录页面。localhost:8080/
单击带有登录用户流的链接,应重定向 Azure AD B2C 以启动身份验证过程。
成功登录后,您应该会看到浏览器中的示例。home page
用法 2:Web 应用程序访问资源服务器
此方案基于访问 Web 应用程序方案以允许应用程序访问其他资源,即 [OAuth 2.0 客户端凭据授予] 流。
步骤 1:从门户菜单中选择“Azure AD B2C ”,单击“应用程序 ”,然后单击“添加 ”。
第 2 步:指定您的应用程序名称 ,我们称之为,将应用程序 ID 记录为您的,然后单击保存 。webApiA
WEB_API_A_AZURE_CLIENT_ID
步骤 3:从应用程序中选择“密钥”,单击“生成要生成的密钥 ”,然后单击“保存 ”。WEB_API_A_AZURE_CLIENT_SECRET
步骤 4:选择左侧的“公开 API ”,然后单击“设置 ”链接, 将“应用程序 ID URI ”记录为“你的”,然后“保存 ”。WEB_API_A_APP_ID_URL
步骤 5:选择左侧的“清单 ”,然后将以下 json 段粘贴到阵列中, 将“应用程序 ID URI ”记录为“你的”,将应用角色的值记录为“你的”,然后保存 。appRoles
WEB_API_A_APP_ID_URL
WEB_API_A_ROLE_VALUE
{ "allowedMemberTypes": [ "Application" ], "description": "WebApiA.SampleScope", "displayName": "WebApiA.SampleScope", "id": "04989db0-3efe-4db6-b716-ae378517d2b7", "isEnabled": true, "value": "WebApiA.SampleScope" }
步骤6:选择API权限 >添加权限 >我的API ,选择WebApiA 应用程序名称,选择应用程序权限 ,选择WebApiA.SampleScope 权限,选择添加权限 完成该过程。
步骤 7:授予管理员同意WebApiA 权限。
步骤8:在访问Web应用 场景的基础上添加以下依赖关系。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-webflux</artifactId> </dependency>
步骤九:在访问Web应用 场景的基础上添加以下配置。
spring: cloud: azure: active-directory: b2c: base-uri: ${BASE_URI} # Such as: https://xxxxb2c.b2clogin.com profile: tenant-id: ${AZURE_TENANT_ID} authorization-clients: ${RESOURCE_SERVER_A_NAME}: authorization-grant-type: client_credentials scopes: ${WEB_API_A_APP_ID_URL}/.default
第 10 步:编写您的 Java 代码。Webapp
控制器代码可以参考以下内容:
class Demo { /** * Access to protected data from Webapp to WebApiA through client credential flow. The access token is obtained by webclient, or * <p>@RegisteredOAuth2AuthorizedClient("webApiA")</p>. In the end, these two approaches will be executed to * DefaultOAuth2AuthorizedClientManager#authorize method, get the access token. * * @return Respond to protected data from WebApi A. */ @GetMapping("/webapp/webApiA") public String callWebApiA() { String body = webClient .get() .uri(LOCAL_WEB_API_A_SAMPLE_ENDPOINT) .attributes(clientRegistrationId("webApiA")) .retrieve() .bodyToMono(String.class) .block(); LOGGER.info("Call callWebApiA(), request '/webApiA/sample' returned: {}", body); return "Request '/webApiA/sample'(WebApi A) returned a " + (body != null ? "success." : "failure."); } }
安全配置代码与访问 Web 应用程序 方案相同,添加了另一个 beanis,如下所示:webClient
public class SampleConfiguration { @Bean public WebClient webClient(OAuth2AuthorizedClientManager oAuth2AuthorizedClientManager) { ServletOAuth2AuthorizedClientExchangeFilterFunction function = new ServletOAuth2AuthorizedClientExchangeFilterFunction(oAuth2AuthorizedClientManager); return WebClient.builder() .apply(function.oauth2Configuration()) .build(); } }
第 11 步:请参阅访问资源服务器 部分以编写您的 Java 代码。WebApiA
步骤 12:生成和测试应用
Letand分别在端口 8080 和8081 上运行。 启动并应用,登录成功后返回首页,即可访问获取WebApiA 资源响应。Webapp
WebApiA
Webapp
WebApiA
localhost:8080/webapp/webApiA
用法 3:访问资源服务器
此方案不支持登录。只需通过验证访问令牌来保护服务器,如果有效,则为请求提供服务。
步骤 1:请参阅用法 2:Web 应用程序访问资源服务器以构建您的权限。WebApiA
步骤 2:为 Web 应用程序添加权限并授予管理员同意。WebApiA
第 3 步:在pom .xml 中添加以下依赖项。
<dependencies> <dependency> <groupId>com.azure.spring</groupId> <artifactId>azure-spring-boot-starter-active-directory-b2c</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> </dependencies>
spring: cloud: azure: active-directory: b2c: base-uri: ${BASE_URI} # Such as: https://xxxxb2c.b2clogin.com profile: tenant-id: ${AZURE_TENANT_ID} app-id-uri: ${APP_ID_URI} # If you are using v1.0 token, please configure app-id-uri for `aud` verification credential: client-id: ${AZURE_CLIENT_ID} # If you are using v2.0 token, please configure client-id for `aud` verification
控制器代码可以参考以下内容:
class Demo { /** * webApiA resource api for web app * @return test content */ @PreAuthorize("hasAuthority('APPROLE_WebApiA.SampleScope')") @GetMapping("/webApiA/sample") public String webApiASample() { LOGGER.info("Call webApiASample()"); return "Request '/webApiA/sample'(WebApi A) returned successfully."; } }
安全配置代码可以参考以下内容:
@EnableWebSecurity @EnableGlobalMethodSecurity(prePostEnabled = true) public class ResourceServerConfiguration extends WebSecurityConfigurerAdapter { @Override protected void configure(HttpSecurity http) throws Exception { http.authorizeRequests((requests) -> requests.anyRequest().authenticated()) .oauth2ResourceServer() .jwt() .jwtAuthenticationConverter(new AadJwtBearerTokenAuthenticationConverter()); } }
在端口8081 上运行。 获取资源和访问的访问令牌作为持有者授权标头。WebApiA
webApiA
localhost:8081/webApiA/sample
用法 4:资源服务器访问其他资源服务器
此方案是访问资源服务器的 升级,支持基于 OAuth2 客户端凭据流访问其他应用程序资源。
步骤1:参考前面的步骤,我们创建一个应用程序并公开应用程序权限。WebApiB
WebApiB.SampleScope
{ "allowedMemberTypes": [ "Application" ], "description": "WebApiB.SampleScope", "displayName": "WebApiB.SampleScope", "id": "04989db0-3efe-4db6-b716-ae378517d2b7", "isEnabled": true, "lang": null, "origin": "Application", "value": "WebApiB.SampleScope" }
步骤 2:授予管理员同意WebApiB权限。
第 3 步:在访问资源服务器 的基础上,在pom.xml 中添加依赖项。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-webflux</artifactId> </dependency>
步骤四:在访问资源服务器 场景配置的基础上添加如下配置。
spring: cloud: azure: active-directory: b2c: credential: client-secret: ${WEB_API_A_AZURE_CLIENT_SECRET} authorization-clients: ${RESOURCE_SERVER_B_NAME}: authorization-grant-type: client_credentials scopes: ${WEB_API_B_APP_ID_URL}/.default
WebApiA 控制器代码可以参考以下内容:
public class SampleController { /** * Access to protected data from WebApiA to WebApiB through client credential flow. The access token is obtained by webclient, or * <p>@RegisteredOAuth2AuthorizedClient("webApiA")</p>. In the end, these two approaches will be executed to * DefaultOAuth2AuthorizedClientManager#authorize method, get the access token. * * @return Respond to protected data from WebApi B. */ @GetMapping("/webApiA/webApiB/sample") @PreAuthorize("hasAuthority('APPROLE_WebApiA.SampleScope')") public String callWebApiB() { String body = webClient .get() .uri(LOCAL_WEB_API_B_SAMPLE_ENDPOINT) .attributes(clientRegistrationId("webApiB")) .retrieve() .bodyToMono(String.class) .block(); LOGGER.info("Call callWebApiB(), request '/webApiB/sample' returned: {}", body); return "Request 'webApiA/webApiB/sample'(WebApi A) returned a " + (body != null ? "success." : "failure."); } }
WebApiB 控制器代码可以参考以下内容:
public class SampleController { /** * webApiB resource api for other web application * @return test content */ @PreAuthorize("hasAuthority('APPROLE_WebApiB.SampleScope')") @GetMapping("/webApiB/sample") public String webApiBSample() { LOGGER.info("Call webApiBSample()"); return "Request '/webApiB/sample'(WebApi B) returned successfully."; } }
安全配置代码与访问资源服务器 方案相同,添加了另一个 beanis 如下webClient
public class SampleConfiguration { @Bean public WebClient webClient(OAuth2AuthorizedClientManager oAuth2AuthorizedClientManager) { ServletOAuth2AuthorizedClientExchangeFilterFunction function = new ServletOAuth2AuthorizedClientExchangeFilterFunction(oAuth2AuthorizedClientManager); return WebClient.builder() .apply(function.oauth2Configuration()) .build(); } }
Letand分别在端口 8081 和8082 上运行。 启动和应用程序,获取资源和访问的访问令牌作为持有者授权标头。WebApiA
WebApiB
WebApiA
WebApiB
webApiA
localhost:8081/webApiA/webApiB/sample
12.2.4. 示例
有关更多详细信息,请参阅spring-cloud-azure-starter-active-directory-b2c 示例。
13. 弹簧集成支持
Spring Integration Extension for Azure 为Azure SDK for Java 提供的各种服务提供 Spring Integration 适配器。我们为以下 Azure 服务提供 Spring 集成支持:事件中心、服务总线、存储队列。以下是支持的适配器列表:
春季-云-Azure-启动器-集成-事件中心
弹簧-云-Azure-启动器-集成-服务总线
春天-云-Azure-启动器-集成-存储-队列
13.1. 春季与 Azure 事件中心的集成
13.1.1. 关键概念
Azure 事件中心是一个大数据流式处理平台和事件引入服务。它每秒可以接收和处理数百万个事件。可以使用任何实时分析提供程序或批处理/存储适配器转换和存储发送到事件中心的数据。
Spring 集成支持基于 Spring 的应用程序内的轻量级消息传递,并支持通过声明式适配器与外部系统集成。这些适配器提供了比 Spring 对远程处理、消息传递和调度的支持更高级别的抽象。事件中心的春季集成 扩展项目为 Azure 事件中心提供入站和出站通道适配器和网关。
RxJava 支持 API 从版本 4.0.0 中删除。 有关详细信息,请参阅 Javadoc。
消费群体
事件中心提供与 Apache Kafka 类似的使用者组支持,但逻辑略有不同。虽然卡夫卡 将所有已提交的偏移量存储在代理中,必须存储事件中心消息的偏移量 正在手动处理。事件中心 SDK 提供了在 Azure 存储中存储此类偏移的函数。
分区支持
事件中心提供与 Kafka 类似的物理分区概念。但与 Kafka 在使用者和分区之间的自动重新平衡不同,事件中心提供了一种抢占模式。存储帐户充当租约,以确定哪个分区归哪个使用者所有。当新的消费者启动时,它会尝试窃取一些分区 从大多数重载消费者实现工作负载均衡。
若要指定负载平衡策略,开发人员可以使用该配置。有关如何配置的示例,请参阅以下部分。EventHubsContainerProperties
EventHubsContainerProperties
批量消费者支持
支持批量消费模式。要启用它,用户可以在构造实例时指定侦听器模式。 启用后,将 接收有效负载为批处理事件列表的消息并将其传递到下游通道。每个邮件头也转换为一个列表,其中的内容是从每个事件解析的关联头值。对于分区 ID、checkpointer 和上次排队属性的公共标头,它们显示为共享同一事件的整批事件的单个值。有关更多详细信息,请参阅事件中心消息标头。EventHubsInboundChannelAdapter
ListenerMode.BATCH
EventHubsInboundChannelAdapter
批处理使用者的检查点支持两种模式:和.mode是一种自动检查点模式,一旦收到整批事件,就将它们一起检查。mode是用户对事件进行检查点。使用时,检查指针 将传递到消息头中,用户可以使用它来执行检查点。BATCH
MANUAL
BATCH
MANUAL
批处理使用策略可以通过属性 of and 指定,其中必要的属性 while 是可选的。 若要指定批量使用策略,开发人员可用于配置。有关如何配置的示例,请参阅以下部分。max-size
max-wait-time
max-size
max-wait-time
EventHubsContainerProperties
EventHubsContainerProperties
13.1.2. 依赖设置
<dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-starter-integration-eventhubs</artifactId> </dependency>
13.1.3. 配置
此启动器提供以下 3 个配置选项部分:
连接配置属性
本部分包含用于连接到 Azure 事件中心的配置选项。
如果选择使用安全主体对 Azure Active Directory 进行身份验证和授权以访问 Azure 资源,请参阅使用 Azure AD 授权访问,以确保已向安全主体授予访问 Azure 资源的足够权限。
Table 20. Connection configurable properties of spring-cloud-azure-starter-integration-eventhubs
财产
类型
描述
spring.cloud.azure.eventhubs.enabled
布尔
是否启用了 Azure 事件中心。
spring.cloud.azure.eventhubs.connection-string
字符串
事件中心命名空间连接字符串值。
spring.cloud.azure.eventhubs.namespace
字符串
事件中心命名空间值,它是 FQDN 的前缀。FQDN 应由 <命名空间名称>.<域名>组成
spring.cloud.azure.eventhubs.domain-name
字符串
Azure 事件中心命名空间值的域名。
spring.cloud.azure.eventhubs.custom-endpoint-address
字符串
自定义终结点地址。
spring.cloud.azure.eventhubs.shared-connection
布尔
底层 EventProcessorClient 和 EventHubProducerAsyncClient 是否使用相同的连接。由 默认情况下,将为每个创建的事件中心客户端构造和使用新连接。
检查点配置属性
本部分包含存储 Blob 服务的配置选项,该服务用于保留分区所有权和检查点信息。
从版本 4.0.0 开始,如果未手动启用spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists 的属性,则不会自动创建存储容器。
Table 21. Checkpointing configurable properties of spring-cloud-azure-starter-integration-eventhubs
财产
类型
描述
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists
布尔
如果不存在,是否允许创建容器。
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name
字符串
存储帐户的名称。
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key
字符串
存储帐户访问密钥。
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name
字符串
存储容器名称。
常见的 Azure 服务 SDK 配置选项也可以为存储 Blob 检查点存储进行配置。支持的配置选项在“配置”页中介绍,可以使用统一前缀前缀进行配置。spring.cloud.azure.
spring.cloud.azure.eventhubs.processor.checkpoint-store
事件中心处理器配置属性
使用来自事件中心的消息,以配置一个 开发人员可用于配置。请参阅以下部分,了解如何使用。EventHubsInboundChannelAdapter
EventProcessorClient
EventProcessorClient
EventHubsContainerProperties
EventHubsInboundChannelAdapter
13.1.4. 基本用法
将消息发送到 Azure 事件中心
第 1 步。填写凭据配置选项。
对于作为连接字符串的凭据,请在以下位置配置以下属性:application.yml
spring: cloud: azure: eventhubs: connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT-CONTAINER} account-name: ${CHECKPOINT-STORAGE-ACCOUNT} account-key: ${CHECKPOINT-ACCESS-KEY}
对于作为托管标识的凭据,请在以下位置配置以下属性:application.yml
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} eventhubs: namespace: ${AZURE_SERVICE_BUS_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME}
对于作为服务主体的凭据,请在 application.yml 中配置以下属性:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: ${AZURE_TENANT_ID} eventhubs: namespace: ${AZURE_SERVICE_BUS_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME}
第 2 步。使用 bean of 创建以将消息发送到事件中心。DefaultMessageHandler
EventHubsTemplate
class Demo { private static final String OUTPUT_CHANNEL = "output"; private static final String EVENTHUB_NAME = "eh1"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler messageSender(EventHubsTemplate eventHubsTemplate) { DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, eventHubsTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.error("There was an error sending the message.", ex); } }); return handler; } }
第 3 步。通过消息通道创建与上述消息处理程序绑定的消息网关。
class Demo { @Autowired EventHubOutboundGateway messagingGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface EventHubOutboundGateway { void send(String text); } }
第 4 步。使用网关发送消息。
class Demo { public void demo() { this.messagingGateway.send(message); } }
从 Azure 事件中心接收消息
第 1 步。填写凭据配置选项。
第 2 步。创建一个消息通道的 Bean 作为输入通道。
@Configuration class Demo { @Bean public MessageChannel input() { return new DirectChannel(); } }
第 3 步。使用 bean of 创建以从事件中心接收消息。EventHubsInboundChannelAdapter
EventHubsMessageListenerContainer
@Configuration class Demo { private static final String INPUT_CHANNEL = "input"; private static final String EVENTHUB_NAME = "eh1"; private static final String CONSUMER_GROUP = "$Default"; @Bean public EventHubsInboundChannelAdapter messageChannelAdapter( @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, EventHubsMessageListenerContainer listenerContainer) { EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer); adapter.setOutputChannel(inputChannel); return adapter; } @Bean public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) { EventHubsContainerProperties containerProperties = new EventHubsContainerProperties(); containerProperties.setEventHubName(EVENTHUB_NAME); containerProperties.setConsumerGroup(CONSUMER_GROUP); containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL)); return new EventHubsMessageListenerContainer(processorFactory, containerProperties); } }
第 4 步。通过之前创建的消息通道创建与事件中心入站通道适配器绑定的消息接收器。
class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message)) .doOnError(e -> LOGGER.error("Error found", e)) .block(); } }
配置 EventHubsMessageConverter 以自定义 ObjectMapper
EventHubsMessageConverter作为可配置的 Bean 制作,以允许用户自定义 ObjectMapper。
批量消费者支持
批量使用来自事件中心的消息与上述示例类似,此外用户应为其设置批量使用的相关配置选项。EventHubsInboundChannelAdapter
创建时,侦听器模式应设置为 。创建 bean of 时,将检查点模式设置为 anyor,并且可以根据需要配置批处理选项。EventHubsInboundChannelAdapter
BATCH
EventHubsMessageListenerContainer
MANUAL
BATCH
@Configuration class Demo { private static final String INPUT_CHANNEL = "input"; private static final String EVENTHUB_NAME = "eh1"; private static final String CONSUMER_GROUP = "$Default"; @Bean public EventHubsInboundChannelAdapter messageChannelAdapter( @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, EventHubsMessageListenerContainer listenerContainer) { EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer, ListenerMode.BATCH); adapter.setOutputChannel(inputChannel); return adapter; } @Bean public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) { EventHubsContainerProperties containerProperties = new EventHubsContainerProperties(); containerProperties.setEventHubName(EVENTHUB_NAME); containerProperties.setConsumerGroup(CONSUMER_GROUP); containerProperties.getBatch().setMaxSize(100); containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL)); return new EventHubsMessageListenerContainer(processorFactory, containerProperties); } }
事件中心消息标头
下表说明了如何将事件中心消息属性映射到 Spring 消息标头。对于 Azure 事件中心,消息称为。event
Table 22. Mapping between Event Hubs Message / Event Properties and Spring Message Headers in Record Listener Mode
事件中心事件属性
Spring 消息头常量
类型
描述
排队时间
事件中心标头#ENQUEUED_TIME
瞬间
事件在事件中心分区中排队的时刻(以 UTC 为单位)。
抵消
事件中心标头#偏移量
长
从关联的事件中心分区接收事件时的偏移量。
分区键
AzureHeaders#PARTITION_KEY
字符串
分区哈希键(如果在最初发布事件时设置)。
分区编号
AzureHeaders#RAW_PARTITION_ID
字符串
事件中心的分区 ID。
序列号
事件中心标头#SEQUENCE_NUMBER
长
在关联的事件中心分区中排队时分配给事件的序列号。
上次排队的事件属性
EventHubsHeaders#LAST_ENQUEUED_EVENT_PROPERTIES
上一个排队事件属性
此分区中最后一个排队事件的属性。
那
AzureHeaders#CHECKPOINTER
检查指针
特定消息的检查点标头。
用户可以分析消息标头以获取每个事件的相关信息。若要为事件设置消息标头,所有自定义标头都将作为事件的应用程序属性放置,其中标头设置为属性键。从事件中心收到事件时,所有应用程序属性都将转换为消息标头。
不支持手动设置分区键、排队时间、偏移量和序列号的消息头。
启用批处理使用者模式后,将按如下所示列出批处理消息的特定标头,其中包含每个事件中心事件的值列表。
Table 23. Mapping between Event Hubs Message / Event Properties and Spring Message Headers in Batch Listener Mode
事件中心事件属性
春季批处理消息标头常量
类型
描述
排队时间
事件中心标头#ENQUEUED_TIME
即时列表
事件中心分区中每个事件排队的时刻列表(采用 UTC 格式)。
抵消
事件中心标头#偏移量
长列表
从关联的事件中心分区接收每个事件时的偏移量列表。
分区键
AzureHeaders#PARTITION_KEY
字符串列表
分区哈希键的列表(如果在最初发布每个事件时设置)。
序列号
事件中心标头#SEQUENCE_NUMBER
长列表
在关联的事件中心分区中排队时分配给每个事件的序列号列表。
系统属性
事件中心标头#BATCH_CONVERTED_SYSTEM_PROPERTIES
地图列表
每个事件的系统属性列表。
应用程序属性
事件中心标头#BATCH_CONVERTED_APPLICATION_PROPERTIES
地图列表
每个事件的应用程序属性列表,其中放置了所有自定义消息头或事件属性。
发布消息时,上述所有批处理标头都将从消息中删除(如果存在)。
13.1.5. 示例
有关更多详细信息,请参阅azure-spring-boot-samples。
13.2. 与 Azure 服务总线的 Spring 集成
13.2.1. 关键概念
Spring 集成支持基于 Spring 的应用程序内的轻量级消息传递,并支持通过声明式适配器与外部系统集成。
Azure 服务总线扩展的春季集成项目为 Azure 服务总线提供入站和出站通道适配器。
CompletableFuture 支持 API 已从版本 2.10.0 中弃用,并从版本 4.0.0 中被反应堆核心取代。 有关详细信息,请参阅 Javadoc。
13.2.2. 依赖设置
<dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-starter-integration-servicebus</artifactId> </dependency>
13.2.3. 配置
此启动器提供以下 2 个配置选项部分:
连接配置属性
本部分包含用于连接到 Azure 服务总线的配置选项。
如果选择使用安全主体对 Azure Active Directory 进行身份验证和授权以访问 Azure 资源,请参阅使用 Azure AD 授权访问,以确保已向安全主体授予访问 Azure 资源的足够权限。
Table 24. Connection configurable properties of spring-cloud-azure-starter-integration-servicebus
财产
类型
描述
spring.cloud.azure.servicebus.enabled
布尔
是否启用了 Azure 服务总线。
spring.cloud.azure.servicebus.connection-string
字符串
服务总线命名空间连接字符串值。
spring.cloud.azure.servicebus.namespace
字符串
服务总线命名空间值,它是 FQDN 的前缀。FQDN 应由 <命名空间名称>.<域名>组成
spring.cloud.azure.servicebus.domain-name
字符串
Azure 服务总线命名空间值的域名。
服务总线处理器配置属性
使用使用消息,配置整体属性, 开发人员可用于配置。请参阅以下部分,了解如何使用。ServiceBusInboundChannelAdapterServiceBusProcessorClientServiceBusProcessorClientServiceBusContainerPropertiesServiceBusInboundChannelAdapter
13.2.4. 基本用法
将消息发送到 Azure 服务总线
第 1 步。填写凭据配置选项。
对于作为连接字符串的凭据,请在 application.yml 中配置以下属性:
spring: cloud: azure: servicebus: connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
对于作为托管标识的凭据,请在 application.yml 中配置以下属性:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} profile: tenant-id: ${AZURE_TENANT_ID} servicebus: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
对于作为服务主体的凭据,请在 application.yml 中配置以下属性:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: ${AZURE_TENANT_ID} servicebus: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
第 2 步。创建与 bean of 将消息发送到服务总线, 设置服务总线模板的实体类型。此示例以服务总线队列为例。DefaultMessageHandler
ServiceBusTemplate
class Demo { private static final String OUTPUT_CHANNEL = "queue.output"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) { serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE); DefaultMessageHandler handler = new DefaultMessageHandler(QUEUE_NAME, serviceBusTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.info("There was an error sending the message."); } }); return handler; } }
第 3 步。通过消息通道创建与上述消息处理程序绑定的消息网关。
class Demo { @Autowired QueueOutboundGateway messagingGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface QueueOutboundGateway { void send(String text); } }
第 4 步。使用网关发送消息。
class Demo { public void demo() { this.messagingGateway.send(message); } }
从 Azure 服务总线接收消息
第 1 步。填写凭据配置选项。
第 2 步。创建一个消息通道的 Bean 作为输入通道。
@Configuration class Demo { private static final String INPUT_CHANNEL = "input"; @Bean public MessageChannel input() { return new DirectChannel(); } }
第 3 步。使用 bean of 创建以接收到服务总线的消息。此示例以服务总线队列为例。ServiceBusInboundChannelAdapter
ServiceBusMessageListenerContainer
@Configuration class Demo { private static final String QUEUE_NAME = "queue1"; @Bean public ServiceBusMessageListenerContainer messageListenerContainer(ServiceBusProcessorFactory processorFactory) { ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties(); containerProperties.setEntityName(QUEUE_NAME); containerProperties.setAutoComplete(false); return new ServiceBusMessageListenerContainer(processorFactory, containerProperties); } @Bean public ServiceBusInboundChannelAdapter queueMessageChannelAdapter( @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, ServiceBusMessageListenerContainer listenerContainer) { ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer); adapter.setOutputChannel(inputChannel); return adapter; } }
第 4 步。通过我们之前创建的消息通道创建与 ServiceBusInboundChannelAdapter 绑定的消息接收器。
class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message)) .doOnError(e -> LOGGER.error("Error found", e)) .block(); } }
配置 ServiceBusMessageConverter 以自定义 ObjectMapper
ServiceBusMessageConverter
作为可配置的 Bean 制作,以允许用户自定义 ObjectMapper。
服务总线消息标头
对于某些可以映射到多个 Spring 标头常量的服务总线标头,将列出不同 Spring 标头的优先级。
Table 25. Mapping between Service Bus Headers and Spring Headers
服务总线消息标头和属性
Spring 消息头常量
类型
配置
描述
内容类型
消息头#CONTENT_TYPE
字符串
是的
消息的 RFC2045 内容类型描述符。
相关标识
ServiceBusMessageHeaders#CORRELATION_ID
字符串
是的
消息的相关 ID
消息标识
ServiceBusMessageHeaders#MESSAGE_ID
字符串
是的
消息的消息 ID,此标头的优先级高于。MessageHeaders#ID
消息标识
消息头#ID
无人居住地
是的
消息的消息 ID,此标头的优先级低于。ServiceBusMessageHeaders#MESSAGE_ID
分区键
ServiceBusMessageHeaders#PARTITION_KEY
字符串
是的
用于将消息发送到分区实体的分区键。
回复
消息头#REPLY_CHANNEL
字符串
是的
要向其发送答复的实体的地址。
回复会话 ID
ServiceBusMessageHeaders#REPLY_TO_SESSION_ID
字符串
是的
消息的 ReplyToGroupId 属性值。
计划排队时间 UTC
ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME
偏移日期时间
是的
消息应在服务总线中排队的日期时间,此标头的优先级高于此级别。AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE
计划排队时间 UTC
AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE
整数
是的
消息应在服务总线中排队的日期时间,此标头的优先级低于。ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME
会话标识
ServiceBusMessageHeaders#SESSION_ID
字符串
是的
会话感知实体的会话标识符。
生活时间
ServiceBusMessageHeaders#TIME_TO_LIVE
期间
是的
此消息过期之前的持续时间。
自
ServiceBusMessageHeaders#TO
字符串
是的
消息的“to”地址,保留供将来在路由方案中使用,目前被代理本身忽略。
主题
ServiceBusMessageHeaders#SUBJECT
字符串
是的
邮件的主题。
死信错误说明
ServiceBusMessageHeaders#DEAD_LETTER_ERROR_DESCRIPTION
字符串
不
已死信的消息的说明。
死信原因
ServiceBusMessageHeaders#DEAD_LETTER_REASON
字符串
不
消息是死信的原因。
死信来源
ServiceBusMessageHeaders#DEAD_LETTER_SOURCE
字符串
不
消息为死信的实体。
交货计数
ServiceBusMessageHeaders#DELIVERY_COUNT
长
不
此消息传递到客户端的次数。
排队的序列号
ServiceBusMessageHeaders#ENQUEUED_SEQUENCE_NUMBER
长
不
服务总线分配给消息的排队序列号。
排队时间
ServiceBusMessageHeaders#ENQUEUED_TIME
偏移日期时间
不
此消息在服务总线中排队的日期时间。
到期时间
ServiceBusMessageHeaders#EXPIRES_AT
偏移日期时间
不
此消息将过期的日期时间。
锁定令牌
ServiceBusMessageHeaders#LOCK_TOKEN
字符串
不
当前消息的锁定令牌。
锁定到
ServiceBusMessageHeaders#LOCKED_UNTIL
偏移日期时间
不
此消息的锁定过期的日期时间。
序列号
ServiceBusMessageHeaders#SEQUENCE_NUMBER
长
不
服务总线分配给消息的唯一编号。
州
ServiceBusMessageHeaders#STATE
服务总线消息状态
不
消息的状态,可以是“活动”、“延迟”或“计划”。
分区键支持
此初学者通过允许在消息标头中设置分区键和会话 ID 来支持服务总线分区。本节介绍如何设置消息的分区键。
推荐: 用作标头的键。ServiceBusMessageHeaders.PARTITION_KEY
public class SampleController { @PostMapping("/messages") public ResponseEntity<String> sendMessage(@RequestParam String message) { LOGGER.info("Going to add message {} to Sinks.Many.", message); many.emitNext(MessageBuilder.withPayload(message) .setHeader(ServiceBusMessageHeaders.PARTITION_KEY, "Customize partition key") .build(), Sinks.EmitFailureHandler.FAIL_FAST); return ResponseEntity.ok("Sent!"); } }
不推荐,但当前支持: 作为标头的键。AzureHeaders.PARTITION_KEY
public class SampleController { @PostMapping("/messages") public ResponseEntity<String> sendMessage(@RequestParam String message) { LOGGER.info("Going to add message {} to Sinks.Many.", message); many.emitNext(MessageBuilder.withPayload(message) .setHeader(AzureHeaders.PARTITION_KEY, "Customize partition key") .build(), Sinks.EmitFailureHandler.FAIL_FAST); return ResponseEntity.ok("Sent!"); } }
当在邮件头中设置两者时,是首选。ServiceBusMessageHeaders.PARTITION_KEY
AzureHeaders.PARTITION_KEY
ServiceBusMessageHeaders.PARTITION_KEY
会话支持
此示例演示如何在应用程序中手动设置消息的会话 ID。
public class SampleController { @PostMapping("/messages") public ResponseEntity<String> sendMessage(@RequestParam String message) { LOGGER.info("Going to add message {} to Sinks.Many.", message); many.emitNext(MessageBuilder.withPayload(message) .setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session id") .build(), Sinks.EmitFailureHandler.FAIL_FAST); return ResponseEntity.ok("Sent!"); } }
当在邮件头中设置,并且还设置了不同的(或)头时, 会话 ID 的值最终将用于覆盖分区键的值。ServiceBusMessageHeaders.SESSION_ID
ServiceBusMessageHeaders.PARTITION_KEY
AzureHeaders.PARTITION_KEY
13.2.5. 示例
有关更多详细信息,请参阅azure-spring-boot-samples。
13.3. Spring 与 Azure 存储队列集成
13.3.1. 关键概念
Azure 队列存储是一种用于存储大量消息的服务。您可以通过使用 HTTP 或 HTTPS 的经过身份验证的调用从世界任何地方访问消息。队列消息的大小最大为 64 KB。队列可能包含数百万条消息,最高可达存储帐户的总容量限制。队列通常用于创建要异步处理的积压工作。
13.3.2. 依赖设置
<dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-starter-integration-storage-queue</artifactId> </dependency>
13.3.3. 配置
此初学者提供以下配置选项:
连接配置属性
本部分包含用于连接到 Azure 存储队列的配置选项。
如果选择使用安全主体对 Azure Active Directory 进行身份验证和授权以访问 Azure 资源,请参阅使用 Azure AD 授权访问,以确保已向安全主体授予访问 Azure 资源的足够权限。
Table 26. Connection configurable properties of spring-cloud-azure-starter-integration-storage-queue
财产
类型
描述
spring.cloud.azure.storage.queue.enabled
布尔
是否启用了 Azure 存储队列。
spring.cloud.azure.storage.queue.connection-string
字符串
存储队列命名空间连接字符串值。
spring.cloud.azure.storage.queue.accountName
字符串
存储队列帐户名称。
spring.cloud.azure.storage.queue.accountKey
字符串
存储队列帐户密钥。
spring.cloud.azure.storage.queue.endpoint
字符串
存储队列服务终结点。
spring.cloud.azure.storage.queue.sasToken
字符串
SAS 令牌凭据
spring.cloud.azure.storage.queue.serviceVersion
队列服务版本
发出 API 请求时使用的队列服务版本。
spring.cloud.azure.storage.queue.messageEncoding
字符串
队列消息编码。
13.3.4. 基本用法
将消息发送到 Azure 存储队列
第 1 步。填写凭据配置选项。
对于作为连接字符串的凭据,请在 application.yml 中配置以下属性:
spring: cloud: azure: storage: queue: connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
对于作为托管标识的凭据,请在 application.yml 中配置以下属性:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} profile: tenant-id: ${AZURE_TENANT_ID} storage: queue: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
对于作为服务主体的凭据,请在 application.yml 中配置以下属性:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: ${AZURE_TENANT_ID} storage: queue: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
第 2 步。使用 bean of 创建将消息发送到存储队列。DefaultMessageHandler
StorageQueueTemplate
class Demo { private static final String STORAGE_QUEUE_NAME = "example"; private static final String OUTPUT_CHANNEL = "output"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler messageSender(StorageQueueTemplate storageQueueTemplate) { DefaultMessageHandler handler = new DefaultMessageHandler(STORAGE_QUEUE_NAME, storageQueueTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.info("There was an error sending the message."); } }); return handler; } }
第 3 步。通过消息通道创建与上述消息处理程序绑定的消息网关。
class Demo { @Autowired StorageQueueOutboundGateway storageQueueOutboundGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface StorageQueueOutboundGateway { void send(String text); } }
第 4 步。使用网关发送消息。
class Demo { public void demo() { this.storageQueueOutboundGateway.send(message); } }
从 Azure 存储队列接收消息
第 1 步。填写凭据配置选项。
第 2 步。创建一个消息通道的 Bean 作为输入通道。
class Demo { private static final String INPUT_CHANNEL = "input"; @Bean public MessageChannel input() { return new DirectChannel(); } }
第 3 步。使用 bean of 创建以接收到存储队列的消息。StorageQueueMessageSource
StorageQueueTemplate
class Demo { private static final String STORAGE_QUEUE_NAME = "example"; @Bean @InboundChannelAdapter(channel = INPUT_CHANNEL, poller = @Poller(fixedDelay = "1000")) public StorageQueueMessageSource storageQueueMessageSource(StorageQueueTemplate storageQueueTemplate) { return new StorageQueueMessageSource(STORAGE_QUEUE_NAME, storageQueueTemplate); } }
第 4 步。通过我们之前创建的消息通道,使用在上一步中创建的 StorageQueueMessageSource 创建消息接收器绑定。
class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnError(Throwable::printStackTrace) .doOnSuccess(t -> LOGGER.info("Message '{}' successfully checkpointed", message)) .block(); } }
13.3.5. 示例
有关更多详细信息,请参阅azure-spring-boot-samples。
14. 春云流支持
Spring Cloud Stream 是一个框架,用于构建与共享消息传递系统连接的高度可扩展的事件驱动微服务。
该框架提供了一个灵活的编程模型,该模型建立在已经建立和熟悉的 Spring 习语和最佳实践之上,包括对持久发布/订阅语义、消费者组和有状态分区的支持。
当前的绑定程序实现包括:
spring-cloud-azure-stream-binder-eventhubs
弹簧-云-Azure-流-绑定器-服务总线
14.1. 适用于 Azure 事件中心的春季云流绑定器
14.1.1. 关键概念
适用于 Azure 事件中心的 Spring Cloud Stream Binder 提供 Spring Cloud Stream 框架的绑定实现。 此实现在其基础上使用 Spring 集成事件中心通道适配器。从设计的角度来看, 事件中心类似于 Kafka。此外,可以通过 Kafka API 访问事件中心。如果您的项目具有紧密依赖关系 在 Kafka API 上,可以试用事件中心和 Kafka API 示例
消费群体
事件中心提供与 Apache Kafka 类似的使用者组支持,但逻辑略有不同。虽然卡夫卡 将所有已提交的偏移量存储在代理中,必须存储事件中心消息的偏移量 正在手动处理。事件中心 SDK 提供了在 Azure 存储中存储此类偏移的函数。
分区支持
事件中心提供与 Kafka 类似的物理分区概念。但与 Kafka 在使用者和分区之间的自动重新平衡不同,事件中心提供了一种抢占模式。存储帐户充当租约,以确定哪个分区归哪个使用者所有。当新的消费者启动时,它会尝试窃取一些分区 从大多数重载消费者实现工作负载均衡。
为了指定负载平衡策略,提供了属性 ofare。有关更多详细信息,请参阅使用者属性。spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.load-balancing.*
批量消费者支持
Spring Cloud Azure Stream Event Hubs 绑定程序支持Spring Cloud Stream Batch 使用者功能。
若要使用批处理使用者模式,应将属性设置为 。启用后,将接收有效负载为批处理事件列表的消息并将其传递给函数。每个邮件头也转换为一个列表,其中的内容是从每个事件解析的关联头值。对于分区 ID、checkpointer 和上次排队属性的公共标头,它们显示为共享同一事件的整批事件的单个值。有关更多详细信息,请参阅事件中心消息标头。spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode
true
Consumer
批处理使用者的检查点支持两种模式:and.mode 是一种自动检查点模式,用于在绑定器收到整批事件后将整批事件一起检查。mode 是用户对事件进行检查点。使用时,检查指针 将传递到消息头中,用户可以使用它来执行检查点。BATCH
MANUAL
BATCH
MANUAL
批大小可以通过属性指定 andwith 前缀为 ,其中必要的属性 while 是可选的。有关更多详细信息,请参阅使用者属性。max-size
max-wait-time
spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.batch.
max-size
max-wait-time
14.1.2. 依赖设置
<dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId> </dependency>
或者,也可以使用 Spring Cloud Azure Stream Event Hub 初学者,如以下 Maven 示例所示:
<dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-starter-stream-eventhubs</artifactId> </dependency>
14.1.3. 配置
活页夹提供以下 3 个配置选项部分:
连接配置属性
本部分包含用于连接到 Azure 事件中心的配置选项。
如果选择使用安全主体对 Azure Active Directory 进行身份验证和授权以访问 Azure 资源,请参阅使用 Azure AD 授权访问,以确保已向安全主体授予访问 Azure 资源的足够权限。
Table 27. Connection configurable properties of spring-cloud-azure-stream-binder-eventhubs
财产
类型
描述
spring.cloud.azure.eventhubs.enabled
布尔
是否启用了 Azure 事件中心。
spring.cloud.azure.eventhubs.connection-string
字符串
事件中心命名空间连接字符串值。
spring.cloud.azure.eventhubs.namespace
字符串
事件中心命名空间值,它是 FQDN 的前缀。FQDN 应由 <命名空间名称>.<域名>组成
spring.cloud.azure.eventhubs.domain-name
字符串
Azure 事件中心命名空间值的域名。
spring.cloud.azure.eventhubs.custom-endpoint-address
字符串
自定义终结点地址。
常见的 Azure 服务 SDK 配置选项也可以为 Spring Cloud Azure Stream Event Hub 绑定器进行配置。支持的配置选项在“配置”页中介绍,可以使用统一前缀前缀进行配置。spring.cloud.azure.
spring.cloud.azure.eventhubs.
默认情况下,绑定器还支持Spring Can Azure资源管理器。若要了解如何使用未授予相关角色的安全主体检索连接字符串,请参阅资源管理器示例以了解详细信息。Data
检查点配置属性
本部分包含存储 Blob 服务的配置选项,该服务用于保留分区所有权和检查点信息。
从版本 4.0.0 开始,如果未手动启用spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists 的属性,则不会使用spring.cloud.stream.bindings.<binding-name>.destination 的名称自动创建存储容器。
Table 28. Checkpointing configurable properties of spring-cloud-azure-stream-binder-eventhubs
财产
类型
描述
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists
布尔
如果不存在,是否允许创建容器。
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name
字符串
存储帐户的名称。
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key
字符串
存储帐户访问密钥。
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name
字符串
存储容器名称。
常见的 Azure 服务 SDK 配置选项也可以为存储 Blob 检查点存储进行配置。支持的配置选项在“配置”页中介绍,可以使用统一前缀前缀进行配置。spring.cloud.azure.
spring.cloud.azure.eventhubs.processor.checkpoint-store
Azure 事件中心绑定配置属性
以下选项分为四个部分:消费者属性、高级消费者 配置、创建者属性和高级创建者配置。
消费者属性
这些属性通过以下方式公开。EventHubsConsumerProperties
Table 29. Consumer configurable properties of spring-cloud-azure-stream-binder-eventhubs
财产
类型
描述
spring.cloud.stream.eventhubs.bindings.<binding-name>. consumer.checkpoint.mode
检查点模式
使用者决定如何检查点消息时使用的检查点模式
spring.cloud.stream.eventhubs.bindings.<binding-name>. consumer.checkpoint.count
整数
确定每个分区执行一个检查点的消息量。仅在使用检查点模式时生效。PARTITION_COUNT
spring.cloud.stream.eventhubs.bindings.<binding-name>. consumer.checkpoint.interval
期间
确定执行一个检查点的时间间隔。仅在使用检查点模式时生效。TIME
spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.batch .max-size
整数
批处理中的最大事件数。批处理使用者模式是必需的。
spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.batch .max-wait-time
期间
批处理消耗的最长时间。仅当启用批处理使用者模式且为可选模式时才会生效。
spring.cloud.stream.eventhubs.bindings.<binding-name>. consumer.load-balancing.update-interval
期间
更新的间隔持续时间。
spring.cloud.stream.eventhubs.bindings.<binding-name>. consumer.load-balancing.strategy
负载均衡策略
负载平衡策略。
spring.cloud.stream.eventhubs.bindings.<binding-name>. consumer.load-balancing.partition-ownership-expiration-interval
期间
分区所有权过期的持续时间。
spring.cloud.stream.eventhubs.bindings.<binding-name>. consumer.track-last-enqueued-event-properties
布尔
事件处理器是否应请求有关其关联分区上最后一个排队事件的信息,并在接收事件时跟踪该信息。
spring.cloud.stream.eventhubs.bindings.<binding-name>. consumer.prefetch-count
整数
使用者用于控制事件中心使用者将主动接收并在本地排队的事件数的计数。
spring.cloud.stream.eventhubs.bindings.<binding-name>. consumer.initial-partition-event-position
使用键作为分区 ID 的映射,值为StartPositionProperties
包含每个分区(如果检查点存储中不存在分区的检查点)要使用的事件位置的映射。此映射与分区 ID 无关。
配置接受 ato 指定每个事件中心的初始位置。因此,它的键是分区 id,值是其中包括偏移量、序列号、排队日期时间和是否包含的属性。例如,您可以将其设置为initial-partition-event-position
map
StartPositionProperties
spring: cloud: stream: eventhubs: bindings: <binding-name>: consumer: initial-partition-event-position: 0: offset: earliest 1: sequence-number: 100 2: enqueued-date-time: 2022-01-12T13:32:47.650005Z 4: inclusive: false
高级消费者配置
支持为每个绑定器使用者自定义上述连接、检查点和常见的 Azure SDK 客户端配置,可以使用前缀进行配置。spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.
生产者属性
这些属性通过以下方式公开。EventHubsProducerProperties
Table 30. Producer configurable properties of spring-cloud-azure-stream-binder-eventhubs
财产
类型
描述
spring.cloud.stream.eventhubs.bindings.<binding-name>.producer.sync
布尔
用于同步生产者的开关标志。如果为 true,则生成者将在发送操作后等待响应。
spring.cloud.stream.eventhubs.bindings.<binding-name>.producer.send-timeout
长
发送操作后等待响应的时间量。只有在启用同步创建器时才会生效。
高级生产者配置
支持为每个绑定器生成器自定义上述连接和常见的 Azure SDK 客户端配置,可以使用前缀进行配置。spring.cloud.stream.eventhubs.bindings.<binding-name>.producer.
14.1.4. 基本用法
从事件中心发送和接收消息
第 1 步。使用凭据信息填充配置选项。
对于作为连接字符串的凭据,请在 application.yml 中配置以下属性:
spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} stream: function: definition: consume;supply bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
对于作为服务主体的凭据,请在 application.yml 中配置以下属性:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: ${AZURE_TENANT_ID} eventhubs: namespace: ${EVENTHUB_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME} stream: function: definition: consume;supply bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
对于作为托管标识的凭据,请在 application.yml 中配置以下属性:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity eventhubs: namespace: ${EVENTHUB_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME} stream: function: definition: consume;supply bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
步骤2.定义供应商和消费者。
@Bean public Consumer<Message<String>> consume() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload(), message.getHeaders().get(EventHubsHeaders.PARTITION_KEY), message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER), message.getHeaders().get(EventHubsHeaders.OFFSET), message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME) ); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("Hello world, " + i++).build(); }; }
分区支持
A用用户提供的分区信息将创建来配置有关要发送的消息的分区信息,以下是获取不同优先级的分区ID和密钥的过程:PartitionSupplier
批量消费者支持
第 1 步。填写批处理配置选项
spring: cloud: stream: function: definition: consume bindings: consume-in-0: destination: ${AZURE_EVENTHUB_NAME} group: ${AZURE_EVENTHUB_CONSUMER_GROUP} consumer: batch-mode: true eventhubs: bindings: consume-in-0: consumer: batch: max-batch-size: 10 # Required for batch-consumer mode max-wait-time: 1m # Optional, the default value is null checkpoint: mode: BATCH # or MANUAL as needed
步骤2.定义供应商和消费者。
对于检查点模式 as,可以使用以下代码发送消息并批量使用。BATCH
@Bean public Consumer<Message<List<String>>> consume() { return message -> { for (int i = 0; i < message.getPayload().size(); i++) { LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload().get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_OFFSET)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i)); } }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload(""test"+ i++ +""").build(); }; }
对于检查点模式,您可以使用以下代码批量发送消息和使用/检查点。MANUAL
@Bean public Consumer<Message<List<String>>> consume() { return message -> { for (int i = 0; i < message.getPayload().size(); i++) { LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload().get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_OFFSET)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i)); } Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload(""test"+ i++ +""").build(); }; }
在批处理使用模式下,Spring Cloud Stream 绑定器的默认内容类型为,因此请确保消息负载与内容类型对齐。例如,当使用默认内容类型 ofto 接收带有有效负载的消息时,有效负载应为 JSON 字符串,并用双引号括起来表示原始字符串文本。对于内容类型,它可以直接是对象。详情请参见《春云流内容类型协商》官方文档。application/json
application/json
String
text/plain
String
错误通道
此通道默认打开,您可以通过以下方式处理错误消息:
// Replace destination with spring.cloud.stream.bindings.input.destination // Replace group with spring.cloud.stream.bindings.input.group @ServiceActivator(inputChannel = "{destination}.{group}.errors") public void consumerError(Message<?> message) { LOGGER.error("Handling customer ERROR: " + message); }
默认情况下,此通道不打开。您需要在 application.properties 中添加配置以启用它,如下所示:
spring.cloud.stream.default.producer.errorChannelEnabled=true
您可以通过以下方式处理错误消息:
// Replace destination with spring.cloud.stream.bindings.output.destination @ServiceActivator(inputChannel = "{destination}.errors") public void producerError(Message<?> message) { LOGGER.error("Handling Producer ERROR: " + message); }
默认情况下,Spring 集成会创建一个名为“errorChannel”的全局错误通道,该通道允许用户为其订阅许多端点。
@ServiceActivator(inputChannel = "errorChannel") public void producerError(Message<?> message) { LOGGER.error("Handling ERROR: " + message); }
事件中心消息标头
有关支持的基本消息标头,请参阅事件中心消息标头。
多种粘合剂支持
使用多个绑定程序还支持连接到多个事件中心命名空间。此示例以连接字符串为例。还支持服务主体和托管标识的凭据,用户可以在每个绑定程序的环境设置中设置相关属性。
第 1 步。若要使用事件中心的多个绑定程序,我们需要在 application.yml 中配置以下属性
spring: cloud: stream: function: definition: consume1;supply1;consume2;supply2 bindings: consume1-in-0: destination: ${EVENTHUB_NAME_01} group: ${CONSUMER_GROUP_01} supply1-out-0: destination: ${THE_SAME_EVENTHUB_NAME_01_AS_ABOVE} consume2-in-0: binder: eventhub-2 destination: ${EVENTHUB_NAME_02} group: ${CONSUMER_GROUP_02} supply2-out-0: binder: eventhub-2 destination: ${THE_SAME_EVENTHUB_NAME_02_AS_ABOVE} binders: eventhub-1: type: eventhubs default-candidate: true environment: spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_01_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER_01} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} eventhub-2: type: eventhubs default-candidate: false environment: spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_02_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER_02} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} eventhubs: bindings: consume1-in-0: consumer: checkpoint: mode: MANUAL consume2-in-0: consumer: checkpoint: mode: MANUAL poller: initial-delay: 0 fixed-delay: 1000
第 2 步。我们需要定义两个供应商和两个消费者
@Bean public Supplier<Message<String>> supply1() { return () -> { LOGGER.info("Sending message1, sequence1 " + i); return MessageBuilder.withPayload("Hello world1, " + i++).build(); }; } @Bean public Supplier<Message<String>> supply2() { return () -> { LOGGER.info("Sending message2, sequence2 " + j); return MessageBuilder.withPayload("Hello world2, " + j++).build(); }; } @Bean public Consumer<Message<String>> consume1() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message1 received: '{}'", message); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message1 '{}' successfully checkpointed", message)) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Consumer<Message<String>> consume2() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message2 received: '{}'", message); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message2 '{}' successfully checkpointed", message)) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; }
资源提供
spring: cloud: azure: credential: tenant-id: ${AZURE_TENANT_ID} profile: subscription-id: ${AZURE_SUBSCRIPTION_ID} eventhubs: resource: resource-group: ${AZURE_EVENTHUBS_RESOURECE_GROUP}