gomog/internal/engine/aggregate_batch3_test.go

461 lines
10 KiB
Go

package engine
import (
"testing"
"time"
"git.kingecg.top/kingecg/gomog/pkg/types"
)
// TestReplaceRoot exercises the $replaceRoot stage with two kinds of newRoot:
// a field path ("$profile") and an expression document. Each case only checks
// how many documents come back, not their contents.
func TestReplaceRoot(t *testing.T) {
	store := NewMemoryStore(nil)
	engine := NewAggregationEngine(store)

	collection := "test.replace_root"
	CreateTestCollectionForTesting(store, collection, map[string]types.Document{
		"doc1": {ID: "doc1", Data: map[string]interface{}{
			"name": "John",
			"profile": map[string]interface{}{
				"age":  30,
				"city": "New York",
			},
		}},
		"doc2": {ID: "doc2", Data: map[string]interface{}{
			"name": "Jane",
			"profile": map[string]interface{}{
				"age":  25,
				"city": "London",
			},
		}},
	})

	// Fail fast if the fixture cannot be read back; otherwise every case
	// below would report a confusing count mismatch instead of the root cause.
	docs, err := store.GetAllDocuments(collection)
	if err != nil {
		t.Fatalf("GetAllDocuments(%q) error = %v", collection, err)
	}

	tests := []struct {
		name     string
		spec     interface{}
		expected int
	}{
		{
			name: "replace with nested object",
			spec: map[string]interface{}{
				"newRoot": "$profile",
			},
			expected: 2,
		},
		{
			name: "replace with expression",
			spec: map[string]interface{}{
				"newRoot": map[string]interface{}{
					"fullName": "$name",
				},
			},
			expected: 2,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			result, err := engine.executeReplaceRoot(tt.spec, docs)
			if err != nil {
				t.Fatalf("executeReplaceRoot() error = %v", err)
			}
			if len(result) != tt.expected {
				t.Errorf("Expected %d documents, got %d", tt.expected, len(result))
			}
		})
	}
}
// TestReplaceWith exercises the $replaceWith stage: the spec is used directly
// as the shape of the replacement document, and the test checks that exactly
// one document is returned and that it carries the "sum" field.
func TestReplaceWith(t *testing.T) {
	store := NewMemoryStore(nil)
	engine := NewAggregationEngine(store)

	collection := "test.replace_with"
	CreateTestCollectionForTesting(store, collection, map[string]types.Document{
		"doc1": {ID: "doc1", Data: map[string]interface{}{
			"x": float64(1),
			"y": float64(2),
		}},
	})

	docs, err := store.GetAllDocuments(collection)
	if err != nil {
		t.Fatalf("GetAllDocuments(%q) error = %v", collection, err)
	}

	// $replaceWith takes the new document structure directly.
	spec := map[string]interface{}{
		// Simplified on purpose: a plain array of field references rather
		// than a real $add expression.
		"sum": []interface{}{"$x", "$y"},
	}

	result, err := engine.executeReplaceWith(spec, docs)
	if err != nil {
		t.Fatalf("executeReplaceWith() error = %v", err)
	}

	// Must be fatal: result[0] is indexed below and would panic on an empty
	// result if the test kept running.
	if len(result) != 1 {
		t.Fatalf("Expected 1 document, got %d", len(result))
	}

	// The replacement document must contain the sum field.
	if _, ok := result[0].Data["sum"]; !ok {
		t.Error("Expected sum field in result")
	}
}
// TestGraphLookup exercises the recursive $graphLookup stage against a small
// three-level reporting hierarchy (CEO -> Manager 1 -> Employee 1). It checks
// that all input documents are returned and that employee1 ends up with a
// non-empty "orgChart" slice.
func TestGraphLookup(t *testing.T) {
	store := NewMemoryStore(nil)
	engine := NewAggregationEngine(store)

	// Organisation-structure fixture.
	collection := "test.org"
	CreateTestCollectionForTesting(store, collection, map[string]types.Document{
		"ceo": {ID: "ceo", Data: map[string]interface{}{
			"name":      "CEO",
			"reportsTo": nil,
		}},
		"manager1": {ID: "manager1", Data: map[string]interface{}{
			"name":      "Manager 1",
			"reportsTo": "CEO",
		}},
		"employee1": {ID: "employee1", Data: map[string]interface{}{
			"name":      "Employee 1",
			"reportsTo": "Manager 1",
		}},
	})

	docs, err := store.GetAllDocuments(collection)
	if err != nil {
		t.Fatalf("GetAllDocuments(%q) error = %v", collection, err)
	}

	// NOTE(review): connectFromField is "name" and connectToField is
	// "reportsTo"; confirm this is the intended traversal direction for the
	// engine's $graphLookup implementation.
	spec := map[string]interface{}{
		"from":             "test.org",
		"startWith":        "$reportsTo",
		"connectFromField": "name",
		"connectToField":   "reportsTo",
		"as":               "orgChart",
	}

	result, err := engine.executeGraphLookup(spec, docs)
	if err != nil {
		t.Fatalf("executeGraphLookup() error = %v", err)
	}
	if len(result) != 3 {
		t.Errorf("Expected 3 documents, got %d", len(result))
	}

	// employee1 must be present and must carry a populated org chart. Track
	// whether it was seen so the test cannot pass when it is missing.
	found := false
	for _, doc := range result {
		if doc.ID != "employee1" {
			continue
		}
		found = true

		orgChart, ok := doc.Data["orgChart"].([]map[string]interface{})
		if !ok {
			// Fatal: the length check below is meaningless on a failed assertion.
			t.Fatalf("orgChart should be an array, got %T", doc.Data["orgChart"])
		}
		if len(orgChart) == 0 {
			t.Error("Expected orgChart to have data")
		}
	}
	if !found {
		t.Error("Expected employee1 in result")
	}
}
// TestSetWindowFields exercises the $setWindowFields stage: documents are
// partitioned by "$category", sorted by ascending score, and a "rank" output
// is requested via $documentNumber. The test checks the document count and
// that at least one returned document carries the "rank" field.
func TestSetWindowFields(t *testing.T) {
	store := NewMemoryStore(nil)
	engine := NewAggregationEngine(store)

	collection := "test.window"
	CreateTestCollectionForTesting(store, collection, map[string]types.Document{
		"doc1": {ID: "doc1", Data: map[string]interface{}{
			"category": "A",
			"score":    float64(85),
		}},
		"doc2": {ID: "doc2", Data: map[string]interface{}{
			"category": "A",
			"score":    float64(90),
		}},
		"doc3": {ID: "doc3", Data: map[string]interface{}{
			"category": "B",
			"score":    float64(75),
		}},
	})

	docs, err := store.GetAllDocuments(collection)
	if err != nil {
		t.Fatalf("GetAllDocuments(%q) error = %v", collection, err)
	}

	spec := map[string]interface{}{
		"partitionBy": "$category",
		"sortBy": map[string]interface{}{
			"score": float64(1),
		},
		"output": map[string]interface{}{
			"rank": map[string]interface{}{
				"$documentNumber": nil,
			},
		},
	}

	result, err := engine.executeSetWindowFields(spec, docs)
	if err != nil {
		t.Fatalf("executeSetWindowFields() error = %v", err)
	}
	if len(result) != 3 {
		t.Errorf("Expected 3 documents, got %d", len(result))
	}

	// Look for the rank field on any returned document.
	hasRank := false
	for _, doc := range result {
		if _, ok := doc.Data["rank"]; ok {
			hasRank = true
			break
		}
	}
	if !hasRank {
		t.Error("Expected documents to have rank field")
	}
}
// TestWeekOperators is a smoke test for the week-related date operators
// ($week, $isoWeek, $dayOfYear, $isoDayOfWeek): each is evaluated against the
// fixed date 2024-03-15 UTC and must return a non-zero value.
func TestWeekOperators(t *testing.T) {
	agg := &AggregationEngine{}

	doc := map[string]interface{}{
		"date": time.Date(2024, 3, 15, 0, 0, 0, 0, time.UTC),
	}

	// $week
	if got := agg.week("$date", doc); got == 0 {
		t.Error("$week returned 0")
	}

	// $isoWeek
	if got := agg.isoWeek("$date", doc); got == 0 {
		t.Error("$isoWeek returned 0")
	}

	// $dayOfYear
	if got := agg.dayOfYear("$date", doc); got == 0 {
		t.Error("$dayOfYear returned 0")
	}

	// $isoDayOfWeek
	if got := agg.isoDayOfWeek("$date", doc); got == 0 {
		t.Error("$isoDayOfWeek returned 0")
	}
}
// TestNow checks the $now operator: the returned time must be non-zero and
// must survive an RFC 3339 format/parse round trip.
func TestNow(t *testing.T) {
	agg := &AggregationEngine{}

	current := agg.now()
	if current.IsZero() {
		t.Error("$now returned zero time")
	}

	// Format the value and make sure the formatted text parses back.
	formatted := current.Format(time.RFC3339)
	if _, err := time.Parse(time.RFC3339, formatted); err != nil {
		t.Errorf("$now returned invalid format: %v", err)
	}
}
// TestDateToString checks $dateToString formatting: a fixed timestamp
// (2024-03-15 14:30:45 UTC) is rendered with several %Y/%m/%d format strings
// and each output is compared against the expected text.
func TestDateToString(t *testing.T) {
	agg := &AggregationEngine{}

	doc := map[string]interface{}{
		"date": time.Date(2024, 3, 15, 14, 30, 45, 0, time.UTC),
	}

	type formatCase struct {
		format   string
		expected string
	}
	cases := []formatCase{
		{format: "%Y-%m-%d", expected: "2024-03-15"},
		{format: "%Y/%m/%d", expected: "2024/03/15"},
		{format: "%d/%m/%Y", expected: "15/03/2024"},
		{format: "%Y", expected: "2024"},
		{format: "%m", expected: "03"},
		{format: "%d", expected: "15"},
	}

	for _, c := range cases {
		got := agg.dateToString(map[string]interface{}{
			"date":   "$date",
			"format": c.format,
		}, doc)
		if got != c.expected {
			t.Errorf("dateToString(%s) = %v, want %v", c.format, got, c.expected)
		}
	}
}
// TestTextSearch runs a text search for "Go" over three in-memory documents,
// two of which mention Go, and expects exactly two hits. The first hit must
// carry a positive float64 "_textScore".
func TestTextSearch(t *testing.T) {
	agg := &AggregationEngine{}

	corpus := []types.Document{
		{ID: "doc1", Data: map[string]interface{}{
			"title":   "Introduction to Go Programming",
			"content": "Go is a programming language",
		}},
		{ID: "doc2", Data: map[string]interface{}{
			"title":   "Python Basics",
			"content": "Python is easy to learn",
		}},
		{ID: "doc3", Data: map[string]interface{}{
			"title":   "Advanced Go Techniques",
			"content": "Learn advanced Go patterns",
		}},
	}

	// Search for "Go".
	hits, err := agg.executeTextSearch(corpus, "Go", "", false)
	if err != nil {
		t.Fatalf("executeTextSearch() error = %v", err)
	}
	if len(hits) != 2 {
		t.Errorf("Expected 2 results for 'Go', got %d", len(hits))
	}

	// Nothing further to inspect when the search returned no documents.
	if len(hits) == 0 {
		return
	}

	// The first result is expected to be the best match; only its score's
	// type and sign are verified here.
	score, ok := hits[0].Data["_textScore"].(float64)
	if !ok {
		t.Error("Expected _textScore field")
	}
	if score <= 0 {
		t.Error("Expected positive score")
	}
}
// TestCalculateTextScore checks that calculateTextScore yields a positive
// score for a document whose title, content and tags all contain the search
// terms.
func TestCalculateTextScore(t *testing.T) {
	agg := &AggregationEngine{}

	terms := []string{"go", "programming"}
	fields := map[string]interface{}{
		"title":   "Go Programming Guide",
		"content": "This guide covers Go programming language",
		"tags":    []interface{}{"go", "programming", "guide"},
	}

	if got := agg.calculateTextScore(fields, terms, false); got <= 0 {
		t.Errorf("Expected positive score, got %f", got)
	}
}
// TestAggregateBatch3Integration runs complete pipelines through
// engine.Execute against a two-document collection and validates the output
// of each pipeline with a per-case check function.
func TestAggregateBatch3Integration(t *testing.T) {
	store := NewMemoryStore(nil)
	engine := NewAggregationEngine(store)

	// Test fixture.
	collection := "test.batch3_integration"
	CreateTestCollectionForTesting(store, collection, map[string]types.Document{
		"doc1": {ID: "doc1", Data: map[string]interface{}{
			"name": "Alice",
			"profile": map[string]interface{}{
				"age":  30,
				"city": "NYC",
			},
			"score": float64(85),
			"date":  "2024-03-15T00:00:00Z",
		}},
		"doc2": {ID: "doc2", Data: map[string]interface{}{
			"name": "Bob",
			"profile": map[string]interface{}{
				"age":  25,
				"city": "LA",
			},
			"score": float64(90),
			"date":  "2024-03-16T00:00:00Z",
		}},
	})

	// allHaveNonNil reports whether every document has a non-nil value for
	// each of the given fields.
	allHaveNonNil := func(docs []types.Document, fields ...string) bool {
		for _, doc := range docs {
			for _, field := range fields {
				if doc.Data[field] == nil {
					return false
				}
			}
		}
		return true
	}

	tests := []struct {
		name     string
		pipeline []types.AggregateStage
		checkFn  func([]types.Document) bool
	}{
		{
			name: "replaceRoot pipeline",
			pipeline: []types.AggregateStage{
				{Stage: "$replaceRoot", Spec: map[string]interface{}{
					"newRoot": "$profile",
				}},
			},
			// After promoting "profile" to the root, every document must
			// expose the "age" key.
			checkFn: func(docs []types.Document) bool {
				for _, doc := range docs {
					if _, ok := doc.Data["age"]; !ok {
						return false
					}
				}
				return true
			},
		},
		{
			name: "addFields with date operators",
			pipeline: []types.AggregateStage{
				{Stage: "$addFields", Spec: map[string]interface{}{
					"week":      map[string]interface{}{"$week": "$date"},
					"dayOfYear": map[string]interface{}{"$dayOfYear": "$date"},
					"formatted": map[string]interface{}{"$dateToString": map[string]interface{}{"date": "$date", "format": "%Y-%m-%d"}},
				}},
			},
			// Each computed field must be present and non-nil.
			checkFn: func(docs []types.Document) bool {
				return allHaveNonNil(docs, "week", "dayOfYear", "formatted")
			},
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			results, err := engine.Execute(collection, tt.pipeline)
			if err != nil {
				t.Fatalf("Execute() error = %v", err)
			}
			// The check functions loop over the results and would pass
			// vacuously on an empty slice, so require at least one document.
			if len(results) == 0 {
				t.Fatalf("Execute() returned no documents for: %s", tt.name)
			}
			if !tt.checkFn(results) {
				t.Errorf("Pipeline check failed for: %s", tt.name)
			}
		})
	}
}