分布式锁

ReentrantLock使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 根据业务将锁缓存在容器中
private final Map<String, ReentrantLock> keyLockMap = new ConcurrentHashMap<>();

// 方法调用
public <T> T get(Object key) {
Object value = lookup(key);
if (value != null) {
return (T) value;
}

ReentrantLock lock = keyLockMap.computeIfAbsent(key.toString(), s -> {
log.trace("create lock for key : {}", s);
return new ReentrantLock();
});

try {
lock.lock();
value = lookup(key);
if (value != null) {
return (T) value;
}
}
catch (Exception e) {
throw new ValueRetrievalException(key, e.getCause());
}
finally {
lock.unlock();
}
}

Zookeeper分布式锁

1
2
3
4
5
6
7
8
9
10
public interface Lock {
/**
* Acquire lock
*/
void getLock() throws Exception;
/**
* Release lock
*/
void unlock() throws Exception;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Slf4j
public abstract class AbstractTemplateLock implements Lock {
@Override
public void getLock() {
if (tryLock()) {
log.info(Thread.currentThread().getName() + "Lock acquired successfully");
} else {
//wait for
waitLock();//Event listening if the node is deleted, it can be retrieved
//Reacquire
getLock();
}
}
protected abstract void waitLock();
protected abstract boolean tryLock();
protected abstract void releaseLock();
@Override
public void unlock() {
releaseLock();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package com.example.lock;

import lombok.extern.slf4j.Slf4j;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;

import java.util.concurrent.CountDownLatch;

/**
* @author cuishiying
* @date 2021-01-22
*/
@Slf4j
public class ZkTemplateLock extends AbstractTemplateLock {
private static final String zkServers = "127.0.0.1:2181";
private static final int sessionTimeout = 8000;
private static final int connectionTimeout = 5000;

private static final String lockPath = "/lockPath";


private ZkClient client;

public ZkTemplateLock() {
client = new ZkClient(zkServers, sessionTimeout, connectionTimeout);
log.info("zk client 连接成功:{}",zkServers);
}



@Override
protected void waitLock() {
CountDownLatch latch = new CountDownLatch(1);

IZkDataListener listener = new IZkDataListener() {
@Override
public void handleDataDeleted(String dataPath) throws Exception {
System.out.println("监听到节点被删除");
latch.countDown();
}
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {}
};
//完成 watcher 注册
client.subscribeDataChanges(lockPath, listener);

//阻塞自己
if (client.exists(lockPath)) {
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//取消watcher注册
client.unsubscribeDataChanges(lockPath, listener);
}

@Override
protected boolean tryLock() {
try {
client.createEphemeral(lockPath);
System.out.println(Thread.currentThread().getName()+"获取到锁");
} catch (Exception e) {
log.error("创建失败");
return false;
}
return true;
}


@Override
public void releaseLock() {
client.delete(this.lockPath);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package com.example.lock;

import lombok.extern.slf4j.Slf4j;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
* @author cuishiying
* @date 2021-01-22
*/
@Slf4j
public class ZkSequenTemplateLock extends AbstractTemplateLock {
private static final String zkServers = "127.0.0.1:2181";
private static final int sessionTimeout = 8000;
private static final int connectionTimeout = 5000;
private static final String lockPath = "/lockPath";
private String beforePath;
private String currentPath;
private ZkClient client;

public ZkSequenTemplateLock() {
client = new ZkClient(zkServers);
if (!client.exists(lockPath)) {
client.createPersistent(lockPath);

}
log.info("zk client Connection successful:{}",zkServers);

}

@Override
protected void waitLock() {
CountDownLatch latch = new CountDownLatch(1);
IZkDataListener listener = new IZkDataListener() {
@Override
public void handleDataDeleted(String dataPath) throws Exception {
System.out.println("Listening to the node being deleted");
latch.countDown();
}
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {}
};
//Adding a data deletion watcher to the top node essentially starts another thread to listen to the previous node
client.subscribeDataChanges(beforePath, listener);
//Block yourself
if (client.exists(beforePath)) {
try {
System.out.println("block"+currentPath);
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//Cancel watcher registration
client.unsubscribeDataChanges(beforePath, listener);
}
@Override
protected boolean tryLock() {
if (currentPath == null) {
//Create a temporary sequence node
currentPath = client.createEphemeralSequential(lockPath + "/", "lock-data");
System.out.println("current:" + currentPath);
}
//Get all children and sort them. Temporary node name is a self growing string
List<String> childrens = client.getChildren(lockPath);
//Sort list, sort in natural order
Collections.sort(childrens);
if (currentPath.equals(lockPath + "/" + childrens.get(0))) {
return true;
} else {
//If the current node is not ranked first, the previous node information is obtained and assigned to beforePath
int curIndex = childrens.indexOf(currentPath.substring(lockPath.length() + 1));
beforePath = lockPath + "/" + childrens.get(curIndex - 1);
}
System.out.println("beforePath"+beforePath);
return false;
}
@Override
public void releaseLock() {
System.out.println("delete:" + currentPath);
client.delete(currentPath);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package com.example.lock;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.util.concurrent.TimeUnit;

/**
* @author cuishiying
* @date 2021-01-22
*/
public class ZkLockWithCuratorTemplate implements Lock {
// zk host address
private String host = "localhost";
// zk self increasing storage node
private String lockPath = "/curatorLock";
// Retry sleep time
private static final int SLEEP_TIME_MS = 1000;
// Maximum retries 1000
private static final int MAX_RETRIES = 1000;
//Session timeout
private static final int SESSION_TIMEOUT = 30 * 1000;
//Connection timeout
private static final int CONNECTION_TIMEOUT = 3 * 1000;
//Cursor core operation class
private CuratorFramework curatorFramework;

InterProcessMutex lock;

public ZkLockWithCuratorTemplate() {
curatorFramework = CuratorFrameworkFactory.builder()
.connectString(host)
.connectionTimeoutMs(CONNECTION_TIMEOUT)
.sessionTimeoutMs(SESSION_TIMEOUT)
.retryPolicy(new ExponentialBackoffRetry(SLEEP_TIME_MS, MAX_RETRIES))
.build();
curatorFramework.start();
lock = new InterProcessMutex (curatorFramework, lockPath);
}
@Override
public void getLock() throws Exception {
//Release lock after 5s timeout
lock.acquire(5, TimeUnit.SECONDS);
}
@Override
public void unlock() throws Exception {
lock.release();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class CuratorConfig {
// zk host地址
private String host;

// zk自增存储node
private String lockPath;

// 重试休眠时间
private static final int SLEEP_TIME_MS = 1000;
// 最大重试1000次
private static final int MAX_RETRIES = 1000;
//会话超时时间
private static final int SESSION_TIMEOUT = 30 * 1000;
//连接超时时间
private static final int CONNECTION_TIMEOUT = 3 * 1000;

private CuratorFramework curatorFramework;

@Bean
public CuratorFramework curatorFramework() {
curatorFramework = CuratorFrameworkFactory.builder()
.connectString(host)
.connectionTimeoutMs(CONNECTION_TIMEOUT)
.sessionTimeoutMs(SESSION_TIMEOUT)
.retryPolicy(new ExponentialBackoffRetry(SLEEP_TIME_MS, MAX_RETRIES))
.build();
curatorFramework.start();
return curatorFramework;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class LockTest {

public static void main(String[] args) {
AtomicInteger orderNumber = new AtomicInteger();

Lock lock = new ZkTemplateLock();
CountDownLatch latch = new CountDownLatch(30);
for (int i = 0; i < 30; i++) {
new Thread(() -> {
try {
latch.countDown();
latch.await();
try {
lock.getLock();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
lock.unlock();
} catch (Exception e) {
e.printStackTrace();
}
}
log.info(Thread.currentThread().getName() + "----->" + orderNumber.getAndIncrement());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}

}

zookeeper简单实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@Slf4j
@Configuration
public class ZkLockConfig {

@Value("${zookeeper.address}")
private String connectString;

@Value("${zookeeper.timeout}")
private int timeout;

@Bean("zkClient")
public ZooKeeper zkClient() {
ZooKeeper zookeeper = null;
try {
final CountDownLatch connectedSemaphore = new CountDownLatch(1);
zookeeper = new ZooKeeper(connectString, timeout, watchedEvent -> {
log.info("Receive watched event: " + watchedEvent.getState());
if(Watcher.Event.KeeperState.SyncConnected == watchedEvent.getState()) {
connectedSemaphore.countDown();
}
});
// 给一个状态CONNECTING,连接中
log.info("zk state: {}", zookeeper.getState());

connectedSemaphore.await();

log.info("zk连接成功......");
} catch (Exception e) {
log.error("zk连接失败, zookeeper.address=" + connectString, e);
}
return zookeeper;
}

@Bean("zkLock")
@ConditionalOnBean(ZooKeeper.class)
public ZkLock zkLock(ZooKeeper zooKeeper) {
return new ZkLock(zooKeeper);
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
@Slf4j
public class ZkLock {

public static final String LOCK_LOG = "获取分布式锁, lock=[{}]";
private final ZooKeeper zookeeper;

public ZkLock(ZooKeeper zookeeper) {
this.zookeeper = zookeeper;
}


private void acquireDistributedLock(Long companyId, Long groupId) {
String path = getLockPath(companyId, groupId);
acquireDistributedLock(path);
}

public void acquireDistributedLock(String path) {
try {
zookeeper.create(path, "".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
log.info(LOCK_LOG, path);
} catch (Exception e) {
int count = 0;
while(true) {
try {
Thread.sleep(1000);
zookeeper.create(path, "".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
} catch (Exception e2) {
count++;
log.info("尝试获取[{}]次锁, path=[{}]", count, path);
continue;
}
log.info("重试[{}]次后获取到锁, path=[{}]", count, path);
break;
}
}
}

public boolean acquireFastFailedDistributedLock(Long companyId, Long groupId) {
String path = getLockPath(companyId, groupId);
try {
zookeeper.create(path, "".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
log.info(LOCK_LOG, path);
return true;
} catch (Exception e) {
log.info("获取分布式锁失败, lock=[{}]", path);
}
return false;
}

private void releaseDistributedLock(Long companyId, Long groupId) {
String path = getLockPath(companyId, groupId);
releaseDistributedLock(path);
}

public void releaseDistributedLock(String path) {
try {
zookeeper.delete(path, -1);
log.info("释放分布式锁, path=[{}]", path);
} catch (Exception e) {
e.printStackTrace();
}
}

private String getLockPath(Long companyId, Long groupId) {
return String.format(ZK_EXCLUSIVE_LOCK_INNER, companyId, groupId);
}
}

参考