
Notes on Solving an Elasticsearch Problem

I recently had a requirement to modify the mapping of a field and to add a new field whose value duplicates the old one: the old field's value is copied into the new field so the two stay in sync, and the new field is left unanalyzed so it can serve exact-match searches. But an Elasticsearch mapping cannot be modified in place, so the only option was to create a new index with the desired mapping and then reimport the data. The data volume was fairly large: over 15 million documents.
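
Before any data can be imported, the target index has to be created with the new mapping. Here is a minimal sketch against the ES 1.x Java API; the index and type names match the programs below, while the journal/journalName field definitions are my assumption about the actual mapping (note that the first test below spells the fields jornal/jornalName):

import org.elasticsearch.client.Client;

public class CreateTargetIndex {

    public static void create(Client client) {
        // journalName duplicates journal but is not_analyzed,
        // so it can be used for exact-match search
        String mapping = "{\"periodical\":{\"properties\":{"
                + "\"journal\":{\"type\":\"string\"},"
                + "\"journalName\":{\"type\":\"string\",\"index\":\"not_analyzed\"}"
                + "}}}";
        client.admin().indices().prepareCreate("wos_source")
                .addMapping("periodical", mapping)
                .execute().actionGet();
    }
}
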
Someone had previously written a data-import program. The code is as follows.

First, a client factory class, ClientFactory.java:

package com.dimonho.es.commons;

import java.lang.reflect.Constructor;

import org.apache.log4j.Logger;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

/**
* ES client factory.
*/
public class ClientFactory{

private static final Logger LOGGER = Logger.getLogger(ClientFactory.class);

/**
* ES endpoint address(es): comma-separated "host[:port]" entries.
*/
private String address;

/**
* ES cluster name.
*/
private String clusterName;

private TransportClient client;

public static final int DEFAULT_ES_PORT=9300;

/**
* Init method: builds a client from the configured parameters.
*/
public void init(){
LOGGER.info(String.format("Initializing ES connection: %s (%s)", address, clusterName));
Settings defaultSettings = ImmutableSettings.settingsBuilder().put("client.transport.sniff", false)
.put("client.transport.ignore_cluster_name", true)
.put("index.similarity.default.type", "default")
.put("cluster.name",clusterName)
.build();
try {
// the TransportClient(Settings) constructor is invoked reflectively here,
// as in the original code; a plain "new TransportClient(defaultSettings)"
// works just as well on ES 1.x (see createClient below)
Class<?> clazz = Class.forName(TransportClient.class.getName());
Constructor<?> constructor = clazz.getDeclaredConstructor(Settings.class);
constructor.setAccessible(true);
client = (TransportClient) constructor.newInstance(defaultSettings);
String[] addrs = address.split(",");
for(String str : addrs){
String[] items = str.split(":");
if(items.length==2){
client.addTransportAddress(new InetSocketTransportAddress(items[0], Integer.valueOf(items[1])));
}else if(items.length ==1){
client.addTransportAddress(new InetSocketTransportAddress(items[0], DEFAULT_ES_PORT));
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}

/**
* Get the initialized client instance.
* @return the shared client
*/
public Client getTransportClient() {
return client;
}

public Client getTransportClient(String clusterName,String ip,Integer port){
return createClient(clusterName,ip,port);
}

/**
* Create a client for the given cluster, host, and port.
* @param clusterName cluster to connect to
* @param ip host address
* @param port transport port
* @return a new client
*/
public Client createClient(String clusterName,String ip,Integer port){
Settings settings = ImmutableSettings.settingsBuilder().put("client.transport.sniff", false)
.put("client.transport.ignore_cluster_name", true)
.put("index.similarity.default.type", "default")
.put("cluster.name",clusterName)
.build();
return createClient(settings,ip,port);
}

public Client createClient(Settings settings,String ip,Integer port){
TransportClient client = new TransportClient(settings);
client.addTransportAddress(new InetSocketTransportAddress(ip,port));
return client;
}

public String getAddress() {
return address;
}

public void setAddress(String address) {
this.address = address;
}

public String getClusterName() {
return clusterName;
}

public void setClusterName(String clusterName) {
this.clusterName = clusterName;
}

}
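
For reference, wiring the factory up by hand looks like the sketch below (in the original project the address and cluster name were presumably injected from configuration; the values here are the ones used later in the tests):

ClientFactory clientFactory = new ClientFactory();
// comma-separated host[:port] entries; a missing port falls back to 9300
clientFactory.setAddress("192.168.1.75:9300,www.dimonho.com");
clientFactory.setClusterName("our_es");
clientFactory.init();
Client client = clientFactory.getTransportClient();
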

Next, a class wrapping some scan/scroll query helpers, ScanQuery.java:

package com.dimonho.es.commons;

import java.util.Iterator;

import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.FilterBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.SearchHit;

public class ScanQuery {

public class BulkIterator implements Iterator<SearchHit>{

private SearchHit[] hits;

private String scrollId ;

/** Number of hits in the current scroll batch. */
private int count = 0;

/** Read pointer within the current batch. */
private int index = 0;

/** Total hits matched by the query. */
private long total = 0;

/** Running count of hits handed out so far. */
private long counter = 0;

private TimeValue time;

private Client client;

public BulkIterator(Client client, String scrollId, long total, TimeValue time){
this.client = client;
this.scrollId =scrollId;
this.total = total;
this.time = time;
}

@Override
public boolean hasNext() {
if(count == index){
SearchResponse searchResponse = client.prepareSearchScroll(scrollId)
.setScroll(time)
.execute().actionGet();
count = searchResponse.getHits().hits().length;
if(count > 0){// more data in this scroll
hits = searchResponse.getHits().hits();
index = 0;
return true;
}
}else if(index < count){
return true;
}
return false;
}

@Override
public SearchHit next() {
counter++;
return hits[index++];
}

@Override
public void remove() {
throw new UnsupportedOperationException();
}

public long getTotal() {
return total;
}

public long getCounter() {
return counter;
}

}

public Iterator<SearchHit> query(Client client,String indexName){
return query(client,indexName,null,null,null);
}

public Iterator<SearchHit> query(Client client,String indexName,String[] types){
return query(client,indexName,types,null,null);
}

public Iterator<SearchHit> query(Client client,String indexName,String[] types,QueryBuilder queryBuilder,FilterBuilder filterBuilder){
return query(client,indexName,types,queryBuilder,filterBuilder,TimeValue.timeValueSeconds(1),TimeValue.timeValueMinutes(8));
}

public Iterator<SearchHit> query(Client client,String indexName,String[] types,QueryBuilder queryBuilder,FilterBuilder filterBuilder,TimeValue scanTime,TimeValue scrollTime){
SearchResponse searchResponse = null;
SearchRequestBuilder builder = client.prepareSearch(indexName).setSize(100);
if(queryBuilder!=null){
builder.setQuery(queryBuilder);
}
if(filterBuilder != null){
builder.setPostFilter(filterBuilder);
}
if(types != null && types.length > 0){
builder.setTypes(types);
}
builder.setSearchType(SearchType.SCAN)
.setScroll(scanTime);
searchResponse = builder.execute().actionGet();
return new BulkIterator(client,searchResponse.getScrollId(),searchResponse.getHits().getTotalHits(),scrollTime);
}
}
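
Typical usage is to start a scan and drain the iterator. A quick sketch (the client variable is assumed to be an initialized Client; index and type names are the ones used in the tests below):

ScanQuery query = new ScanQuery();
Iterator<SearchHit> it = query.query(client, "wos_source", new String[]{"periodical"});
while (it.hasNext()) {
    SearchHit hit = it.next();
    // work with hit.getSource() here
}
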

A bulk-insert helper class, BulkUtils.java:

package com.dimonho.es.utils;

import java.util.List;
import java.util.Map;

import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.Client;

/**
* Bulk insert helper.
*/
public class BulkUtils {

public static abstract class AbstractBulkListener implements BulkProcessor.Listener{

protected int total;

@Override
public void beforeBulk(long executionId, BulkRequest request) {
}

@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
total += request.requests().size();
}

@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {

}

public int getTotal() {
return total;
}

public void setTotal(int total) {
this.total = total;
}

}

/**
* NOTE: the second argument goes to setConcurrentRequests, i.e. the number
* of bulk requests allowed to execute concurrently, not the number of
* actions per batch (that would be setBulkActions). A large value here
* lets a lot of pending data accumulate in memory.
*/
public static BulkProcessor buildBulk(Client client,int requestSize,AbstractBulkListener listener){
return BulkProcessor.builder(client, listener).setConcurrentRequests(requestSize).build();
}

public static void insert(Client client,BulkProcessor processor,String indexName,String type,Map<String, Object> source){
IndexRequestBuilder indexRequestBuilder = client
.prepareIndex(indexName,type);
if (source!=null && null != source.get("_id")) {
String id = (String)source.remove("_id");
indexRequestBuilder.setId(id);
}
indexRequestBuilder.setSource(source);
// CREATE fails (rather than overwrites) if a document with this id already exists
indexRequestBuilder.setOpType(IndexRequest.OpType.CREATE);
processor.add(indexRequestBuilder.request());
}

public static void insert(Client client,BulkProcessor processor,String indexName,String type,List<Map<String, Object>> sources){
for(Map<String, Object> source : sources){
insert(client,processor,indexName,type,source);
}
}
}

A data-copy test class, BulkUtilsTest.java:

package com.dimonho.es.commons;

import java.util.Date;
import java.util.Map;

import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import com.dimonho.es.utils.BulkUtils;
import com.dimonho.es.utils.BulkUtils.AbstractBulkListener;
import com.dimonho.es.commons.ScanQuery.BulkIterator;

/**
* Bulk insert test.
*/
public class BulkUtilsTest {

// target index
private String targetIndex = "wos_source";

// source index
private String sourceIndex = "wos_source";

private Client sourceClient;

private Client targetClient;

@Before
public void before(){
ClientFactory clientFactory = new ClientFactory();
sourceClient= clientFactory.getTransportClient("our_es", "192.168.1.75", 9300);
targetClient = clientFactory.getTransportClient("our_es", "www.dimonho.com", 9300);

}

@After
public void after(){
//IndexUtils.deleteIndex(targetClient, targetIndex);
}

@Test
public void testBulkInsert(){

ScanQuery query = new ScanQuery();
BulkIterator ite = (BulkIterator)query.query(sourceClient, sourceIndex,new String[]{"periodical"},null,null);
System.out.println("Total hits: "+ite.getTotal());
BulkProcessor processor = BulkUtils.buildBulk(targetClient, 2000, new AbstractBulkListener(){

@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
total += request.requests().size();
String dateStr = (new Date()).toString();
System.out.println(" |-已导入[" + total + "]条数据! - " + dateStr + " - index:["
+ targetIndex + "] - type:[periodical]");
if(response.hasFailures()){
for(BulkItemResponse item : response.getItems()){
if(item.isFailed()){
System.out.println("失败信息:[periodical],id:["+item.getId()+"]"+item.getFailureMessage());
}
}
}
}

@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append("\r\n").append(">>>>>>>>>>>>>>>><<<<<<<<<<<<<<<");
stringBuilder.append("\r\nbulk操作异常,异常信息: ").append("\r\n")
.append(failure.getMessage()).append("\r\n")
.append(failure.getLocalizedMessage()).append("\r\n")
.append(failure.getCause());
stringBuilder.append("\r\n请求数:").append(request.numberOfActions());
stringBuilder.append("\r\n").append(">>>>>>>>>>>>>>>><<<<<<<<<<<<<<<")
.append("\r\n");
System.out.println(stringBuilder.toString());
System.out.println("处理出错:"+failure);
}


});
Map<String, Object> result = null;
while(ite.hasNext()){
result = ite.next().getSource();
//copy the value of the jornal field into the new jornalName field
result.put("jornalName", result.get("jornal"));
BulkUtils.insert(targetClient, processor, targetIndex, "periodical", result);
if(ite.getCounter()%2000 == 0L){
System.out.println(ite.getCounter());
}
if(ite.getCounter() == ite.getTotal()){
break;
}
}
}

}

But when I ran testBulkInsert() under JUnit to start the copy, it failed with:
no node available

Back on the ES server, the logs showed timeouts.

Because this was a remote copy, with the source and target indexes on different LANs, poor network conditions occasionally made the client/server connection time out, which surfaces as the "no node available" error. So I raised the timeout from the previous 20s to 60s. It still failed, only this time with a GC error. My guess: threads kept piling data into the BulkRequest while execution lagged behind (note that buildBulk above passes 2000 to setConcurrentRequests, allowing that many bulk requests in flight), so the backlog kept growing until the JVM ran out of memory.
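
For reference, the client-side timeout can be raised through the transport settings. A minimal sketch (I am assuming the setting changed above was client.transport.ping_timeout, which governs how long the client waits for a node's ping response before reporting it unavailable):

Settings settings = ImmutableSettings.settingsBuilder()
        .put("cluster.name", "our_es")
        // wait up to 60s for a ping response before the node is
        // treated as gone ("no node available")
        .put("client.transport.ping_timeout", "60s")
        // re-sample the connected nodes less aggressively as well
        .put("client.transport.nodes_sampler_interval", "60s")
        .build();
TransportClient client = new TransportClient(settings);
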

In the end I just rewrote it myself. The code is as follows:

package com.dimonho.es.commons;

import java.util.Map;

import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.junit.Before;
import org.junit.Test;

/**
* Data migration test.
*/
public class MoveDataTest {

private String sourceClusterName = "our_es";

private String targetClusterName = "our_es";
//source index
private String sourceIndex = "wos_source";
//target index
private String targetIndex = "wos_source";

private String sourceType = "periodical";

private String targetType = "periodical";

private Client sourceClient;

private Client targetClient;

private static int errcount = 0;

@Before
public void before(){
ClientFactory clientFactory = new ClientFactory();
sourceClient = clientFactory.getTransportClient(sourceClusterName, "192.168.1.75", 9300);
targetClient = clientFactory.getTransportClient(targetClusterName, "www.dimonho.com", 9300);

}

@Test
public void bulkMove(){
QueryBuilder queryBuilder = QueryBuilders.matchAllQuery();
SearchResponse sourceDatas = sourceClient.prepareSearch(sourceIndex)
.setTypes(sourceType)
.setQuery(queryBuilder)
.setSize(2000)
.setScroll(TimeValue.timeValueSeconds(2))
.setSearchType(SearchType.SCAN)
.execute()
.actionGet();
long sumCount = sourceDatas.getHits().getTotalHits();
System.out.println("========共查询出"+sumCount+"条数据===========");

String scrollId = sourceDatas.getScrollId();
System.out.println("scrollId:"+scrollId);
BulkRequestBuilder bulkRequest = getBulkRequestBuilder();

while(true){
SearchResponse scrollResponse = sourceClient.prepareSearchScroll(scrollId)
.setScroll(TimeValue.timeValueMinutes(8)).execute().actionGet();
SearchHit[] hits = scrollResponse.getHits().hits();
int count = hits.length;
sumCount -= count;
if(count == 0){
break;
}
for(SearchHit hit:hits){
Map<String, Object> source = hit.getSource();
source.put("journalName", hit.getSource().get("journal"));
IndexRequestBuilder indexRequestBuilder = getIndexRequestBuilder();
if (null != source.get("_id")) {
String id = (String)source.remove("_id");
indexRequestBuilder.setId(id);
}
indexRequestBuilder.setOpType(IndexRequest.OpType.CREATE);
bulkRequest.add(indexRequestBuilder.setSource(source));
}
insert(bulkRequest);

bulkRequest = getBulkRequestBuilder();
System.out.println("还剩"+sumCount+"条数据待导入。。。。。。。。。。。。。。。");

}

}

private void insert(BulkRequestBuilder bulkRequest) {
try{
//if a previous run failed midway, bulkRequest may still hold unsent requests; check before re-running the insert
if (bulkRequest.request().requests().size() != 0){
//BulkResponse carries the failure details
BulkResponse bulkResponse = bulkRequest.execute().actionGet();
if(bulkResponse.hasFailures()){
int success = 0;
for(BulkItemResponse item : bulkResponse.getItems()){
if(item.isFailed()){
errcount++;
}else{
success++;
}
}
System.out.println();
System.out.println("failed: "+errcount+" docs\n"+bulkRequest.request().requests().size()+" >> succeeded: "+success+" docs");
}
}
}catch(Exception e){
e.printStackTrace();
System.out.println("连接目标服务器失败!重新连接。。。。");
try {
Thread.sleep(10000);
insert(bulkRequest);//retry automatically to ride out connection timeouts
} catch (InterruptedException e1) {
e1.printStackTrace();
insert(bulkRequest);//retry automatically to ride out connection timeouts
}

}
}

public IndexRequestBuilder getIndexRequestBuilder(){
try{
return targetClient.prepareIndex(targetIndex, targetType);
}catch(Exception e){
e.printStackTrace();
System.out.println("Failed to get an IndexRequestBuilder instance; retrying.....");
try {
Thread.sleep(10000);
return getIndexRequestBuilder();
} catch (InterruptedException e1) {
e1.printStackTrace();
return getIndexRequestBuilder();
}
}
}

public BulkRequestBuilder getBulkRequestBuilder(){
try {
return targetClient.prepareBulk();
} catch (Exception e) {
e.printStackTrace();
System.out.println("获取BulkRequestBuilder实例失败,重新获取。。。。。");
try {
Thread.sleep(10000);
return getBulkRequestBuilder();
} catch (InterruptedException e1) {
e1.printStackTrace();
return getBulkRequestBuilder();
}
}
}
}
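
One small refinement the program above does not include: when the copy finishes, the scroll context on the source cluster can be released explicitly rather than left to expire on its own. A one-line sketch using the same ES 1.x API:

// free the server-side scroll context once all batches are consumed
sourceClient.prepareClearScroll().addScrollId(scrollId).execute().actionGet();
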

This version is a bit slower than the multithreaded one, but it automatically retries when a connection times out, so it could be left to run unattended overnight.

