生产者消费者模式

接口定义

1
2
3
4
5
6
7
8
package cn.idea360.migrate;

/**
* @author cuishiying
*/
public interface DataReader extends Runnable{

}
1
2
3
4
5
6
7
8
9
10
11
package cn.idea360.migrate;

/**
* @author cuishiying
*/
public interface DataWriter extends Runnable {

void write(Object row);

void flush();
}
1
2
3
4
5
6
7
8
9
package cn.idea360.migrate;

/**
* @author cuishiying
*/
public interface DataTransformer<S, T> {

<T> T transform(S s);
}

缓冲

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package cn.idea360.migrate;

import java.util.Collection;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
* @author cuishiying
*/
public class Channel {

/**
* 缓冲容量
*/
private final int capacity;

/**
* 批次大小
*/
private final int batchSize;

/**
* 容器
*/
private final ArrayBlockingQueue<Object> queue;

/**
* 锁
*/
private final ReentrantLock lock;

/**
* 缓冲可读
*/
private final Condition notEmpty;

/**
* 缓冲可写
*/
private final Condition notFull;

public Channel() {
this.capacity = 100;
this.batchSize = 10;
this.queue = new ArrayBlockingQueue<>(capacity);
this.lock = new ReentrantLock();
this.notFull = lock.newCondition();
this.notEmpty = lock.newCondition();
}

public void push(Object record) {
try {
this.queue.put(record);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();;
}
}

public void pushAll(Collection<Object> records) {
lock.lock();
try {
while (records.size() > this.queue.remainingCapacity()) {
notFull.await(200L, TimeUnit.MILLISECONDS);
}
this.queue.addAll(records);
this.notEmpty.signalAll();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
lock.unlock();
}
}

public Object pull() {
try {
return this.queue.take();
} catch (Exception e) {
Thread.currentThread().interrupt();
throw new RuntimeException("pull err", e);
}
}

public void pullAll(Collection<Object> rs) {
rs.clear();
lock.lock();
try {
while (this.queue.drainTo(rs, batchSize) <= 0) {
this.notEmpty.await(200, TimeUnit.MILLISECONDS);
}
this.notFull.signalAll();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
lock.unlock();
}
}
}

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package cn.idea360.migrate;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;

/**
* @author cuishiying
*/
public class SimpleDataReader implements DataReader {

private final Channel channel;
private final List<Object> mockData = new ArrayList<>();

public SimpleDataReader(Channel channel) {
this.channel = channel;
for (int i = 0; i < 100; i++) {
mockData.add(i);
}
}

@Override
public void run() {
Iterator<Object> iterator = mockData.iterator();
while (iterator.hasNext()) {
long time = (new Random().nextInt(5)) * 10L;
try {
Thread.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
Object row = iterator.next();
channel.push(row);
}
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package cn.idea360.migrate;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/**
* @author cuishiying
*/
public class SimpleDataWriter implements DataWriter {

private final Channel channel;

private final List<Object> cache = new ArrayList<>();

/**
* 批处理触发间隔为200ms
*/
private final long FLUSH_INTERVAL_MS = 200;

/**
* 触发批处理的最大数据大小为10条
*/
private final long MAX_BATCH_SIZE = 10;

public SimpleDataWriter(Channel channel) {
this.channel = channel;
}

@Override
public void run() {
long preFT=System.currentTimeMillis();
while (true) {
Object row = channel.pull();
if (Objects.nonNull(row)) {
this.write(row);
}
if (cache.size() >= MAX_BATCH_SIZE || System.currentTimeMillis()-preFT >= FLUSH_INTERVAL_MS) {
this.flush();
preFT=System.currentTimeMillis();
}
}
}

@Override
public void write(Object row) {
cache.add(row);
}

@Override
public void flush() {
System.out.println(Thread.currentThread().getName() + ":" + cache);
cache.clear();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package cn.idea360.migrate;

import java.util.ArrayList;
import java.util.List;

/**
* @author cuishiying
*/
public class SimpleDataWriter1 implements DataWriter {

private final Channel channel;

public SimpleDataWriter1(Channel channel) {
this.channel = channel;
}

@Override
public void run() {
while (true) {
List<Object> rows = new ArrayList<>();
channel.pullAll(rows);
if (!rows.isEmpty()) {
System.out.println(Thread.currentThread().getName() + ":" + rows);
}
}
}

@Override
public void write(Object row) {
}

@Override
public void flush() {

}
}

测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package cn.idea360.migrate;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* @author cuishiying
*/
public class Main {

public static void main(String[] args) {
Channel channel = new Channel();
ExecutorService executorService = Executors.newFixedThreadPool(5);
executorService.execute(new SimpleDataReader(channel));
for (int i = 0; i < 3; i++) {
executorService.execute(new SimpleDataWriter1(channel));
}
}
}