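I'm looking for the best way to group data in Elasticsearch. Elasticsearch doesn't support anything like SQL's 'group by'.
Let's say I have 1k categories and millions of products. What do you think is the best way to render a complete category tree? Of course you need some metadata (icon, link target, SEO titles, ...) and custom sorting for the categories.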
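1. Using aggregations. Example: https://found.no/play/gist/8124563 This looks usable if you have to group by one field and need some extra fields.
2. Using multiple fields in a facet (won't work). Example: https://found.no/play/gist/1aa44e2114975384a7c2 Here we lose the relationship between the different fields.
3. Building funny facets. Example: https://found.no/play/gist/8124810
Building a category tree with any of these three "solutions" is unsatisfying. Solution 1 may work (though ES 1 isn't stable right now). Solution 2 doesn't work. Solution 3 is painful: it feels ugly, you need to prepare a lot of data, and the facets blow up.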
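An alternative might be to store no category data in ES at all, just the id: https://found.no/play/gist/a53e46c91e2bf077f2e1
You could then fetch the associated category from another system, such as Redis, memcache or the database.
This would end up as clean code, but performance could become a problem. For example, loading 1k categories from memcache/Redis/a database could be slow. Another problem is that keeping two data stores in sync is harder than keeping one.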
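To make the "ids only in ES" idea concrete, here is a minimal sketch of the join step, under stated assumptions. The function name `build_category_listing`, the metadata fields `title` and `sort_order`, and the sample data are all hypothetical; `metadata_by_id` stands in for whatever you get back from Redis, memcache or the database. If the store supports fetching many keys in one round trip, loading 1k categories that way may ease the performance concern, though that would need measuring.

```python
def build_category_listing(buckets, metadata_by_id):
    """Join aggregation buckets (category ids) with externally stored metadata."""
    listing = []
    for bucket in buckets:
        meta = metadata_by_id.get(bucket["key"])
        if meta is None:
            # The id exists in the index but not in the metadata store.
            # This sketch skips it; a real system might log it or resync.
            continue
        listing.append({
            "id": bucket["key"],
            "doc_count": bucket["doc_count"],
            "title": meta["title"],
            "sort_order": meta["sort_order"],
        })
    # Custom ordering comes from the metadata, not from Elasticsearch.
    listing.sort(key=lambda row: row["sort_order"])
    return listing


# Hypothetical data: `buckets` has the shape of a terms aggregation on a
# category id field; `metadata_by_id` is the result of one bulk lookup.
buckets = [
    {"key": 7, "doc_count": 120},
    {"key": 3, "doc_count": 45},
    {"key": 9, "doc_count": 8},
]
metadata_by_id = {
    3: {"title": "Books", "sort_order": 2},
    7: {"title": "Shoes", "sort_order": 1},
}
listing = build_category_listing(buckets, metadata_by_id)
```

Category 9 has no metadata in this sample, so it is dropped from the listing, which is exactly the kind of sync problem mentioned above.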
How do you deal with problems like this?
Sorry for the bare links, but I can't post more than 2 in one post.
The aggregations API allows grouping by multiple fields using sub-aggregations. Suppose you want to group by the fields field1, field2 and field3:
{
    "aggs": {
        "agg1": {
            "terms": {
                "field": "field1"
            },
            "aggs": {
                "agg2": {
                    "terms": {
                        "field": "field2"
                    },
                    "aggs": {
                        "agg3": {
                            "terms": {
                                "field": "field3"
                            }
                        }
                    }
                }
            }
        }
    }
}
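Of course, this can go on for as many fields as you like.
Update:
For completeness, here is what the output of the above query looks like. Further below is Python code that generates the aggregation query and flattens the result into a list of dictionaries.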
{
    "aggregations": {
        "agg1": {
            "buckets": [
                {
                    "doc_count": <count>,
                    "key": <value of field1>,
                    "agg2": {
                        "buckets": [
                            {
                                "doc_count": <count>,
                                "key": <value of field2>,
                                "agg3": {
                                    "buckets": [
                                        {"doc_count": <count>, "key": <value of field3>},
                                        {"doc_count": <count>, "key": <value of field3>},
                                        ...
                                    ]
                                }
                            },
                            {
                                "doc_count": <count>,
                                "key": <value of field2>,
                                "agg3": {
                                    "buckets": [
                                        {"doc_count": <count>, "key": <value of field3>},
                                        {"doc_count": <count>, "key": <value of field3>},
                                        ...
                                    ]
                                }
                            },
                            ...
                        ]
                    }
                },
                {
                    "doc_count": <count>,
                    "key": <value of field1>,
                    "agg2": {
                        "buckets": [
                            {
                                "doc_count": <count>,
                                "key": <value of field2>,
                                "agg3": {
                                    "buckets": [
                                        {"doc_count": <count>, "key": <value of field3>},
                                        {"doc_count": <count>, "key": <value of field3>},
                                        ...
                                    ]
                                }
                            },
                            {
                                "doc_count": <count>,
                                "key": <value of field2>,
                                "agg3": {
                                    "buckets": [
                                        {"doc_count": <count>, "key": <value of field3>},
                                        {"doc_count": <count>, "key": <value of field3>},
                                        ...
                                    ]
                                }
                            },
                            ...
                        ]
                    }
                },
                ...
            ]
        }
    }
}
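Since the question is about rendering a category tree, it may help to see the nested response kept as a tree rather than flattened. The following is only a sketch: `buckets_to_tree` and the `children` key are names I made up, and the sample response is hand-written to match the shape shown above, not real Elasticsearch output.

```python
def buckets_to_tree(agg_result, agg_names):
    """Turn nested terms-aggregation output into nested dicts keyed by bucket key.

    agg_names lists the aggregation names from outermost to innermost.
    """
    if not agg_names:
        return {}
    name, rest = agg_names[0], agg_names[1:]
    tree = {}
    for bucket in agg_result[name]["buckets"]:
        tree[bucket["key"]] = {
            "doc_count": bucket["doc_count"],
            # Each bucket carries its sub-aggregation under the next name.
            "children": buckets_to_tree(bucket, rest),
        }
    return tree


# Hand-written sample with two levels, shaped like the output above.
response = {
    "aggregations": {
        "agg1": {"buckets": [
            {"key": "a", "doc_count": 3, "agg2": {"buckets": [
                {"key": "x", "doc_count": 2},
                {"key": "y", "doc_count": 1},
            ]}},
            {"key": "b", "doc_count": 1, "agg2": {"buckets": [
                {"key": "x", "doc_count": 1},
            ]}},
        ]}
    }
}
tree = buckets_to_tree(response["aggregations"], ["agg1", "agg2"])
```

Here `tree["a"]["children"]` holds the field2 values seen together with field1 value "a", so the relationship between the fields is preserved.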
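The following Python code performs the group-by given a list of fields. If you specify include_missing=True, it also includes combinations of values where some of the fields are missing (you don't need this if you are on Elasticsearch 2.0, which added support for it).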
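def group_by(es, fields, include_missing):
    """Group documents by `fields` and return one flat record per combination.

    es is an Elasticsearch client; fields lists field names, outermost first.
    """
    # Outermost level: a terms aggregation named after the first field.
    current_level_terms = {'terms': {'field': fields[0]}}
    agg_spec = {fields[0]: current_level_terms}

    if include_missing:
        # A sibling "missing" aggregation counts documents lacking the field.
        current_level_missing = {'missing': {'field': fields[0]}}
        agg_spec[fields[0] + '_missing'] = current_level_missing

    # Nest one level per remaining field under the previous level.
    for field in fields[1:]:
        next_level_terms = {'terms': {'field': field}}
        current_level_terms['aggs'] = {
            field: next_level_terms,
        }

        if include_missing:
            next_level_missing = {'missing': {'field': field}}
            current_level_terms['aggs'][field + '_missing'] = next_level_missing
            # The "missing" branch gets the same sub-aggregations as the terms branch.
            current_level_missing['aggs'] = {
                field: next_level_terms,
                field + '_missing': next_level_missing,
            }
            current_level_missing = next_level_missing

        current_level_terms = next_level_terms

    agg_result = es.search(body={'aggs': agg_spec})['aggregations']
    return get_docs_from_agg_result(agg_result, fields, include_missing)


def get_docs_from_agg_result(agg_result, fields, include_missing):
    """Flatten nested aggregation buckets into a list of dictionaries."""
    current_field = fields[0]
    # Copy the list so the response itself is not modified.
    buckets = list(agg_result[current_field]['buckets'])
    if include_missing:
        # The "missing" aggregation is a single bucket without a 'key'.
        buckets.append(agg_result[(current_field + '_missing')])

    if len(fields) == 1:
        # Innermost level: emit one record per non-empty bucket.
        return [
            {
                current_field: bucket.get('key'),
                'doc_count': bucket['doc_count'],
            }
            for bucket in buckets if bucket['doc_count'] > 0
        ]

    result = []
    for bucket in buckets:
        records = get_docs_from_agg_result(bucket, fields[1:], include_missing)
        # A missing bucket has no key, so the field value becomes None.
        value = bucket.get('key')
        for record in records:
            record[current_field] = value
        result.extend(records)
    return result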
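On Elasticsearch 2.0 and later, I believe the feature the note above alludes to is the `missing` parameter of the terms aggregation, which puts documents without a value into a bucket with a placeholder key. That reading is my assumption, as is everything named here: `build_group_by_aggs` and the placeholder "N/A" are hypothetical, and the placeholder has to be compatible with the field's type (a number for numeric fields, for instance). A minimal sketch of the query builder under those assumptions:

```python
def build_group_by_aggs(fields, missing_value="N/A"):
    """Build nested terms aggregations, one level per field, outermost first.

    Each level uses the terms aggregation's `missing` parameter, so no
    separate "missing" aggregation is needed. Returns None for no fields.
    """
    aggs = None
    # Build from the innermost field outwards so each level wraps the previous one.
    for field in reversed(fields):
        level = {"terms": {"field": field, "missing": missing_value}}
        if aggs is not None:
            level["aggs"] = aggs
        aggs = {field: level}
    return aggs


spec = build_group_by_aggs(["field1", "field2"])
```

The resulting `spec` can be sent as the `aggs` part of a search body. With it the response contains only terms buckets, so flattening could use the same recursion as above with include_missing=False.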