gomog/internal/engine/aggregate_batch5_test.go

376 lines
10 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package engine
import (
"testing"
"git.kingecg.top/kingecg/gomog/pkg/types"
)
// TestUnionWith_Simple exercises the shorthand form of $unionWith, in which
// the stage spec is just the name of the collection to append.
func TestUnionWith_Simple(t *testing.T) {
	memStore := NewMemoryStore(nil)
	agg := NewAggregationEngine(memStore)

	// Seed two collections: two orders dated 2023 and one order dated 2024.
	orders2023 := map[string]types.Document{
		"order1": {ID: "order1", Data: map[string]interface{}{"year": float64(2023), "amount": float64(100)}},
		"order2": {ID: "order2", Data: map[string]interface{}{"year": float64(2023), "amount": float64(150)}},
	}
	CreateTestCollectionForTesting(memStore, "orders2023", orders2023)

	orders2024 := map[string]types.Document{
		"order3": {ID: "order3", Data: map[string]interface{}{"year": float64(2024), "amount": float64(200)}},
	}
	CreateTestCollectionForTesting(memStore, "orders2024", orders2024)

	// Filter the 2023 orders first, then append orders2024 by name.
	matchStage := types.AggregateStage{
		Stage: "$match",
		Spec:  map[string]interface{}{"year": float64(2023)},
	}
	unionStage := types.AggregateStage{Stage: "$unionWith", Spec: "orders2024"}

	results, err := agg.Execute("orders2023", []types.AggregateStage{matchStage, unionStage})
	if err != nil {
		t.Fatalf("Execute() error = %v", err)
	}

	// Three documents are expected: the two 2023 orders that pass the $match
	// plus the single order stored in orders2024.
	if got := len(results); got != 3 {
		t.Errorf("Expected 3 results, got %d", got)
	}
}
// TestUnionWith_Pipeline exercises the document form of $unionWith, where the
// spec names a collection ("coll") and a sub-pipeline ("pipeline") to run
// against it before the union.
func TestUnionWith_Pipeline(t *testing.T) {
	memStore := NewMemoryStore(nil)
	agg := NewAggregationEngine(memStore)

	// Seed two collections: one Q1 sale and two Q2 sales.
	q1Sales := map[string]types.Document{
		"s1": {ID: "s1", Data: map[string]interface{}{"quarter": "Q1", "amount": float64(100)}},
	}
	CreateTestCollectionForTesting(memStore, "sales_q1", q1Sales)

	q2Sales := map[string]types.Document{
		"s2": {ID: "s2", Data: map[string]interface{}{"quarter": "Q2", "amount": float64(200)}},
		"s3": {ID: "s3", Data: map[string]interface{}{"quarter": "Q2", "amount": float64(50)}},
	}
	CreateTestCollectionForTesting(memStore, "sales_q2", q2Sales)

	// Sub-pipeline applied to sales_q2: keep only amounts greater than 100.
	amountFilter := map[string]interface{}{
		"amount": map[string]interface{}{"$gt": float64(100)},
	}
	subPipeline := []interface{}{
		map[string]interface{}{"$match": amountFilter},
	}

	stages := []types.AggregateStage{
		{
			Stage: "$unionWith",
			Spec: map[string]interface{}{
				"coll":     "sales_q2",
				"pipeline": subPipeline,
			},
		},
	}

	results, err := agg.Execute("sales_q1", stages)
	if err != nil {
		t.Fatalf("Execute() error = %v", err)
	}

	// Two documents are expected: the one from Q1 plus the single Q2 sale
	// that survives the amount filter.
	if got := len(results); got != 2 {
		t.Errorf("Expected 2 results, got %d", got)
	}
}
// TestRedact_Keep checks that executeRedact returns the input document when
// the $cond expression is expected to resolve to "$$KEEP".
func TestRedact_Keep(t *testing.T) {
	agg := &AggregationEngine{}

	// level is 5, so the "$level >= 5" condition should pick the "then" branch.
	input := []types.Document{
		{
			ID: "1",
			Data: map[string]interface{}{
				"_id":   float64(1),
				"name":  "Alice",
				"level": float64(5),
			},
		},
	}

	levelAtLeastFive := map[string]interface{}{
		"$gte": []interface{}{"$level", float64(5)},
	}
	redactSpec := map[string]interface{}{
		"$cond": map[string]interface{}{
			"if":   levelAtLeastFive,
			"then": "$$KEEP",
			"else": "$$PRUNE",
		},
	}

	kept, err := agg.executeRedact(redactSpec, input)
	if err != nil {
		t.Fatalf("executeRedact() error = %v", err)
	}
	if got := len(kept); got != 1 {
		t.Errorf("Expected 1 result, got %d", got)
	}
}
// TestRedact_Prune checks that executeRedact drops the input document when
// the $cond expression is expected to resolve to "$$PRUNE".
func TestRedact_Prune(t *testing.T) {
	agg := &AggregationEngine{}

	// level is 2, so the "$level >= 5" condition should pick the "else" branch.
	input := []types.Document{
		{
			ID: "2",
			Data: map[string]interface{}{
				"_id":   float64(1),
				"name":  "Bob",
				"level": float64(2),
			},
		},
	}

	levelAtLeastFive := map[string]interface{}{
		"$gte": []interface{}{"$level", float64(5)},
	}
	redactSpec := map[string]interface{}{
		"$cond": map[string]interface{}{
			"if":   levelAtLeastFive,
			"then": "$$KEEP",
			"else": "$$PRUNE",
		},
	}

	remaining, err := agg.executeRedact(redactSpec, input)
	if err != nil {
		t.Fatalf("executeRedact() error = %v", err)
	}
	if got := len(remaining); got != 0 {
		t.Errorf("Expected 0 results (pruned), got %d", got)
	}
}
// TestOut_Simple runs a pipeline consisting of a single $out stage and
// verifies both the documents written to the "output" collection and the
// acknowledgement document returned by Execute.
func TestOut_Simple(t *testing.T) {
	store := NewMemoryStore(nil)
	engine := NewAggregationEngine(store)

	CreateTestCollectionForTesting(store, "source", map[string]types.Document{
		"doc1": {ID: "doc1", Data: map[string]interface{}{"value": float64(1)}},
		"doc2": {ID: "doc2", Data: map[string]interface{}{"value": float64(2)}},
	})

	pipeline := []types.AggregateStage{
		{Stage: "$out", Spec: "output"},
	}

	results, err := engine.Execute("source", pipeline)
	if err != nil {
		t.Fatalf("Execute() error = %v", err)
	}

	// Verify that the output collection has been created and populated.
	outputDocs, err := store.GetAllDocuments("output")
	if err != nil {
		t.Fatalf("GetAllDocuments() error = %v", err)
	}
	if len(outputDocs) != 2 {
		t.Errorf("Expected 2 documents in output, got %d", len(outputDocs))
	}

	// Verify the returned acknowledgement document. This check must be fatal:
	// results[0] is indexed below and would panic on an empty slice.
	if len(results) != 1 {
		t.Fatalf("Expected 1 result document, got %d", len(results))
	}
	if results[0].Data["ok"] != float64(1) {
		t.Errorf("Expected ok=1, got %v", results[0].Data["ok"])
	}
	if results[0].Data["nInserted"] != float64(2) {
		t.Errorf("Expected nInserted=2, got %v", results[0].Data["nInserted"])
	}
}
// TestMerge_Insert runs $merge into a collection that has not been created
// beforehand and verifies that the source document ends up in "target" and
// that the returned statistics report one insert.
func TestMerge_Insert(t *testing.T) {
	store := NewMemoryStore(nil)
	engine := NewAggregationEngine(store)

	CreateTestCollectionForTesting(store, "source", map[string]types.Document{
		"doc1": {ID: "doc1", Data: map[string]interface{}{"value": float64(1)}},
	})

	// The target collection does not exist yet, so the document should be inserted.
	pipeline := []types.AggregateStage{
		{Stage: "$merge", Spec: map[string]interface{}{
			"into": "target",
		}},
	}

	results, err := engine.Execute("source", pipeline)
	if err != nil {
		t.Fatalf("Execute() error = %v", err)
	}

	// Verify that the target collection was created and received the document.
	targetDocs, err := store.GetAllDocuments("target")
	if err != nil {
		t.Fatalf("GetAllDocuments() error = %v", err)
	}
	if len(targetDocs) != 1 {
		t.Errorf("Expected 1 document in target, got %d", len(targetDocs))
	}

	// Verify the statistics document. This check must be fatal: results[0] is
	// indexed below and would panic on an empty slice.
	if len(results) != 1 {
		t.Fatalf("Expected 1 result document, got %d", len(results))
	}
	stats := results[0].Data
	if stats["nInserted"] != float64(1) {
		t.Errorf("Expected nInserted=1, got %v", stats["nInserted"])
	}
}
// TestMerge_Update runs $merge with whenMatched "replace" against a target
// that already holds a document with the same ID, and verifies that the
// target document's fields are replaced by the source document's fields.
func TestMerge_Update(t *testing.T) {
	store := NewMemoryStore(nil)
	engine := NewAggregationEngine(store)

	// Create the source and target collections, both containing "doc1".
	CreateTestCollectionForTesting(store, "source", map[string]types.Document{
		"doc1": {ID: "doc1", Data: map[string]interface{}{"value": float64(100), "updated": true}},
	})
	CreateTestCollectionForTesting(store, "target", map[string]types.Document{
		"doc1": {ID: "doc1", Data: map[string]interface{}{"value": float64(1), "name": "original"}},
	})

	// Update using the "replace" strategy.
	pipeline := []types.AggregateStage{
		{Stage: "$merge", Spec: map[string]interface{}{
			"into":        "target",
			"whenMatched": "replace",
		}},
	}

	results, err := engine.Execute("source", pipeline)
	if err != nil {
		t.Fatalf("Execute() error = %v", err)
	}

	// Verify that the target collection was updated. This check must be
	// fatal: targetDocs[0] is indexed below and would panic on an empty slice.
	targetDocs, err := store.GetAllDocuments("target")
	if err != nil {
		t.Fatalf("GetAllDocuments() error = %v", err)
	}
	if len(targetDocs) != 1 {
		t.Fatalf("Expected 1 document in target, got %d", len(targetDocs))
	}

	// Verify that the document content was replaced: the source fields are
	// present and the target-only "name" field is gone.
	doc := targetDocs[0].Data
	if doc["value"] != float64(100) {
		t.Errorf("Expected value=100, got %v", doc["value"])
	}
	if doc["updated"] != true {
		t.Errorf("Expected updated=true, got %v", doc["updated"])
	}
	if _, exists := doc["name"]; exists {
		t.Errorf("Expected name field to be removed, but it exists")
	}

	// Verify the statistics document, guarding the index so an empty result
	// fails the test cleanly instead of panicking.
	if len(results) != 1 {
		t.Fatalf("Expected 1 result document, got %d", len(results))
	}
	stats := results[0].Data
	if stats["nUpdated"] != float64(1) {
		t.Errorf("Expected nUpdated=1, got %v", stats["nUpdated"])
	}
}
// TestMerge_MergeFields runs $merge with whenMatched "merge" against a target
// that already holds a document with the same ID, and verifies that source
// fields overwrite or extend the target document while target-only fields
// are preserved.
func TestMerge_MergeFields(t *testing.T) {
	store := NewMemoryStore(nil)
	engine := NewAggregationEngine(store)

	// Create the source and target collections, both containing "doc1".
	CreateTestCollectionForTesting(store, "source", map[string]types.Document{
		"doc1": {ID: "doc1", Data: map[string]interface{}{"value": float64(100), "newField": "added"}},
	})
	CreateTestCollectionForTesting(store, "target", map[string]types.Document{
		"doc1": {ID: "doc1", Data: map[string]interface{}{"value": float64(1), "name": "original"}},
	})

	// Combine fields using the "merge" strategy.
	pipeline := []types.AggregateStage{
		{Stage: "$merge", Spec: map[string]interface{}{
			"into":        "target",
			"whenMatched": "merge",
		}},
	}

	results, err := engine.Execute("source", pipeline)
	if err != nil {
		t.Fatalf("Execute() error = %v", err)
	}

	// Verify that the target collection was merged. This check must be
	// fatal: targetDocs[0] is indexed below and would panic on an empty slice.
	targetDocs, err := store.GetAllDocuments("target")
	if err != nil {
		t.Fatalf("GetAllDocuments() error = %v", err)
	}
	if len(targetDocs) != 1 {
		t.Fatalf("Expected 1 document in target, got %d", len(targetDocs))
	}

	// Verify the field merge: new values override old ones, and fields that
	// exist only in the target are kept.
	doc := targetDocs[0].Data
	if doc["value"] != float64(100) {
		t.Errorf("Expected value=100, got %v", doc["value"])
	}
	if doc["name"] != "original" {
		t.Errorf("Expected name='original', got %v", doc["name"])
	}
	if doc["newField"] != "added" {
		t.Errorf("Expected newField='added', got %v", doc["newField"])
	}

	// Verify the statistics document, guarding the index so an empty result
	// fails the test cleanly instead of panicking.
	if len(results) != 1 {
		t.Fatalf("Expected 1 result document, got %d", len(results))
	}
	stats := results[0].Data
	if stats["nUpdated"] != float64(1) {
		t.Errorf("Expected nUpdated=1, got %v", stats["nUpdated"])
	}
}
// TestIndexStats is a smoke test: it only asserts that a pipeline containing
// a single $indexStats stage executes without returning an error.
func TestIndexStats(t *testing.T) {
	memStore := NewMemoryStore(nil)
	agg := NewAggregationEngine(memStore)

	seed := map[string]types.Document{
		"doc1": {ID: "doc1", Data: map[string]interface{}{"value": float64(1)}},
	}
	CreateTestCollectionForTesting(memStore, "test", seed)

	indexStatsStage := types.AggregateStage{
		Stage: "$indexStats",
		Spec:  map[string]interface{}{},
	}

	// The returned documents are intentionally ignored.
	if _, err := agg.Execute("test", []types.AggregateStage{indexStatsStage}); err != nil {
		t.Fatalf("Execute() error = %v", err)
	}
}
// TestCollStats runs a pipeline consisting of a single $collStats stage and
// verifies that the returned statistics document reports the seeded document
// count and a positive size.
func TestCollStats(t *testing.T) {
	store := NewMemoryStore(nil)
	engine := NewAggregationEngine(store)

	CreateTestCollectionForTesting(store, "teststats", map[string]types.Document{
		"doc1": {ID: "doc1", Data: map[string]interface{}{"value": float64(1)}},
		"doc2": {ID: "doc2", Data: map[string]interface{}{"value": float64(2)}},
	})

	pipeline := []types.AggregateStage{
		{Stage: "$collStats", Spec: map[string]interface{}{}},
	}

	results, err := engine.Execute("teststats", pipeline)
	if err != nil {
		t.Fatalf("Execute() error = %v", err)
	}

	// This check must be fatal: results[0] is indexed below and would panic
	// on an empty slice.
	if len(results) != 1 {
		t.Fatalf("Expected 1 result, got %d", len(results))
	}

	// Verify the statistics document.
	stats := results[0].Data
	if stats["count"] != float64(2) {
		t.Errorf("Expected count=2, got %v", stats["count"])
	}
	// size must be a float64 and strictly positive.
	if size, ok := stats["size"].(float64); !ok || size <= 0 {
		t.Error("Expected positive size")
	}
}