【Azure 事件中心】使用Azure AD认证方式创建Event Hub Consume Client + 自定义Event Position

为您推荐:小红书API     抖音查询API      chatGPT API

问题描述

当使用SDK连接到Azure Event Hub时,最常规的方式为使用连接字符串。这种做法参考官网文档就可成功完成代码:https://docs.azure.cn/zh-cn/event-hubs/event-hubs-java-get-started-send

只是,如果使用Azure AD认证方式进行访问,代码需要如何修改呢? 如何来使用AAD的 TokenCredential呢?

 

分析问题

在使用Connection String的时候,EventProcessorClientBuilder使用connectionString方法配置连接字符串。

【Azure 事件中心】使用Azure AD认证方式创建Event Hub Consume Client + 自定义Event Position

 

如果使用Azure AD认证,则需要先根据AAD中注册应用获取到Client ID, Tenant ID, Client Secret,然后把这些内容设置为系统环境变量

  1. AZURE_TENANT_ID :对应AAD 注册应用页面的 Tenant ID
  2. AZURE_CLIENT_ID :对应AAD 注册应用页面的 Application (Client) ID
  3. AZURE_CLIENT_SECRET :对应AAD 注册应用的 Certificates & secrets 中创建的Client Secrets

然后使用 credential 初始化 EventProcessorClientBuilder 对象

【Azure 事件中心】使用Azure AD认证方式创建Event Hub Consume Client + 自定义Event Position

注意点:

1) DefaultAzureCredentialBuilder 需要指定 Authority Host为 Azure China

2) EventProcessorClientBuilder . Credential 方法需要指定Event Hub Namespce 的域名

 

操作实现

第一步:为AAD注册应用赋予操作Event Hub Data的权限

Azure 提供了以下 Azure 内置角色,用于通过 Azure AD 和 OAuth 授予对事件中心数据的访问权限:

  1. Azure 事件中心数据所有者 (Azure Event Hubs Data Owner): 使用此角色可以授予对事件中心资源的完全访问权限。
  2. Azure 事件中心数据发送者 (Azure Event Hubs Data Sender) : 使用此角色可以授予对事件中心资源的发送访问权限。
  3. Azure 事件中心数据接收者 (Azure Event Hubs Data Receiver): 使用此角色可以授予对事件中心资源的使用/接收访问权限。

本实例中,只需要接收数据,所以只赋予了 Azure Event Hubs Data Receiver权限。

【Azure 事件中心】使用Azure AD认证方式创建Event Hub Consume Client + 自定义Event Position

 

第二步:在Java 项目中添加SDK依赖  

添加在pom.xml文件中 dependencies 部分的内容为:azure-identity , azure-messaging-eventhubs , azure-messaging-eventhubs-checkpointstore-blob,最好都是用最新版本,避免出现运行时出现类型冲突或找不到

<?xml version="1.0" encoding="UTF-8"?>  <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">   <modelVersion>4.0.0</modelVersion>    <groupId>com.example</groupId>   <artifactId>spdemo</artifactId>   <version>1.0-SNAPSHOT</version>    <name>spdemo</name>   <!-- FIXME change it to the project's website -->   <url>http://www.example.com</url>    <properties>     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>     <maven.compiler.source>1.8</maven.compiler.source>     <maven.compiler.target>1.8</maven.compiler.target>   </properties>    <dependencies>     <dependency>       <groupId>com.azure</groupId>       <artifactId>azure-messaging-eventhubs</artifactId>       <version>5.12.2</version>     </dependency>     <dependency>       <groupId>com.azure</groupId>       <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>       <version>1.14.0</version>     </dependency>     <dependency>       <groupId>com.azure</groupId>       <artifactId>azure-identity</artifactId>       <version>1.5.3</version>     </dependency>     <dependency>       <groupId>junit</groupId>       <artifactId>junit</artifactId>       <version>4.11</version>       <scope>test</scope>     </dependency>   </dependencies>    <build>     <plugins>       <plugin>         <groupId>org.apache.maven.plugins</groupId>         <artifactId>maven-assembly-plugin</artifactId>         <version>2.5.5</version>         <configuration>           <archive>             <manifest>               <mainClass>com.example.App</mainClass>             </manifest>           </archive>           <descriptorRefs>             <descriptorRef>jar-with-dependencies</descriptorRef>           </descriptorRefs>         </configuration>       </plugin>     </plugins>   </build> </project>

 

第三步:添加完整代码

package com.example;  import com.azure.core.credential.TokenCredential; import com.azure.identity.AzureAuthorityHosts; import com.azure.identity.DefaultAzureCredentialBuilder; import com.azure.messaging.eventhubs.*; import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore; import com.azure.messaging.eventhubs.models.*; import com.azure.storage.blob.*;  import java.io.IOException; import java.sql.Date; import java.time.Instant; import java.time.temporal.TemporalUnit; import java.util.HashMap; import java.util.Map; import java.util.function.Consumer;  /**  * Hello world!  *  */ public class App {     // private static final String connectionString ="<connectionString>";     // private static final String eventHubName = "<eventHubName>";     // private static final String storageConnectionString = "<storageConnectionString>";     // private static final String storageContainerName = "<storageContainerName>";      public static void main(String[] args) throws IOException {         System.out.println("Hello World!");          // String connectionString ="<connectionString>";          // The fully qualified namespace for the Event Hubs instance. This is likely to         // be similar to:         // {your-namespace}.servicebus.windows.net         // String fullyQualifiedNamespace ="<your event hub namespace>.servicebus.chinacloudapi.cn";         // String eventHubName = "<eventHubName>";          String storageConnectionString = System.getenv("storageConnectionString");         String storageContainerName = System.getenv("storageContainerName");         String fullyQualifiedNamespace = System.getenv("fullyQualifiedNamespace");         String eventHubName = System.getenv("eventHubName");          TokenCredential credential = new DefaultAzureCredentialBuilder().authorityHost(AzureAuthorityHosts.AZURE_CHINA)                 .build();          // Create a blob container client that you use later to build an event processor         // client to receive and process events         BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()                 .connectionString(storageConnectionString)                 .containerName(storageContainerName)                 .buildAsyncClient();                       // EventHubProducerClient         // EventHubProducerClient client = new EventHubClientBuilder()         // .credential(fullyQualifiedNamespace, eventHubName, credential)         // .buildProducerClient();          Map<String, EventPosition> initialPartitionEventPosition = new HashMap<>();         initialPartitionEventPosition.put("0", EventPosition.fromSequenceNumber(3000));           // EventProcessorClientBuilder         // Create a builder object that you will use later to build an event processor         // client to receive and process events and errors.         EventProcessorClientBuilder eventProcessorClientBuilder = new EventProcessorClientBuilder()                 // .connectionString(connectionString, eventHubName)                 .credential(fullyQualifiedNamespace, eventHubName, credential)                 .initialPartitionEventPosition(initialPartitionEventPosition)                 .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)                 .processEvent(PARTITION_PROCESSOR)                 .processError(ERROR_HANDLER)                 .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient));          // EventPosition.f         // Use the builder object to create an event processor client         EventProcessorClient eventProcessorClient = eventProcessorClientBuilder.buildEventProcessorClient();          System.out.println("Starting event processor");         eventProcessorClient.start();          System.out.println("Press enter to stop.");         System.in.read();          System.out.println("Stopping event processor");         eventProcessorClient.stop();         System.out.println("Event processor stopped.");          System.out.println("Exiting process");      }      public static final Consumer<EventContext> PARTITION_PROCESSOR = eventContext -> {         PartitionContext partitionContext = eventContext.getPartitionContext();         EventData eventData = eventContext.getEventData();          System.out.printf("Processing event from partition %s with sequence number %d with body: %s%n",                 partitionContext.getPartitionId(), eventData.getSequenceNumber(), eventData.getBodyAsString());          // Every 10 events received, it will update the checkpoint stored in Azure Blob         // Storage.         if (eventData.getSequenceNumber() % 10 == 0) {             eventContext.updateCheckpoint();         }     };      public static final Consumer<ErrorContext> ERROR_HANDLER = errorContext -> {         System.out.printf("Error occurred in partition processor for partition %s, %s.%n",                 errorContext.getPartitionContext().getPartitionId(),                 errorContext.getThrowable());     }; }

 

 

 

附录:自定义设置 Event Position,当程序运行时,指定从Event Hub中获取消息的 Sequence Number

使用EventPosition对象中的fromSequenceNumber方法,可以指定一个序列号,Consume端会根据这个号码获取之后的消息。其他的方法还有 fromOffset(指定游标) / fromEnqueuedTime(指定一个时间点,获取之后的消息) / earliest(从最早开始) / latest(从最后开始获取新的数据,旧数据不获取) 

    Map<String, EventPosition> initialPartitionEventPosition = new HashMap<>();     initialPartitionEventPosition.put("0", EventPosition.fromSequenceNumber(3000));

注意:

Map<String, EventPosition> 中的String 对象为Event Hub的分区ID,如果Event Hub有2个分区,则它的值分别时0,1.

EventPosition 设置的值,只在Storage Account所保存在CheckPoint Store中的值没有,或者小于此处设定的值时,才会起效果。否则,Consume 会根据从Checkpoint中获取的SequenceNumber为准。

 【Azure 事件中心】使用Azure AD认证方式创建Event Hub Consume Client + 自定义Event Position

 

 

 

参考资料

授予对 Azure 事件中心的访问权限 :https://docs.azure.cn/zh-cn/event-hubs/authorize-access-event-hubs

对使用 Azure Active Directory 访问事件中心资源的应用程序进行身份验证 : https://docs.azure.cn/zh-cn/event-hubs/authenticate-application

使用 Java 向/从 Azure 事件中心 (azure-messaging-eventhubs) 发送/接收事件 : https://docs.azure.cn/zh-cn/event-hubs/event-hubs-java-get-started-send

 

 

 [END]

 

 

 

发表评论

相关文章