Mongo DB Aggregation Pipeline

Aggregation Pipeline

Mongo DB의 Aggregation Framework는 데이터 처리 파이프라인의 개념을 모델로 합니다.
문서는 여러 단계의 파이프라인을 거쳐 변화하고 하나의 문서의 형태로 집계할 수 있습니다.

파이프라인(pipeline) 이란, 이전 단계의 연산결과를 다음 단계연산에 이용하는 것을 의미합니다.

aggregation-pipeline

Stages

Stages(db.collection.aggregate)

db.collection.aggregate 메서드는 파이프라인 단계를 Array의 형태로 나타냅니다.
Document는 파이프라인 Array의 순서대로 가공되며, $out$geoNear 를 제외한 모든 단계는 파이프라인에 여러 번 나타날 수 있습니다.

$Match

  • 조건에 만족하는 Document만 Filtering

입력 형식

1
{ $match: { <query> } }

예제

1
2
3
4
5
6
7
{ "_id" : ObjectId("512bc95fe835e68f199c8686"), "author" : "dave", "score" : 80, "views" : 100 }
{ "_id" : ObjectId("512bc962e835e68f199c8687"), "author" : "dave", "score" : 85, "views" : 521 }
{ "_id" : ObjectId("55f5a192d4bede9ac365b257"), "author" : "ahn", "score" : 60, "views" : 1000 }
{ "_id" : ObjectId("55f5a192d4bede9ac365b258"), "author" : "li", "score" : 55, "views" : 5000 }
{ "_id" : ObjectId("55f5a1d3d4bede9ac365b259"), "author" : "annT", "score" : 60, "views" : 50 }
{ "_id" : ObjectId("55f5a1d3d4bede9ac365b25a"), "author" : "li", "score" : 94, "views" : 999 }
{ "_id" : ObjectId("55f5a1d3d4bede9ac365b25b"), "author" : "ty", "score" : 95, "views" : 1000 }

1
2
3
db.articles.aggregate(
[ { $match : { author : "dave" } } ]
);

연산 결과

1
2
{ "_id" : ObjectId("512bc95fe835e68f199c8686"), "author" : "dave", "score" : 80, "views" : 100 }
{ "_id" : ObjectId("512bc962e835e68f199c8687"), "author" : "dave", "score" : 85, "views" : 521 }

$Group

  • Document에 대한 Grouping 연산을 수행
  • Group에 대한 id를 지정해야하고, 특정 필드에 대한 집계 연산이 가능
  • $group은 연산된 Document에 대한 정렬을 지원하지 않음

입력 형식

1
{ $group: { _id: <expression>, <field1>: { <accumulator1> : <expression1> }, ... } }

accumulator

Name Description
$addToSet id로 Grouping 한 데이터를 중복되지 않은 Set의 형태로 저장
$avg 숫자 값의 평균을 반환. 숫자가 아닌 값을 무시.
$first 각 그룹에 대한 첫 번째 Document의 값을 반환. <br />Document가 정의된 순서가 있는 경우에만 ordering을 지원
$last 각 그룹에 대한 마지막 Document의 값을 반환. <br />Document가 정의된 순서가 있는 경우에만 ordering을 지원
$max 각 그룹에서 가장 큰 값을 반환
$mergeObjects 각 그룹에 대한 입력 Document를 조합하여 작성한 Document를 반환합니다.
$min 각 그룹에서 가장 작은 값을 반환
$push 각 그룹의 필드 값의 배열을 반환
$stdDevPop 입력 값의 모집단 표준 편차를 반환
$stdDevSamp 입력 값의 샘플 표준 편차를 반환
$sum 각 그룹의 숫자형 데이터의 합을 반환. 숫자가 아닌 값은 무시

예시

1
2
3
4
5
{ "_id" : 1, "item" : "abc", "price" : 10, "quantity" : 2, "date" : ISODate("2014-03-01T08:00:00Z") }
{ "_id" : 2, "item" : "jkl", "price" : 20, "quantity" : 1, "date" : ISODate("2014-03-01T09:00:00Z") }
{ "_id" : 3, "item" : "xyz", "price" : 5, "quantity" : 10, "date" : ISODate("2014-03-15T09:00:00Z") }
{ "_id" : 4, "item" : "xyz", "price" : 5, "quantity" : 20, "date" : ISODate("2014-04-04T11:21:39.736Z") }
{ "_id" : 5, "item" : "abc", "price" : 10, "quantity" : 10, "date" : ISODate("2014-04-04T21:23:13.331Z") }

연산 - 년/월/일을 기준으로 group

1
2
3
4
5
6
7
8
9
10
11
12
db.sales.aggregate(
[
{
$group : {
_id : { month: { $month: "$date" }, day: { $dayOfMonth: "$date" }, year: { $year: "$date" } },
totalPrice: { $sum: { $multiply: [ "$price", "$quantity" ] } },
averageQuantity: { $avg: "$quantity" },
count: { $sum: 1 }
}
}
]
)

  • 년/월/일을 기준으로 집계
  • $price x $quantity 를 곱한 값의 합을 totalPrice 필드로 지정
  • $quantity 필드 값의 평균을 averageQuantity 필드로 지정
  • Group별 데이터의 갯수를 count 로 지정

연산 결과

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
{ 
"_id" : { "month" : 3, "day" : 15, "year" : 2014 },
"totalPrice" : 50,
"averageQuantity" : 10,
"count" : 1
}
{
"_id" : { "month" : 4, "day" : 4, "year" : 2014 },
"totalPrice" : 200,
"averageQuantity" : 15,
"count" : 2
}
{
"_id" : { "month" : 3, "day" : 1, "year" : 2014 },
"totalPrice" : 40,
"averageQuantity" : 1.5,
"count" : 2
}

$Project

  • Project에서 지정한 필드 값을 다음 파이프라인 단계로 전달
  • RDB의 select 와 같은 역할
  • Field : 0 - 해당 필드 안보여줌
  • Filed : 1 - 해당 필드는 보여줌

입력형식

1
{ $project: { <specification(s)> } }

예제

1
2
3
4
5
6
7
{
"_id" : 1,
title: "abc123",
isbn: "0001122223334",
author: { last: "zzz", first: "aaa" },
copies: 5
}

title, author 필드만 표시

1
db.books.aggregate( [ { $project : { title : 1 , author : 1 } } ] )

연산 결과

1
{ "_id" : 1, "title" : "abc123", "author" : { "last" : "zzz", "first" : "aaa" } }

$Sort

  • 정렬 조건에 맞게 파이프라인의 연산결과를 정렬
  • ASC : 1, DESC : -1로 표현

입력 형식

1
{ $sort: { <field1>: <sort order>, <field2>: <sort order> ... } }

예제

1
2
3
4
5
6
{ "_id" : 1, "subject" : "History", "score" : 88 }
{ "_id" : 2, "subject" : "History", "score" : 92 }
{ "_id" : 3, "subject" : "History", "score" : 97 }
{ "_id" : 4, "subject" : "History", "score" : 71 }
{ "_id" : 5, "subject" : "History", "score" : 79 }
{ "_id" : 6, "subject" : "History", "score" : 83 }

1
2
3
4
5
db.users.aggregate(
[
{ $sort : { score : -1} }
]
)

연산 결과

1
2
3
4
5
6
{ "_id" : 3, "subject" : "History", "score" : 97 }
{ "_id" : 2, "subject" : "History", "score" : 92 }
{ "_id" : 1, "subject" : "History", "score" : 88 }
{ "_id" : 6, "subject" : "History", "score" : 83 }
{ "_id" : 5, "subject" : "History", "score" : 79 }
{ "_id" : 4, "subject" : "History", "score" : 71 }

$Skip

  • 입력한 갯수만큼 차례대로 Document를 skip 한 데이터를 다음 파이프라인으로 전달

입력 형식

1
{ $skip: <positive integer> }

예제

1
2
3
db.article.aggregate(
{ $skip : 5 }
);

$Sample

  • Collection 내에서 입력한 갯수만큼 Random하게 Document 출력

입력 형식

1
{ $sample: { size: <positive integer> } }

예제

1
2
3
4
5
6
7
{ "_id" : 1, "name" : "dave123", "q1" : true, "q2" : true }
{ "_id" : 2, "name" : "dave2", "q1" : false, "q2" : false }
{ "_id" : 3, "name" : "ahn", "q1" : true, "q2" : true }
{ "_id" : 4, "name" : "li", "q1" : true, "q2" : false }
{ "_id" : 5, "name" : "annT", "q1" : false, "q2" : true }
{ "_id" : 6, "name" : "li", "q1" : true, "q2" : true }
{ "_id" : 7, "name" : "ty", "q1" : false, "q2" : true }

1
2
3
db.users.aggregate(
[ { $sample: { size: 3 } } ]
)

연산 결과

1
2
3
{ "_id" : 2, "name" : "dave2", "q1" : false, "q2" : false  }
{ "_id" : 4, "name" : "li", "q1" : true, "q2" : false }
{ "_id" : 7, "name" : "ty", "q1" : false, "q2" : true }

$Count

  • 스테이지에 입력하는 문서 수의 카운트가 포함된 문서를 다음 단계로 전달합니다.

입력 형식

1
{ $count: <string> }

예시 - 역사 점수 데이터가 있을때

1
2
3
4
5
6
{ "_id" : 1, "subject" : "History", "score" : 88 }
{ "_id" : 2, "subject" : "History", "score" : 92 }
{ "_id" : 3, "subject" : "History", "score" : 97 }
{ "_id" : 4, "subject" : "History", "score" : 71 }
{ "_id" : 5, "subject" : "History", "score" : 79 }
{ "_id" : 6, "subject" : "History", "score" : 83 }

scores Collection에서 80점 초과인 데이터를 조회하고 그에 대한 Count를 계산

1
2
3
4
5
6
7
8
9
10
11
12
13
14
db.scores.aggregate(
[
{
$match: {
score: {
$gt: 80
}
}
},
{
$count: "passing_scores"
}
]
)

연산 결과

1
{ "passing_scores" : 4 }

$addField

  • Document에 새 필드를 추가합니다.
  • $addFields는 Document 및 새로 추가된 필드에서 모든 기존 필드가 포함된 문서를 출력합니다.
  • 실제 Document의 문서 내용을 바꾸는 것이 아닌 조회를 하는 용도

1
{ $addFields: { <newField>: <expression>, ... } }

예시 - score라는 collection에 아래와 같은 데이터가 있을때

1
2
3
4
5
6
7
8
9
10
11
12
13
14
{
_id: 1,
student: "Maya",
homework: [ 10, 5, 10 ],
quiz: [ 10, 8 ],
extraCredit: 0
}
{
_id: 2,
student: "Ryan",
homework: [ 5, 6, 5 ],
quiz: [ 8, 8 ],
extraCredit: 8
}

집계 연산을 하여, homework, quiz 필드에 대한 Array의 합을 기존 Document에 추가

1
2
3
4
5
6
7
8
9
10
11
12
db.scores.aggregate( [
{
$addFields: {
totalHomework: { $sum: "$homework" } ,
totalQuiz: { $sum: "$quiz" }
}
},
{
$addFields: { totalScore:
{ $add: [ "$totalHomework", "$totalQuiz", "$extraCredit" ] } }
}
] )

연산 결과

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
{
"_id" : 1,
"student" : "Maya",
"homework" : [ 10, 5, 10 ],
"quiz" : [ 10, 8 ],
"extraCredit" : 0,
"totalHomework" : 25,
"totalQuiz" : 18,
"totalScore" : 43
}
{
"_id" : 2,
"student" : "Ryan",
"homework" : [ 5, 6, 5 ],
"quiz" : [ 8, 8 ],
"extraCredit" : 8,
"totalHomework" : 16,
"totalQuiz" : 16,
"totalScore" : 40
}

$limit

  • 파이프라인 연산으로 출력된 Document의 갯수를 제한

입력 형식

1
{ $limit: <positive integer> }

예제

1
2
3
db.article.aggregate(
{ $limit : 5 }
);

$unwind

  • Document내의 배열 필드를 기반으로 각각의 Document로 분리

입력 형식

1
2
3
4
5
6
7
8
{
$unwind:
{
path: <field path>,
includeArrayIndex: <string>,
preserveNullAndEmptyArrays: <boolean>
}
}

Field Type Description
path string 배열 필드의 필드 경로.<br />필드 경로를 지정하려면 $ 기호를 사용하여 필드 이름에 접두사를 붙이고 따옴표로 묶습니다. (ex : "$arrayField")
includeArrayIndex string 요소의 배열 index값을 저장할 새 필드의 이름. <br />이름은 달러 기호 "$"로 시작할 수 없습니다. (필수 아님)
preserveNullAndEmptyArrays boolean 만약 true로 설정 시, path Field값이 null, 빈 배열인 경우에 $unwind 연산 결과가 Document에 표시 됨.<br />만약 false로 설정 시,$unwind 연산 결과가 Document에 표시 되지 않음. (default false) (필수 아님)

예제 1 - 기본 예제

1
{ "_id" : 1, "item" : "ABC1", sizes: [ "S", "M", "L"] }

1
db.inventory.aggregate( [ { $unwind : "$sizes" } ] )

연산 결과 1

1
2
3
{ "_id" : 1, "item" : "ABC1", "sizes" : "S" }
{ "_id" : 1, "item" : "ABC1", "sizes" : "M" }
{ "_id" : 1, "item" : "ABC1", "sizes" : "L" }

예제 2 - includeArrayIndex 속성을 이용한 Index 출력

1
2
3
4
5
{ "_id" : 1, "item" : "ABC", "sizes": [ "S", "M", "L"] }
{ "_id" : 2, "item" : "EFG", "sizes" : [ ] }
{ "_id" : 3, "item" : "IJK", "sizes": "M" }
{ "_id" : 4, "item" : "LMN" }
{ "_id" : 5, "item" : "XYZ", "sizes" : null }

1
2
3
db.inventory.aggregate( [ 
{ $unwind: { path: "$sizes", includeArrayIndex: "arrayIndex" } }
] )

연산 결과 2

1
2
3
4
{ "_id" : 1, "item" : "ABC", "sizes" : "S", "arrayIndex" : NumberLong(0) }
{ "_id" : 1, "item" : "ABC", "sizes" : "M", "arrayIndex" : NumberLong(1) }
{ "_id" : 1, "item" : "ABC", "sizes" : "L", "arrayIndex" : NumberLong(2) }
{ "_id" : 3, "item" : "IJK", "sizes" : "M", "arrayIndex" : null }

예제 3 - preserveNullAndEmptyArrays 속성을 이용한 출력

1
2
3
4
5
{ "_id" : 1, "item" : "ABC", "sizes": [ "S", "M", "L"] }
{ "_id" : 2, "item" : "EFG", "sizes" : [ ] }
{ "_id" : 3, "item" : "IJK", "sizes": "M" }
{ "_id" : 4, "item" : "LMN" }
{ "_id" : 5, "item" : "XYZ", "sizes" : null }

1
2
3
db.inventory.aggregate( [
{ $unwind: { path: "$sizes", preserveNullAndEmptyArrays: true } }
] )

연산 결과 3

1
2
3
4
5
6
{ "_id" : 1, "item" : "ABC", "sizes" : "S" }
{ "_id" : 1, "item" : "ABC", "sizes" : "M" }
{ "_id" : 1, "item" : "ABC", "sizes" : "L" }
{ "_id" : 2, "item" : "EFG" }
{ "_id" : 3, "item" : "IJK", "sizes" : "M" }
{ "_id" : 4, "item" : "LMN" }

참고

  • https://docs.mongodb.com/master/core/aggregation-pipeline/