2016-09-05 8 views
0

elastic4s DSL을 사용하여 다중 레벨 집계 쿼리를 동적으로 수행 할 수 있습니까? 방법이 사용 elastic4s 또는 Java 클라이언트를 수행하는 HTTP의elastic4s 클라이언트를 사용하여 elasticsearch에 대한 동적 집계 쿼리 작성

 
multiLevelAggregation 

Input: Fields[0..N] 
Output: Data grouped by field tuple 

Steps: 
1. Build multilevel elasticsearch aggregation query (JSON) 
2. Execute query on elasticsearch server 
3. Flatten result and return 

의 직접적인 클라이언트하지만 사용

.

답변

1

내 문제를 신중하게 이해하면 처음에는 이것이 elastic4s의 제한 사항이라고 생각했지만 처음에는 elastic4s 클라이언트를 통해 다중 필드 집계 쿼리를 동적으로 구축하기가 쉽지 않다고 생각했습니다. 내 솔루션은 입니다.

//For building aggregation query 
def buildAgg(groups: Seq[String])(leafAggBuilder:() => AbstractAggregationDefinition): AbstractAggregationDefinition = { 
    groups match { 
    case x :: xs => aggregation.terms("termAgg").field(x).aggregations(buildAgg(xs)(leafAggBuilder)) 
    case Nil => leafAggBuilder() 
    } 
} 

//An example leaf aggregation builder 
def buildLeafAgg(aggFuncInfo: Pair[String, String])(): AbstractAggregationDefinition = { 
    aggFuncInfo._1 match { 
    case "avg" => aggregation.avg("aggFunc").field(aggFuncInfo._2) 
    case "sum" => aggregation.sum("aggFunc").field(aggFuncInfo._2) 
    case "cardinality" => aggregation.cardinality("aggFunc").field(aggFuncInfo._2) 
    case _ => aggregation.count("aggFunc").field(aggFuncInfo._2) 
    } 
} 

//For parsing aggregation 
def parseAgg[T](groups: Seq[String], agg: Aggregation, allGroups: Seq[String])(leafAggParser: (Seq[String], Aggregation) => Seq[T]): Seq[T] = { 
    groups match { 
    case x :: xs => { 
     val groupAggs = agg.asInstanceOf[StringTerms].getBuckets.asScala.toList 
     (for { 
     groupAgg <- groupAggs 
     aa = groupAgg.getAggregations.asList.asScala.head 
     gkey = groupAgg.getKeyAsString 
     gacc = allGroups :+ gkey 
     } yield parseAgg(xs, aa, gacc)(leafAggParser)).flatten 
    } 

    case Nil => { 
     leafAggParser(allGroups, agg) 
    } 
    } 
} 

//An example leaf aggregation parser 
def parseSimpleLeafAgg(allGroups: Seq[String], agg: Aggregation): Seq[GroupStats] = { 
    val value = agg.asInstanceOf[InternalNumericMetricsAggregation.SingleValue].value() 
    val groupId = allGroups.mkString(".") 
    Seq(GroupStats(groupId, value)) 
} 

//Usage: Build Query and Parse result 
def groupStats(groupFields: Seq[String]): Seq[GroupStats] = { 
    val resp = client.execute { 
    def leafPlainAggBuilder = buildLeafAgg(("count", "V1")) _ 
    search(esIndex).size(0).aggregations(buildAgg(groupFields)(leafPlainAggBuilder)) 
    }.await 
    //get the root aggregation 
    val agg = resp.aggregations.asList().asScala.head 
    def leafAggParser = parseSimpleLeafAgg _ 
    val res = parseAgg(groupFields, agg, Seq())(leafAggParser) 
    res 
}