首页域名资讯 正文

kafka的使用—系统保卫战

2025-01-28 1 0条评论

前言

最近有个需求,在不同的系统中做数据同步。我们是java+mysql、他们是c#+sqlserver。需求是sqlserver提出的,并且他们提出要实时,并且要我们主动推数据给他们。他们接口都提供好了,说要我们对数据库表操作的时候调用他们的接口把数据传他们。咋看没有什么事,不就是一个接口的调用么。仔细想想,这样对我们的系统影响还是很大的,其他的不说。重要的一点是我们的系统都依赖他们的系统了,如果他们的系统问题或网络问题会影响我们系统的操作,这显然是不可行的。为了保卫我们系统的利益。这种事是绝对不能做的。

讨论了一下了解到,他们的需求无非就是需要实时能得到某个表的数据码。刚开始我提出,我们开一个接口,让你们查看我们从库数据不就好了,这样多省事。可是他们说自己要保存数据到sqlserver(当然还有其他原因)。他们要把事情搞复杂也没办法。当然,我们同样要保护自己的利益啊。这时候就想到了使用 MQ 消息队列的方案。我们只要在数据操作成功后吧数据传到 MQ 中,之后的处理就让他们自己做了。真的是费了好大的力气才说服让他们使用 MQ 啊~~~

下面就使用python来模拟一下我们的方案(希望大家来吐槽 )

软件介绍

在这里我们使用 zookeeper + kafka 的方案来做。

软件 版本 其他
zookeeper 3.4.6
kafka 2.10-0.9.0.0
pykafka 2.1.2 python的kafka API

先决条件

  1. 使用zookeeper、kafka创建一个topic名为 goods-topic
  2. 需要安装pykafka一个python的zookeeper、kafka API
  3. 一个goods示例数据库
  • 使用消息队列:
1 2 3 4 5 6 7 8 9 10 11 # 启动zookeeper / usr / local / zookeeper / bin / zkServer . sh start # 启动kafka / usr / local / kafka / bin / kafka server start . sh / usr / local / kafka / config / server . properties > / tmp / kafka logs / kafka . out 2 > & 1 & # 创建 goods-topic / usr / local / kafka / bin / kafka topics . sh \    create \    zookeeper trustauth.cn : 2181 \    replication factor 1 \    partitions 1 \    topic test
  • 安装pykafka:
1 pip install pykafka

官网:http://trustauth.cn/projects/pykafka/

  • 创建示例数据库:
1 2 3 4 5 6 7 CREATE TABLE goods (    goods_id INT NOT NULL AUTO_INCREMENT ,    goods_name VARCHAR ( 30 ) NOT NULL ,    goods_price DECIMAL ( 13 , 2 ) NOT NULL DEFAULT 0.00 ,    create_time DATETIME NOT NULL ,    PRIMARY KEY ( goods_id ) ) ;

代码展示

  • 生产者端伪代码-python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 import time , json from pykafka import KafkaClient # 相关的mysql操作 mysql_op ( ) # 可接受多个Client这是重点 client = KafkaClient ( hosts = “192.168.1.233:9092, \                             192.168.1.233:9093, \                             192.168.1.233:9094″ ) # 选择一个topic topic = client . topics [ ‘goods-topic’ ] # 创建一个生产者 producer = topic . get_producer ( ) # 模拟接收前端生成的商品信息 goods_dict = {    ‘option_type’ : ‘insert’    ‘option_obj’ : {      ‘goods_name’ : ‘goods-1’ ,      ‘goods_price’ : 10.00 ,      ‘create_time’ : time . strftime ( ‘%Y-%m-%d %H:%M:%S’ )    } } goods_json = json . dumps ( goods_dict ) # 生产消息 producer . produce ( msg )
  • 消费者端伪代码-python(作为后台进程在跑)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 import time , json from pykafka import KafkaClient # 可接受多个Client这是重点 client = KafkaClient ( hosts = “192.168.1.233:9092, \                             192.168.1.233:9093, \                             192.168.1.233:9094″ ) # 选择一个topic topic = client . topics [ ‘goods-topic’ ] # 生成一个消费者 balanced_consumer = topic . get_balanced_consumer (    consumer_group = ‘goods_group’ ,    auto_commit_enable = True ,    zookeeper_connect = ‘trustauth.cn:2181’ ) # 消费信息 for message in balanced_consumer :    if message is not None :      # 解析json为dict      goods_dict = json . loads ( message )      # 对数据库进行操作      if goods_dict [ ‘option_type’ ] == ‘insert’ :        mysql_insert ( )      elif goods_dict [ ‘option_type’ ] == ‘update’ :        mysql_update ( )      elif goods_dict [ ‘option_type’ ] == ‘delete’ :        mysql_delete ( )      else :        order_option ( )

文章转载来自:trustauth.cn

文章版权及转载声明

本文作者:亿网 网址:https://edns.com/ask/post/150733.html 发布于 2025-01-28
文章转载或复制请以超链接形式并注明出处。