MongoTemplate使用案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
package cn.idea360.assistant.dev.mongo;

import com.mongodb.ConnectionString;
import org.bson.Document;
import org.bson.types.ObjectId;
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import org.springframework.data.domain.Sort;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.SimpleMongoClientDatabaseFactory;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.util.CloseableIterator;
import org.springframework.util.CollectionUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Usage examples for {@link MongoTemplate}: skip/limit paging, cursor streaming,
 * aggregation, and multi-criteria counting against the {@code monthly_bills} collection.
 *
 * <p>These examples need a reachable MongoDB instance. The connection string is read from the
 * {@code mongo.uri} system property or, failing that, the {@code MONGO_URI} environment
 * variable, so that no credentials are stored in the source file. When neither is set the
 * tests are skipped instead of failing.
 *
 * @author cuishiying
 * @date 2024-11-06
 */
public class MongoTest {

    /** System property consulted first for the MongoDB connection string. */
    private static final String URI_PROPERTY = "mongo.uri";

    /** Environment variable used when {@link #URI_PROPERTY} is not set. */
    private static final String URI_ENV = "MONGO_URI";

    /** Collection queried by every example. */
    private static final String COLLECTION_NAME = "monthly_bills";

    /** Value matched against the {@code billingMonth} field by every example. */
    private static final int BILLING_MONTH = 202410;

    /** Kept so the client it creates can be released after each test. */
    private SimpleMongoClientDatabaseFactory databaseFactory;

    private MongoTemplate mongoTemplate;

    /**
     * Builds a fresh {@link MongoTemplate} from the externally supplied connection string.
     *
     * <p>The test is skipped (assumption failure) when no connection string is configured.
     */
    @Before
    public void newMongoTemplate() {
        String uri = System.getProperty(URI_PROPERTY, System.getenv(URI_ENV));
        Assume.assumeTrue(
                "Set -D" + URI_PROPERTY + " or " + URI_ENV + " to run the MongoDB examples",
                uri != null && !uri.trim().isEmpty());

        databaseFactory = new SimpleMongoClientDatabaseFactory(new ConnectionString(uri));
        mongoTemplate = new MongoTemplate(databaseFactory);
    }

    /**
     * Releases the client created by the factory in {@link #newMongoTemplate()}.
     *
     * @throws Exception if the factory fails while shutting down
     */
    @After
    public void closeMongoClient() throws Exception {
        // The factory is null when the setup assumption failed and nothing was created.
        if (databaseFactory != null) {
            databaseFactory.destroy();
            databaseFactory = null;
        }
    }

    /**
     * Paged query with {@link MongoTemplate} using skip/limit.
     *
     * <p>
     * Using skip() over large data sets in MongoDB has a performance cost: the server has to
     * scan the skipped documents before returning the requested ones, so the query gets slower
     * as the skip value grows.
     * </p>
     */
    @Test
    public void pageQuery() {
        int pageSize = 100;
        long skip = 0;

        Query query = Query.query(Criteria.where("billingMonth").is(BILLING_MONTH))
                // totalCalls is not unique; the _id tie-breaker keeps page boundaries stable.
                .with(Sort.by(Sort.Order.desc("totalCalls"), Sort.Order.asc("_id")))
                .skip(skip)
                .limit(pageSize);

        List<Document> rows = mongoTemplate.find(query, Document.class, COLLECTION_NAME);
        System.out.println("数据量: " + rows.size());
    }

    /**
     * Cursor-based query.
     *
     * <p>Low memory footprint; suited to large result sets.</p>
     */
    @Test
    public void queryByCursor() {
        int batchSize = 100;

        Query query = Query.query(Criteria.where("billingMonth").is(BILLING_MONTH))
                .with(Sort.by(Sort.Direction.DESC, "totalCalls"));

        long totalCompanies = mongoTemplate.count(query, Document.class, COLLECTION_NAME);
        System.out.println("totalCompanies: " + totalCompanies);

        int processed = 0;
        try (CloseableIterator<Document> iterator = mongoTemplate.stream(
                query, Document.class, COLLECTION_NAME)) {
            while (iterator.hasNext()) {
                // The document itself is not used; the example only counts what it reads.
                iterator.next();
                processed++;
                printProgress(processed, totalCompanies, batchSize);
            }
        }

        System.out.println("数据量: " + processed);
    }

    /**
     * Aggregation query.
     *
     * <p>Supports complex aggregation such as grouping, computing and transforming; performance
     * depends on the complexity of the aggregation pipeline.</p>
     */
    @Test
    public void queryByAggregation() {
        int batchSize = 100;

        Criteria criteria = Criteria.where("billingMonth").is(BILLING_MONTH);

        // Count the groups on the server with $count instead of loading every group document
        // into memory just to read the list size.
        Document countResult = mongoTemplate.aggregate(
                Aggregation.newAggregation(
                        Aggregation.match(criteria),
                        Aggregation.group("cid"),
                        Aggregation.count().as("total")
                ),
                COLLECTION_NAME,
                Document.class
        ).getUniqueMappedResult();
        // $count emits no document at all when nothing matched.
        long totalCompanies = countResult == null
                ? 0L
                : ((Number) countResult.get("total")).longValue();
        System.out.println("totalCompanies: " + totalCompanies);

        Aggregation aggregation = Aggregation.newAggregation(
                // Optional: match condition (e.g. a time range)
                Aggregation.match(criteria),
                // Group by company id and compute per-group statistics
                Aggregation.group("cid")
                        .count().as("count")
                        .sum("amount").as("totalAmount")
                        .avg("amount").as("avgAmount")
                        .min("createTime").as("firstTime")
                        .max("createTime").as("lastTime"),
                // Optional: order by the number of documents per group
                Aggregation.sort(Sort.Direction.DESC, "count")
        );

        int processed = 0;
        try (CloseableIterator<Document> iterator = mongoTemplate.aggregateStream(
                aggregation, COLLECTION_NAME, Document.class)) {
            while (iterator.hasNext()) {
                // The document itself is not used; the example only counts what it reads.
                iterator.next();
                processed++;
                printProgress(processed, totalCompanies, batchSize);
            }
        }

        System.out.println("数据量: " + processed);
    }

    /**
     * Multi-criteria query: combines a list of criteria with AND and counts the matches.
     */
    @Test
    public void complexQuery() {
        // Compound conditions
        List<Criteria> criteriaList = new ArrayList<>();
        criteriaList.add(Criteria.where("billingMonth").is(BILLING_MONTH));
        Criteria buildCriteria = new Criteria().andOperator(criteriaList.toArray(new Criteria[0]));

        List<Sort.Order> orders = new ArrayList<>();
        // Always add an _id order to keep paging stable
        orders.add(Sort.Order.asc("_id"));
        Sort buildSort = Sort.by(orders);

        Query query = Query.query(buildCriteria).with(buildSort);
        long count = mongoTemplate.count(query, Document.class, COLLECTION_NAME);
        System.out.println("数据量: " + count);
    }

    /**
     * Prints a progress line each time {@code current} reaches a multiple of {@code step}.
     *
     * @param current number of documents processed so far
     * @param total   expected total, used to compute the percentage
     * @param step    reporting interval; nothing is printed between multiples of it
     */
    private static void printProgress(int current, long total, int step) {
        if (current % step != 0) {
            return;
        }
        double progress = (double) current / total * 100;
        System.out.printf("统计进度: %s/%s (%s)%n",
                current, total, String.format("%.2f", progress));
    }
}