模仿Eureka实现多级延时缓存

前言

本篇是 Eureka 系列的“造轮子”篇：模仿 Eureka 服务端“只读缓存 + 读写缓存”的设计，自己实现一个多级延时缓存。

实现

pom.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the multiple-delay-cache example module. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>cn.idea360</groupId>
<artifactId>multiple-delay-cache</artifactId>
<version>0.0.1</version>

<!-- Source and bytecode level: Java 11 (the sources use the diamond operator on anonymous classes). -->
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
</properties>

<dependencies>
<!-- Guava: supplies CacheBuilder / LoadingCache used for the read-write cache level. -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.1-jre</version>
</dependency>
<!-- JUnit 4: test-only. -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
<!-- Mockito: test-only. -->
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>4.5.1</version>
<scope>test</scope>
</dependency>
<!-- Lombok: compile-time annotations (@Data, @Slf4j); "provided" keeps it off the runtime classpath. -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.26</version>
<scope>provided</scope>
</dependency>
<!-- SLF4J simple binding: optional, so it is not forced onto consumers of this artifact. -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.32</version>
<optional>true</optional>
</dependency>
<!-- Commons Lang 3: supplies BasicThreadFactory for naming the cache-sync thread. -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.12.0</version>
</dependency>
</dependencies>
</project>
CacheConfig.java
package cn.idea360.multiple.cache;

import lombok.Data;

/**
* Tunable settings read by {@code MultipleCacheImpl(CacheConfig)}.
* Lombok's {@code @Data} generates the getters and setters for the fields below.
*
* @author cuishiying
*/
@Data
public class CacheConfig {

// Initial capacity handed to the Guava CacheBuilder of the read-write cache.
private int initialCapacityOfCache = 1000;
// Expire-after-write time of the read-write cache, in seconds.
private long cacheAutoExpirationInSeconds = 180;
// Period, in milliseconds, of the task that copies read-write cache values into the read-only cache.
private long cacheUpdateIntervalMs = 30 * 1000L;

}
MultipleCache.java
package cn.idea360.multiple.cache;

/**
* Contract of the multi-level cache: string values looked up by string key,
* backed by a {@link DataRepository} that supplies values on a cache miss.
*
* @author cuishiying
*/
public interface MultipleCache {

/**
* Returns the value for {@code key}.
* In {@code MultipleCacheImpl} a failed lookup is logged and yields {@code null}.
*
* @param key the cache key
* @return the cached or freshly loaded value
*/
String get(String key);

/**
* Invalidates the given keys.
* In {@code MultipleCacheImpl} only the read-write level is invalidated here;
* the read-only level is refreshed later by its periodic sync task.
*
* @param keys the keys to invalidate
*/
void invalidate(String... keys);

/**
* Sets the repository used to load values that are not cached.
*
* @param dataRepository the backing data source
*/
void setDataRepository(DataRepository dataRepository);
}
MultipleCacheImpl.java
package cn.idea360.multiple.cache;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;

import java.util.concurrent.*;

/**
 * Two-level cache in the style of Eureka's response cache.
 *
 * <ul>
 *   <li>Level 1, {@code readOnlyCacheMap}: a plain concurrent map served to readers first.</li>
 *   <li>Level 2, {@code readWriteCacheMap}: a Guava {@link LoadingCache} that loads from the
 *       {@link DataRepository} and expires entries after a configurable time since write.</li>
 * </ul>
 *
 * A single daemon thread periodically copies level-2 values into level 1, so after
 * {@link #invalidate(String...)} readers keep seeing the old level-1 value until the next sync.
 *
 * @author cuishiying
 */
@Slf4j
public class MultipleCacheImpl implements MultipleCache {

    /** Level 1: values are only replaced by the periodic sync task or filled on a level-1 miss. */
    private final ConcurrentMap<String, String> readOnlyCacheMap = new ConcurrentHashMap<>();

    /** Level 2: loads through {@link #generatePayload(String)} and expires after write. */
    private final LoadingCache<String, String> readWriteCacheMap;

    private CacheConfig cacheConfig;

    /** Written by the configuring thread, read by the sync thread via the loader; volatile for visibility. */
    private volatile DataRepository dataRepository;

    /** Creates a cache with the defaults declared in {@link CacheConfig}. */
    public MultipleCacheImpl() {
        this(new CacheConfig());
    }

    /**
     * Creates a cache from explicit settings.
     *
     * @param cacheConfig capacity, expiry and sync-interval settings; must not be null
     */
    public MultipleCacheImpl(CacheConfig cacheConfig) {
        this(cacheConfig.getInitialCapacityOfCache(), cacheConfig.getCacheAutoExpirationInSeconds(), cacheConfig.getCacheUpdateIntervalMs());
        this.cacheConfig = cacheConfig;
    }

    private MultipleCacheImpl(int initialCapacityOfCache, long cacheAutoExpirationInSeconds, long cacheUpdateIntervalMs) {
        // Level-2 entries expire cacheAutoExpirationInSeconds after being written (180s by default).
        this.readWriteCacheMap =
                CacheBuilder.newBuilder().initialCapacity(initialCapacityOfCache)
                        .expireAfterWrite(cacheAutoExpirationInSeconds, TimeUnit.SECONDS)
                        .removalListener((RemovalListener<String, String>) notification -> {
                            String key = notification.getKey();
                            String value = notification.getValue();
                            log.debug("cache for key [{}] has been remove, value [{}]", key, value);
                        })
                        .build(new CacheLoader<>() {

                            @Override
                            public String load(String key) throws Exception {
                                String value = generatePayload(key);
                                log.debug("load data from db for key [{}], value [{}]", key, value);
                                return value;
                            }
                        });

        ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,
                new BasicThreadFactory.Builder().namingPattern("multiple-cachefill-schedule-pool-%d").daemon(true).build());

        // Level 1 is refreshed from level 2 every cacheUpdateIntervalMs (30s by default).
        // NOTE(review): the initial delay is 2 ms, not one full interval - confirm this is intended.
        executorService.scheduleAtFixedRate(this::syncReadOnlyCache, 2, cacheUpdateIntervalMs, TimeUnit.MILLISECONDS);
    }

    /**
     * Copies the current level-2 value of every level-1 key into level 1 when it differs.
     * Failures are caught per key: an exception escaping a scheduled task would cancel
     * all of its future runs.
     */
    private void syncReadOnlyCache() {
        for (String key : readOnlyCacheMap.keySet()) {
            try {
                String cacheValue = readWriteCacheMap.get(key);
                String currentCacheValue = readOnlyCacheMap.get(key);
                if (!cacheValue.equals(currentCacheValue)) {
                    log.debug("readOnlyCache sync cache from readWriteCache for key [{}]", key);
                    readOnlyCacheMap.put(key, cacheValue);
                } else {
                    log.debug("cacheValue: [{}], currentCacheValue: [{}]", cacheValue, currentCacheValue);
                }
            } catch (Throwable th) {
                log.error("Error while updating the readOnlyCache from readWriteCache for key {}", key, th);
            }
        }
    }

    /**
     * Loads the value for {@code key} from the configured repository.
     *
     * @throws IllegalStateException if no repository has been set yet
     */
    private String generatePayload(String key) {
        DataRepository repository = dataRepository;
        if (repository == null) {
            // Fail with a clear message instead of an anonymous NullPointerException.
            throw new IllegalStateException(
                    "No DataRepository configured; call setDataRepository() before reading key " + key);
        }
        return repository.load(key);
    }

    /**
     * Returns the value for {@code key}, preferring level 1.
     *
     * @return the value, or {@code null} if it could not be loaded (the failure is logged)
     */
    @Override
    public String get(String key) {
        return getValue(key, true);
    }

    /**
     * Looks up {@code key}; on a level-1 miss the level-2 value is fetched and stored in level 1.
     *
     * @param useReadOnlyCache whether to consult (and fill) level 1 before level 2
     * @return the value, or {@code null} when any step throws
     */
    private String getValue(final String key, boolean useReadOnlyCache) {
        String payload = null;
        try {
            if (useReadOnlyCache) {
                final String currentPayload = readOnlyCacheMap.get(key);
                if (currentPayload != null) {
                    payload = currentPayload;
                    log.debug("load data from readOnlyCache for key [{}], value [{}]", key, payload);
                } else {
                    payload = readWriteCacheMap.get(key);
                    readOnlyCacheMap.put(key, payload);
                    log.debug("load data from readWriteCache for key [{}], value [{}]", key, payload);
                }
            } else {
                payload = readWriteCacheMap.get(key);
            }
        } catch (Throwable t) {
            log.error("Cannot get value for key : {}", key, t);
        }
        return payload;
    }

    /**
     * Invalidates the given keys in level 2 only; level 1 catches up on the next sync run.
     * A {@code null} array is treated as "nothing to invalidate".
     */
    @Override
    public void invalidate(String... keys) {
        if (keys == null) {
            return;
        }
        for (String key : keys) {
            readWriteCacheMap.invalidate(key);
        }
    }

    @Override
    public void setDataRepository(DataRepository dataRepository) {
        this.dataRepository = dataRepository;
    }
}
DataRepository.java
package cn.idea360.multiple.cache;

/**
* Data source sitting behind the cache.
*
* @author cuishiying
*/
public interface DataRepository {

/**
* Loads the value for {@code key} directly from the underlying store.
* The default implementation returns an empty string.
*
* @param key the key to look up
* @return the stored value
*/
default String load(String key) {
return "";
}

/**
* Loads the value for {@code key} by way of the cache.
*
* @param key the key to look up
* @return the value as seen through the cache
*/
String loadWithCache(String key);

/**
* Stores {@code value} under {@code key}.
*
* @param key the key to write
* @param value the value to store
*/
void write(String key, String value);

}
AbstractDataRepository.java
package cn.idea360.multiple.cache;

/**
 * Base repository that wires itself to a {@link MultipleCache}: reads go through the cache,
 * and a successful write invalidates the written key.
 *
 * @author cuishiying
 */
public abstract class AbstractDataRepository implements DataRepository {

    /** Cache in front of this repository; a default instance is created on construction. */
    protected MultipleCache cache;

    /** Creates the default cache and registers this repository as its data source. */
    protected AbstractDataRepository() {
        this.cache = new MultipleCacheImpl();
        this.cache.setDataRepository(this);
    }

    /** Reads {@code key} through the cache. */
    @Override
    public String loadWithCache(String key) {
        return this.cache.get(key);
    }

    /** Writes via {@link #doWrite(String, String)}; the key is invalidated only if the write succeeded. */
    @Override
    public void write(String key, String value) {
        if (!doWrite(key, value)) {
            return;
        }
        this.cache.invalidate(key);
    }

    /**
     * Replaces the cache. The new cache is pointed at this repository before it is installed.
     *
     * @param cache the cache to use from now on
     */
    public void setCache(MultipleCache cache) {
        cache.setDataRepository(this);
        this.cache = cache;
    }

    /**
     * Performs the actual write against the underlying store.
     *
     * @return {@code true} if the value was stored
     */
    protected abstract boolean doWrite(String key, String value);
}
InMemoryDataRepository.java
package cn.idea360.multiple.cache;

import lombok.extern.slf4j.Slf4j;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * {@link AbstractDataRepository} that keeps its data in an in-process map.
 *
 * <p>The map is a {@link ConcurrentHashMap} because {@link #load(String)} is called from the
 * cache's background sync thread while {@link #doWrite(String, String)} runs on caller threads.
 *
 * @author cuishiying
 */
@Slf4j
public class InMemoryDataRepository extends AbstractDataRepository {

    private final Map<String, String> data = new ConcurrentHashMap<>();

    /**
     * Returns the stored value for {@code key}.
     *
     * @return the value, or an empty string when the key is absent or {@code null}
     */
    @Override
    public String load(String key) {
        // ConcurrentHashMap rejects null keys; treat a null key as "no such entry".
        if (key == null) {
            return "";
        }
        return data.getOrDefault(key, "");
    }

    /**
     * Stores {@code value} under {@code key}.
     *
     * @return {@code true} on success; {@code false} if the map rejected the entry
     *         (a null key or value), in which case the error is logged
     */
    @Override
    protected boolean doWrite(String key, String value) {
        try {
            data.put(key, value);
            return true;
        } catch (Exception e) {
            log.error("doWrite err", e);
            return false;
        }
    }
}

simplelogger.properties

1
2
3
4
# Default level for all loggers.
org.slf4j.simpleLogger.defaultLogLevel=info
# Debug level for loggers under the cache package.
org.slf4j.simpleLogger.log.cn.idea360.multiple.cache=debug
# Prefix each log line with a timestamp in the format below.
org.slf4j.simpleLogger.showDateTime=true
org.slf4j.simpleLogger.dateTimeFormat=yyyy-MM-dd HH:mm:ss

单元测试

GuavaCacheTest.java
package cn.idea360.multiple.cache;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.Weigher;
import org.junit.Test;

import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;

/**
 * Exercises the Guava cache features the multi-level cache builds on.
 * https://www.baeldung.com/guava-cache
 * Eureka source code
 * @author cuishiying
 */
//@RunWith(MockitoJUnitRunner.class)
public class GuavaCacheTest {

    /** Builds a fresh loader that maps every key to its upper-case form. */
    private static CacheLoader<String, String> upperCaseLoader() {
        return new CacheLoader<String, String>() {
            @Override
            public String load(String key) {
                return key.toUpperCase();
            }
        };
    }

    /** A miss triggers the loader and the result is stored. */
    @Test
    public void whenCacheMiss_thenValueIsComputed() {
        LoadingCache<String, String> cache = CacheBuilder.newBuilder().build(upperCaseLoader());

        assertEquals(0, cache.size());
        assertEquals("HELLO", cache.getUnchecked("hello"));
        assertEquals(1, cache.size());
    }

    /** With maximumSize(3), loading a fourth key evicts the first one. */
    @Test
    public void whenCacheReachMaxSize_thenEviction() {
        LoadingCache<String, String> cache =
                CacheBuilder.newBuilder().maximumSize(3).build(upperCaseLoader());

        for (String k : new String[] {"first", "second", "third", "forth"}) {
            cache.getUnchecked(k);
        }

        assertEquals(3, cache.size());
        assertNull(cache.getIfPresent("first"));
        assertEquals("FORTH", cache.getIfPresent("forth"));
    }

    /** Entries are weighed by value length; exceeding maximumWeight(16) evicts the first one. */
    @Test
    public void whenCacheReachMaxWeight_thenEviction() {
        Weigher<String, String> weighByLength = (key, value) -> value.length();

        LoadingCache<String, String> cache = CacheBuilder.newBuilder()
                .maximumWeight(16)
                .weigher(weighByLength)
                .build(upperCaseLoader());

        for (String k : new String[] {"first", "second", "third", "last"}) {
            cache.getUnchecked(k);
        }

        assertEquals(3, cache.size());
        assertNull(cache.getIfPresent("first"));
        assertEquals("LAST", cache.getIfPresent("last"));
    }

    /** An entry not accessed within the expireAfterAccess window is gone afterwards. */
    @Test
    public void whenEntryIdle_thenEviction()
            throws InterruptedException {
        LoadingCache<String, String> cache = CacheBuilder.newBuilder()
                .expireAfterAccess(2, TimeUnit.MILLISECONDS)
                .build(upperCaseLoader());

        cache.getUnchecked("hello");
        assertEquals(1, cache.size());

        cache.getUnchecked("hello");
        Thread.sleep(300);

        cache.getUnchecked("test");
        assertEquals(1, cache.size());
        assertNull(cache.getIfPresent("hello"));
    }

    /** An entry older than the expireAfterWrite window is gone afterwards. */
    @Test
    public void whenEntryLiveTimeExpire_thenEviction()
            throws InterruptedException {
        LoadingCache<String, String> cache = CacheBuilder.newBuilder()
                .expireAfterWrite(2, TimeUnit.MILLISECONDS)
                .build(upperCaseLoader());

        cache.getUnchecked("hello");
        assertEquals(1, cache.size());
        Thread.sleep(300);
        cache.getUnchecked("test");
        assertEquals(1, cache.size());
        assertNull(cache.getIfPresent("hello"));
    }

    /** Only checks that a weak-key cache can be built; nothing is asserted. */
    @Test
    public void whenWeakKeyHasNoRef_thenRemoveFromCache() {
        LoadingCache<String, String> cache =
                CacheBuilder.newBuilder().weakKeys().build(upperCaseLoader());
    }

    /** Only checks that a soft-value cache can be built; nothing is asserted. */
    @Test
    public void whenSoftValue_thenRemoveFromCache() {
        LoadingCache<String, String> cache =
                CacheBuilder.newBuilder().softValues().build(upperCaseLoader());
    }

}
MultipleCachelTest.java
package cn.idea360.multiple.cache;

import lombok.extern.slf4j.Slf4j;
import org.junit.Before;
import org.junit.Test;

import static org.junit.Assert.assertEquals;

@Slf4j
public class MultipleCachelTest {

private String key;

@Before
public void init() throws Exception {
key = "hello";
}

@Test
public void get() throws InterruptedException {
// 1. 从db获取, 30s后一级缓存会同步二级缓存, 180s后二级缓存失效
MultipleCache cache = new MultipleCacheImpl();
log.info("first fetch data from multiple cache");
assertEquals("HELLO", cache.get(key));
Thread.sleep(5 * 1000);

// 2. 从一级缓存获取
log.info("second fetch data from multiple cache");
assertEquals("HELLO", cache.get(key));

// 3. 手动失效二级缓存, 从一级缓存获取
log.info("third fetch data from multiple cache");
cache.invalidate(key);
assertEquals("HELLO", cache.get(key));
}
}
InMemoryDataRepositoryTest.java
package cn.idea360.multiple.cache;

import lombok.extern.slf4j.Slf4j;
import org.junit.Test;

@Slf4j
public class InMemoryDataRepositoryTest {

@Test
public void load() throws InterruptedException {
String key = "name";

CacheConfig cacheConfig = new CacheConfig();
cacheConfig.setInitialCapacityOfCache(3);
cacheConfig.setCacheAutoExpirationInSeconds(180);
cacheConfig.setCacheUpdateIntervalMs(1000 * 2);
MultipleCache cache = new MultipleCacheImpl(cacheConfig);
InMemoryDataRepository repository = new InMemoryDataRepository();
repository.setCache(cache);

log.info("first={}", repository.loadWithCache(key));

repository.write(key, "admin");
log.info("second={}", repository.loadWithCache(key));

Thread.sleep(5 * 1000);
repository.write(key, "test");
log.info("third={}", repository.loadWithCache(key));

Thread.sleep(5 * 1000);
log.info("forth={}", repository.loadWithCache(key));
}
}