334 lines
7.7 KiB
Go
334 lines
7.7 KiB
Go
package engine
|
|
|
|
import (
|
|
"fmt"
|
|
"sync"
|
|
"testing"
|
|
|
|
"git.kingecg.top/kingecg/gomog/pkg/types"
|
|
)
|
|
|
|
// TestConcurrentAccess_Aggregation 测试聚合引擎并发访问安全性
|
|
func TestConcurrentAccess_Aggregation(t *testing.T) {
|
|
store := NewMemoryStore(nil)
|
|
engine := NewAggregationEngine(store)
|
|
|
|
// 准备测试数据
|
|
CreateTestCollectionForTesting(store, "concurrent_test", generateDocuments(100))
|
|
|
|
pipeline := []types.AggregateStage{
|
|
{Stage: "$match", Spec: map[string]interface{}{"status.active": true}},
|
|
{Stage: "$limit", Spec: float64(10)},
|
|
}
|
|
|
|
var wg sync.WaitGroup
|
|
errors := make(chan error, 10)
|
|
|
|
// 启动 10 个 goroutine 并发执行聚合
|
|
for i := 0; i < 10; i++ {
|
|
wg.Add(1)
|
|
go func(id int) {
|
|
defer wg.Done()
|
|
_, err := engine.Execute("concurrent_test", pipeline)
|
|
if err != nil {
|
|
errors <- err
|
|
}
|
|
}(i)
|
|
}
|
|
|
|
wg.Wait()
|
|
close(errors)
|
|
|
|
if len(errors) > 0 {
|
|
t.Errorf("Concurrent execution failed with %d errors", len(errors))
|
|
for err := range errors {
|
|
t.Error(err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// TestRaceCondition_MemoryStore 测试 MemoryStore 的竞态条件
|
|
func TestRaceCondition_MemoryStore(t *testing.T) {
|
|
store := NewMemoryStore(nil)
|
|
|
|
// 创建集合
|
|
CreateTestCollectionForTesting(store, "race_test", map[string]types.Document{
|
|
"doc1": {ID: "doc1", Data: map[string]interface{}{"value": float64(1)}},
|
|
})
|
|
|
|
var wg sync.WaitGroup
|
|
errors := make(chan error, 20)
|
|
|
|
// 并发读取和写入
|
|
for i := 0; i < 10; i++ {
|
|
wg.Add(2)
|
|
|
|
// 读操作
|
|
go func(id int) {
|
|
defer wg.Done()
|
|
_, err := store.GetAllDocuments("race_test")
|
|
if err != nil {
|
|
errors <- err
|
|
}
|
|
}(i)
|
|
|
|
// 写操作
|
|
go func(id int) {
|
|
defer wg.Done()
|
|
doc := types.Document{
|
|
ID: fmt.Sprintf("newdoc%d", id),
|
|
Data: map[string]interface{}{"value": float64(id)},
|
|
}
|
|
err := store.InsertDocument("race_test", doc)
|
|
if err != nil {
|
|
errors <- err
|
|
}
|
|
}(i)
|
|
}
|
|
|
|
wg.Wait()
|
|
close(errors)
|
|
|
|
if len(errors) > 0 {
|
|
t.Errorf("Race condition detected with %d errors", len(errors))
|
|
}
|
|
}
|
|
|
|
// TestConcurrent_UnionWith 测试 $unionWith 的并发安全性
|
|
func TestConcurrent_UnionWith(t *testing.T) {
|
|
store := NewMemoryStore(nil)
|
|
engine := NewAggregationEngine(store)
|
|
|
|
// 创建多个集合
|
|
CreateTestCollectionForTesting(store, "union_main", generateDocuments(50))
|
|
CreateTestCollectionForTesting(store, "union_other1", generateDocuments(50))
|
|
CreateTestCollectionForTesting(store, "union_other2", generateDocuments(50))
|
|
|
|
pipeline := []types.AggregateStage{
|
|
{Stage: "$unionWith", Spec: "union_other1"},
|
|
{Stage: "$unionWith", Spec: "union_other2"},
|
|
}
|
|
|
|
var wg sync.WaitGroup
|
|
for i := 0; i < 5; i++ {
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
_, err := engine.Execute("union_main", pipeline)
|
|
if err != nil {
|
|
t.Error(err)
|
|
}
|
|
}()
|
|
}
|
|
|
|
wg.Wait()
|
|
}
|
|
|
|
// TestConcurrent_Redact 测试 $redact 的并发安全性
|
|
func TestConcurrent_Redact(t *testing.T) {
|
|
store := NewMemoryStore(nil)
|
|
engine := NewAggregationEngine(store)
|
|
|
|
docs := make(map[string]types.Document)
|
|
for i := 0; i < 100; i++ {
|
|
docs[fmt.Sprintf("doc%d", i)] = types.Document{
|
|
ID: fmt.Sprintf("doc%d", i),
|
|
Data: map[string]interface{}{
|
|
"level": float64(i % 10),
|
|
"secret": "classified",
|
|
"public": "visible",
|
|
},
|
|
}
|
|
}
|
|
CreateTestCollectionForTesting(store, "redact_test", docs)
|
|
|
|
spec := map[string]interface{}{
|
|
"$cond": map[string]interface{}{
|
|
"if": map[string]interface{}{
|
|
"$gte": []interface{}{"$level", float64(5)},
|
|
},
|
|
"then": "$$KEEP",
|
|
"else": "$$PRUNE",
|
|
},
|
|
}
|
|
|
|
pipeline := []types.AggregateStage{
|
|
{Stage: "$redact", Spec: spec},
|
|
}
|
|
|
|
var wg sync.WaitGroup
|
|
for i := 0; i < 10; i++ {
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
_, err := engine.Execute("redact_test", pipeline)
|
|
if err != nil {
|
|
t.Error(err)
|
|
}
|
|
}()
|
|
}
|
|
|
|
wg.Wait()
|
|
}
|
|
|
|
// TestConcurrent_OutMerge 测试 $out/$merge 的并发写入
|
|
func TestConcurrent_OutMerge(t *testing.T) {
|
|
store := NewMemoryStore(nil)
|
|
engine := NewAggregationEngine(store)
|
|
|
|
// 源集合
|
|
CreateTestCollectionForTesting(store, "source_concurrent", generateDocuments(20))
|
|
|
|
var wg sync.WaitGroup
|
|
targetCollections := []string{"target1", "target2", "target3"}
|
|
|
|
// 并发执行 $out 到不同集合
|
|
for i, target := range targetCollections {
|
|
wg.Add(1)
|
|
go func(idx int, coll string) {
|
|
defer wg.Done()
|
|
pipeline := []types.AggregateStage{
|
|
{Stage: "$out", Spec: coll},
|
|
}
|
|
_, err := engine.Execute("source_concurrent", pipeline)
|
|
if err != nil {
|
|
t.Error(err)
|
|
}
|
|
}(i, target)
|
|
}
|
|
|
|
wg.Wait()
|
|
|
|
// 验证所有目标集合都已创建
|
|
for _, coll := range targetCollections {
|
|
docs, err := store.GetAllDocuments(coll)
|
|
if err != nil {
|
|
t.Errorf("Target collection %s not found", coll)
|
|
}
|
|
if len(docs) != 20 {
|
|
t.Errorf("Expected 20 docs in %s, got %d", coll, len(docs))
|
|
}
|
|
}
|
|
}
|
|
|
|
// TestStress_LargeDataset 压力测试:大数据集
|
|
func TestStress_LargeDataset(t *testing.T) {
|
|
store := NewMemoryStore(nil)
|
|
engine := NewAggregationEngine(store)
|
|
|
|
// 生成 10000 个文档
|
|
largeDocs := make(map[string]types.Document)
|
|
for i := 0; i < 10000; i++ {
|
|
largeDocs[fmt.Sprintf("doc%d", i)] = types.Document{
|
|
ID: fmt.Sprintf("doc%d", i),
|
|
Data: map[string]interface{}{
|
|
"index": float64(i),
|
|
"category": fmt.Sprintf("cat%d", i%100),
|
|
"value": float64(i) * 1.5,
|
|
"tags": []interface{}{"tag1", "tag2", "tag3"},
|
|
"metadata": map[string]interface{}{"created": "2024-01-01"},
|
|
},
|
|
}
|
|
}
|
|
CreateTestCollectionForTesting(store, "stress_large", largeDocs)
|
|
|
|
pipeline := []types.AggregateStage{
|
|
{Stage: "$match", Spec: map[string]interface{}{
|
|
"index": map[string]interface{}{"$lt": float64(5000)},
|
|
}},
|
|
{Stage: "$group", Spec: map[string]interface{}{
|
|
"_id": "$category",
|
|
"total": map[string]interface{}{"$sum": "$value"},
|
|
}},
|
|
{Stage: "$sort", Spec: map[string]interface{}{"total": -1}},
|
|
{Stage: "$limit", Spec: float64(10)},
|
|
}
|
|
|
|
// 执行 5 次,验证稳定性
|
|
for i := 0; i < 5; i++ {
|
|
results, err := engine.Execute("stress_large", pipeline)
|
|
if err != nil {
|
|
t.Fatalf("Iteration %d failed: %v", i, err)
|
|
}
|
|
if len(results) > 100 { // 应该有最多 100 个类别
|
|
t.Errorf("Unexpected result count: %d", len(results))
|
|
}
|
|
}
|
|
}
|
|
|
|
// TestConcurrent_TypeConversion 测试类型转换的并发安全性
|
|
func TestConcurrent_TypeConversion(t *testing.T) {
|
|
engine := &AggregationEngine{}
|
|
|
|
var wg sync.WaitGroup
|
|
for i := 0; i < 20; i++ {
|
|
wg.Add(4)
|
|
|
|
go func(id int) {
|
|
defer wg.Done()
|
|
data := map[string]interface{}{"value": float64(id)}
|
|
_ = engine.toString("$value", data)
|
|
}(i)
|
|
|
|
go func(id int) {
|
|
defer wg.Done()
|
|
data := map[string]interface{}{"value": float64(id)}
|
|
_ = engine.toInt("$value", data)
|
|
}(i)
|
|
|
|
go func(id int) {
|
|
defer wg.Done()
|
|
data := map[string]interface{}{"value": float64(id)}
|
|
_ = engine.toDouble("$value", data)
|
|
}(i)
|
|
|
|
go func(id int) {
|
|
defer wg.Done()
|
|
data := map[string]interface{}{"value": float64(id)}
|
|
_ = engine.toBool("$value", data)
|
|
}(i)
|
|
}
|
|
|
|
wg.Wait()
|
|
}
|
|
|
|
// TestConcurrent_Bitwise 测试位运算的并发安全性
|
|
func TestConcurrent_Bitwise(t *testing.T) {
|
|
engine := &AggregationEngine{}
|
|
|
|
var wg sync.WaitGroup
|
|
for i := 0; i < 10; i++ {
|
|
wg.Add(4)
|
|
|
|
go func(id int) {
|
|
defer wg.Done()
|
|
operand := []interface{}{float64(id), float64(id * 2)}
|
|
data := map[string]interface{}{}
|
|
_ = engine.bitAnd(operand, data)
|
|
}(i)
|
|
|
|
go func(id int) {
|
|
defer wg.Done()
|
|
operand := []interface{}{float64(id), float64(id * 2)}
|
|
data := map[string]interface{}{}
|
|
_ = engine.bitOr(operand, data)
|
|
}(i)
|
|
|
|
go func(id int) {
|
|
defer wg.Done()
|
|
operand := []interface{}{float64(id), float64(id * 2)}
|
|
data := map[string]interface{}{}
|
|
_ = engine.bitXor(operand, data)
|
|
}(i)
|
|
|
|
go func(id int) {
|
|
defer wg.Done()
|
|
operand := float64(id)
|
|
data := map[string]interface{}{}
|
|
_ = engine.bitNot(operand, data)
|
|
}(i)
|
|
}
|
|
|
|
wg.Wait()
|
|
}
|