最近有个需求,在不同的系统中做数据同步。我们是java+mysql、他们是c#+sqlserver。需求是sqlserver提出的,并且他们提出要实时,并且要我们主动推数据给他们。他们接口都提供好了,说要我们对数据库表操作的时候调用他们的接口把数据传他们。咋看没有什么事,不就是一个接口的调用么。仔细想想,这样对我们的系统影响还是很大的,其他的不说。重要的一点是我们的系统都依赖他们的系统了,如果他们的系统问题或网络问题会影响我们系统的操作,这显然是不可行的。为了保卫我们系统的利益。这种事是绝对不能做的。
讨论了一下了解到,他们的需求无非就是需要实时能得到某个表的数据码。刚开始我提出,我们开一个接口,让你们查看我们从库数据不就好了,这样多省事。可是他们说自己要保存数据到sqlserver(当然还有其他原因)。他们要把事情搞复杂也没办法。当然,我们同样要保护自己的利益啊。这时候就想到了使用 MQ 消息队列的方案。我们只要在数据操作成功后吧数据传到 MQ 中,之后的处理就让他们自己做了。真的是费了好大的力气才说服让他们使用 MQ 啊~~~
下面就使用python来模拟一下我们的方案(希望大家来吐槽 )
在这里我们使用 zookeeper + kafka 的方案来做。
软件 | 版本 | 其他 |
zookeeper | 3.4.6 | |
kafka | 2.10-0.9.0.0 | |
pykafka | 2.1.2 | python的kafka API |
1 2 3 4 5 6 7 8 9 10 11 | # 启动zookeeper / usr / local / zookeeper / bin / zkServer . sh start # 启动kafka / usr / local / kafka / bin / kafka – server – start . sh / usr / local / kafka / config / server . properties > / tmp / kafka – logs / kafka . out 2 > & 1 & # 创建 goods-topic / usr / local / kafka / bin / kafka – topics . sh \ — create \ — zookeeper trustauth.cn : 2181 \ — replication – factor 1 \ — partitions 1 \ — topic test |
1 | pip install pykafka |
官网:http://trustauth.cn/projects/pykafka/
1 2 3 4 5 6 7 | CREATE TABLE goods ( goods_id INT NOT NULL AUTO_INCREMENT , goods_name VARCHAR ( 30 ) NOT NULL , goods_price DECIMAL ( 13 , 2 ) NOT NULL DEFAULT 0.00 , create_time DATETIME NOT NULL , PRIMARY KEY ( goods_id ) ) ; |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | import time , json from pykafka import KafkaClient # 相关的mysql操作 mysql_op ( ) # 可接受多个Client这是重点 client = KafkaClient ( hosts = “192.168.1.233:9092, \ 192.168.1.233:9093, \ 192.168.1.233:9094″ ) # 选择一个topic topic = client . topics [ ‘goods-topic’ ] # 创建一个生产者 producer = topic . get_producer ( ) # 模拟接收前端生成的商品信息 goods_dict = { ‘option_type’ : ‘insert’ ‘option_obj’ : { ‘goods_name’ : ‘goods-1’ , ‘goods_price’ : 10.00 , ‘create_time’ : time . strftime ( ‘%Y-%m-%d %H:%M:%S’ ) } } goods_json = json . dumps ( goods_dict ) # 生产消息 producer . produce ( msg ) |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | import time , json from pykafka import KafkaClient # 可接受多个Client这是重点 client = KafkaClient ( hosts = “192.168.1.233:9092, \ 192.168.1.233:9093, \ 192.168.1.233:9094″ ) # 选择一个topic topic = client . topics [ ‘goods-topic’ ] # 生成一个消费者 balanced_consumer = topic . get_balanced_consumer ( consumer_group = ‘goods_group’ , auto_commit_enable = True , zookeeper_connect = ‘trustauth.cn:2181’ ) # 消费信息 for message in balanced_consumer : if message is not None : # 解析json为dict goods_dict = json . loads ( message ) # 对数据库进行操作 if goods_dict [ ‘option_type’ ] == ‘insert’ : mysql_insert ( ) elif goods_dict [ ‘option_type’ ] == ‘update’ : mysql_update ( ) elif goods_dict [ ‘option_type’ ] == ‘delete’ : mysql_delete ( ) else : order_option ( ) |
文章转载来自:trustauth.cn