1. 事件队列介绍

目前河南省算力服务器平台开提供了基于Kafka的事件服务(基于Kafka2.4.1),并提供了Python语言和Java语言的消息推送实例代码,收到消息推送需满足以下三个条件:

  • 队列地址
  • 队列名称
  • 用户名
  • 密码
    此部分信息可以在商铺后台中–开发配置中获取。

2. 事件队列核心能力

  • 用户新购买订单提醒
  • 用户续费订单提醒
  • 实例因到期而冻结
  • 实例冻结后仍未续费,需销毁
  • 实例冻结后用户续费,需解冻

3. 事件连接

通过Python连接

#coding:utf8
# 可通过改命令安装kafka依赖
# pip install kafka-python
from kafka import KafkaConsumer
import json


if __name__ == '__main__':
    server_addr = ''  # 可在个人店铺后台获取
    topic_name = ''  # 可在个人店铺后台获取
    group_id = ''   # 可在个人店铺后台获取
    user_name = ''   # 可在个人店铺后台获取
    password = ''   # 可在个人店铺后台获取
    consumer = KafkaConsumer(topic_name,
                             bootstrap_servers=['ckafka-qz5zgmgn.ap-beijing.ckafka.tencentcloudmq.com:50002'],
                             group_id=group_id,
                             auto_offset_reset='largest',
                             enable_auto_commit=True,
                             security_protocol='SASL_PLAINTEXT',
                             sasl_mechanism='PLAIN',
                             sasl_plain_username=user_name,
                             sasl_plain_password=password,
                             max_poll_interval_ms=5000,  #  重要参数 不可忽略
                             session_timeout_ms=6000,   #  重要参数 不可忽略
                             request_timeout_ms=7000, #  重要参数 不可忽略
                             fetch_max_wait_ms=200, #  重要参数 不可忽略
                             fetch_min_bytes=1 #  重要参数 不可忽略
                             )

    for msg in consumer:
        try:
            data = json.loads(msg.value)
            print(data)
        except Exception as e:
            print(e)

通过JAVA连接

kafka:
    bootstrap-servers: ckafka-qz5zgmgn.ap-beijing.ckafka.tencentcloudmq.com:50002
    consumer:
      group-id: test-group-id # 消费者组ID
      auto-offset-reset: largest # 当没有偏移量或偏移量无效时,从何处开始消费
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    streams:
      properties:
        application.id: kafka-streams-id # 应用ID
    properties:
      auto.create.topics.enable: true # 开启自动创建话题功能,默认是false,根据需要设置
      sasl:
        mechanism: PLAIN
        jaas:
          config: org.apache.kafka.common.security.plain.PlainLoginModule required username=user_name password=password;
      security:
        protocol: SASL_PLAINTEXT
作者:admin  创建时间:2025-03-03 09:43
最后编辑:admin  更新时间:2025-06-12 03:26
上一篇:
下一篇: