热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

mysql不对称复制数据_实例详解MySQL与Elasticsearch数据不对称问题

jdbc-input-plugin只能实现数据库的追加,对于elasticsearch增量写入,但经常jdbc源一端的数据库可能会做数据库删除或者更新操

jdbc-input-plugin 只能实现数据库的追加,对于 elasticsearch 增量写入,但经常jdbc源一端的数据库可能会做数据库删除或者更新操作。这样一来数据库与搜索引擎的数据库就出现了不对称的情况。本文主要介绍了MySQL 与 Elasticsearch 数据不对称问题解决办法的相关资料,对于 elasticsearch 增量写入,但经常jdbc源一端的数据库可能会做数据库删除或者更新操作,这里提供解决办法,需要的朋友可以参考下,希望能帮助到大家。

当然你如果有开发团队可以写程序在删除或者更新的时候同步对搜索引擎操作。如果你没有这个能力,可以尝试下面的方法。

这里有一个数据表 article , mtime 字段定义了 ON UPDATE CURRENT_TIMESTAMP 所以每次更新mtime的时间都会变化

mysql> desc article;

+-------------+--------------+------+-----+--------------------------------+-------+

| Field | Type | Null | Key | Default | Extra |

+-------------+--------------+------+-----+--------------------------------+-------+

| id | int(11) | NO | | 0 | |

| title | mediumtext | NO | | NULL | |

| description | mediumtext | YES | | NULL | |

| author | varchar(100) | YES | | NULL | |

| source | varchar(100) | YES | | NULL | |

| content | longtext | YES | | NULL | |

| status | enum('Y','N')| NO | | 'N' | |

| ctime | timestamp | NO | | CURRENT_TIMESTAMP | |

| mtime | timestamp | YES | | ON UPDATE CURRENT_TIMESTAMP | |

+-------------+--------------+------+-----+--------------------------------+-------+

7 rows in set (0.00 sec)

logstash 增加 mtime 的查询规则

jdbc {

jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"

jdbc_driver_class => "com.mysql.jdbc.Driver"

jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"

jdbc_user => "cms"

jdbc_password => "password"

schedule => "* * * * *" #定时cron的表达式,这里是每分钟执行一次

statement => "select * from article where mtime > :sql_last_value"

use_column_value => true

tracking_column => "mtime"

tracking_column_type => "timestamp"

record_last_run => true

last_run_metadata_path => "/var/tmp/article-mtime.last"

}

创建回收站表,这个事用于解决数据库删除,或者禁用 status = 'N' 这种情况的。

CREATE TABLE `elasticsearch_trash` (

`id` int(11) NOT NULL,

`ctime` timestamp NULL DEFAULT CURRENT_TIMESTAMP,

PRIMARY KEY (`id`)

) ENGINE=InnoDB DEFAULT CHARSET=utf8

为 article 表创建触发器

CREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_UPDATE` BEFORE UPDATE ON `article` FOR EACH ROW

BEGIN

-- 此处的逻辑是解决文章状态变为 N 的时候,需要将搜索引擎中对应的数据删除。

IF NEW.status = 'N' THEN

insert into elasticsearch_trash(id) values(OLD.id);

END IF;

-- 此处逻辑是修改状态到 Y 的时候,方式elasticsearch_trash仍然存在该文章ID,导致误删除。所以需要删除回收站中得回收记录。

IF NEW.status = 'Y' THEN

delete from elasticsearch_trash where id = OLD.id;

END IF;

END

CREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_DELETE` BEFORE DELETE ON `article` FOR EACH ROW

BEGIN

-- 此处逻辑是文章被删除同事将改文章放入搜索引擎回收站。

insert into elasticsearch_trash(id) values(OLD.id);

END

接下来我们需要写一个简单地 Shell 每分钟运行一次,从 elasticsearch_trash 数据表中取出数据,然后使用 curl 命令调用 elasticsearch restful 接口,删除被收回的数据。

你还可以开发相关的程序,这里提供一个 Spring boot 定时任务例子。

实体

package cn.netkiller.api.domain.elasticsearch;

import java.util.Date;

import javax.persistence.Column;

import javax.persistence.Entity;

import javax.persistence.Id;

import javax.persistence.Table;

@Entity

@Table

public class ElasticsearchTrash {

@Id

private int id;

@Column(columnDefinition = "TIMESTAMP DEFAULT CURRENT_TIMESTAMP")

private Date ctime;

public int getId() {

return id;

}

public void setId(int id) {

this.id = id;

}

public Date getCtime() {

return ctime;

}

public void setCtime(Date ctime) {

this.ctime = ctime;

}

}

仓库

package cn.netkiller.api.repository.elasticsearch;

import org.springframework.data.repository.CrudRepository;

import com.example.api.domain.elasticsearch.ElasticsearchTrash;

public interface ElasticsearchTrashRepository extends CrudRepository{

}

定时任务

package cn.netkiller.api.schedule;

import org.elasticsearch.action.delete.DeleteResponse;

import org.elasticsearch.client.transport.TransportClient;

import org.elasticsearch.rest.RestStatus;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.scheduling.annotation.Scheduled;

import org.springframework.stereotype.Component;

import com.example.api.domain.elasticsearch.ElasticsearchTrash;

import com.example.api.repository.elasticsearch.ElasticsearchTrashRepository;

@Component

public class ScheduledTasks {

private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);

@Autowired

private TransportClient client;

@Autowired

private ElasticsearchTrashRepository alasticsearchTrashRepository;

public ScheduledTasks() {

}

@Scheduled(fixedRate = 1000 * 60) // 60秒运行一次调度任务

public void cleanTrash() {

for (ElasticsearchTrash elasticsearchTrash : alasticsearchTrashRepository.findAll()) {

DeleteResponse response = client.prepareDelete("information", "article", elasticsearchTrash.getId() + "").get();

RestStatus status = response.status();

logger.info("delete {} {}", elasticsearchTrash.getId(), status.toString());

if (status == RestStatus.OK || status == RestStatus.NOT_FOUND) {

alasticsearchTrashRepository.delete(elasticsearchTrash);

}

}

}

}

Spring boot 启动主程序。

package cn.netkiller.api;

import org.springframework.boot.SpringApplication;

import org.springframework.boot.autoconfigure.SpringBootApplication;

import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication

@EnableScheduling

public class Application {

public static void main(String[] args) {

SpringApplication.run(Application.class, args);

}

}

相关推荐:

Elasticsearch是什么?Elasticsearch 能够被用在什么地方?

Elasticsearch索引和文档操作实例教程

详解spring中使用Elasticsearch的实例教程



推荐阅读
  • 原文地址:https:www.cnblogs.combaoyipSpringBoot_YML.html1.在springboot中,有两种配置文件,一种 ... [详细]
  • Spring特性实现接口多类的动态调用详解
    本文详细介绍了如何使用Spring特性实现接口多类的动态调用。通过对Spring IoC容器的基础类BeanFactory和ApplicationContext的介绍,以及getBeansOfType方法的应用,解决了在实际工作中遇到的接口及多个实现类的问题。同时,文章还提到了SPI使用的不便之处,并介绍了借助ApplicationContext实现需求的方法。阅读本文,你将了解到Spring特性的实现原理和实际应用方式。 ... [详细]
  • JVM 学习总结(三)——对象存活判定算法的两种实现
    本文介绍了垃圾收集器在回收堆内存前确定对象存活的两种算法:引用计数算法和可达性分析算法。引用计数算法通过计数器判定对象是否存活,虽然简单高效,但无法解决循环引用的问题;可达性分析算法通过判断对象是否可达来确定存活对象,是主流的Java虚拟机内存管理算法。 ... [详细]
  • 本文介绍了在Mac上搭建php环境后无法使用localhost连接mysql的问题,并通过将localhost替换为127.0.0.1或本机IP解决了该问题。文章解释了localhost和127.0.0.1的区别,指出了使用socket方式连接导致连接失败的原因。此外,还提供了相关链接供读者深入了解。 ... [详细]
  • 个人学习使用:谨慎参考1Client类importcom.thoughtworks.gauge.Step;importcom.thoughtworks.gauge.T ... [详细]
  • 本文详细介绍了Java中vector的使用方法和相关知识,包括vector类的功能、构造方法和使用注意事项。通过使用vector类,可以方便地实现动态数组的功能,并且可以随意插入不同类型的对象,进行查找、插入和删除操作。这篇文章对于需要频繁进行查找、插入和删除操作的情况下,使用vector类是一个很好的选择。 ... [详细]
  • springmvc学习笔记(十):控制器业务方法中通过注解实现封装Javabean接收表单提交的数据
    本文介绍了在springmvc学习笔记系列的第十篇中,控制器的业务方法中如何通过注解实现封装Javabean来接收表单提交的数据。同时还讨论了当有多个注册表单且字段完全相同时,如何将其交给同一个控制器处理。 ... [详细]
  • [大整数乘法] java代码实现
    本文介绍了使用java代码实现大整数乘法的过程,同时也涉及到大整数加法和大整数减法的计算方法。通过分治算法来提高计算效率,并对算法的时间复杂度进行了研究。详细代码实现请参考文章链接。 ... [详细]
  • 本文介绍了如何使用C#制作Java+Mysql+Tomcat环境安装程序,实现一键式安装。通过将JDK、Mysql、Tomcat三者制作成一个安装包,解决了客户在安装软件时的复杂配置和繁琐问题,便于管理软件版本和系统集成。具体步骤包括配置JDK环境变量和安装Mysql服务,其中使用了MySQL Server 5.5社区版和my.ini文件。安装方法为通过命令行将目录转到mysql的bin目录下,执行mysqld --install MySQL5命令。 ... [详细]
  • 学习Java异常处理之throws之抛出并捕获异常(9)
    任务描述本关任务:在main方法之外创建任意一个方法接收给定的两个字符串,把第二个字符串的长度减1生成一个整数值,输出第一个字符串长度是 ... [详细]
  • 获取时间的函数js代码,js获取时区代码
    本文目录一览:1、js获取服务器时间(动态)2 ... [详细]
  • 生产环境下JVM调优参数的设置实例
     正文前先来一波福利推荐: 福利一:百万年薪架构师视频,该视频可以学到很多东西,是本人花钱买的VIP课程,学习消化了一年,为了支持一下女朋友公众号也方便大家学习,共享给大家。福利二 ... [详细]
  • 知识图谱——机器大脑中的知识库
    本文介绍了知识图谱在机器大脑中的应用,以及搜索引擎在知识图谱方面的发展。以谷歌知识图谱为例,说明了知识图谱的智能化特点。通过搜索引擎用户可以获取更加智能化的答案,如搜索关键词"Marie Curie",会得到居里夫人的详细信息以及与之相关的历史人物。知识图谱的出现引起了搜索引擎行业的变革,不仅美国的微软必应,中国的百度、搜狗等搜索引擎公司也纷纷推出了自己的知识图谱。 ... [详细]
  • 工作用可能会用到会话分组:Message是消息实体对象,里面有toId和fromId指明接收方ID和发送方Id,通过组合形式“12-22-”为map的key其中Mess ... [详细]
  • linux时间字符串转正常时间 ... [详细]
author-avatar
yukiyu227232
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有