Loading a Million Rows in Seconds

Bulk-loading large datasets is a recurring requirement in real projects. This article works through the problem in three steps: splitting a List into chunks of a given size, inserting with multiple threads, and batching the inserts over JDBC.

Preface

  • This article is based on a real project from the author's work. If you have better suggestions after reading it, please leave a comment below or message me.

Splitting a List into Lists of a Given Size

import java.util.ArrayList;
import java.util.List;

/**
 * @ClassName ArrayUtil
 * @Description Utility class for list operations
 * @Author Dew
 * @Date 2019/4/1 13:53
 * @Version 1.0
 **/
public class ArrayUtil<T> {

    /**
     * @Author Dew
     * @Description Splits a List into groups of at most groupSize elements
     * @Param [values the list to split, groupSize the size of each group]
     * @Date 13:54 2019/4/1
     * @Return java.util.List<java.util.List<T>>
     **/
    public List<List<T>> splitGroup(List<T> values, int groupSize) {
        List<List<T>> listGroup = new ArrayList<>();
        int listSize = values.size();
        // Ceiling division; the original (listSize / groupSize) + 1 appended an
        // empty trailing group whenever listSize was an exact multiple of groupSize
        int runSize = (listSize + groupSize - 1) / groupSize;

        for (int i = 0; i < runSize; i++) {
            int start = i * groupSize;
            int end = Math.min(start + groupSize, listSize);
            // Note: subList returns a view backed by the original list
            listGroup.add(values.subList(start, end));
        }
        return listGroup;
    }
}
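
A quick sanity check of the splitter (a minimal sketch; the sample data is made up):

List<Integer> data = new ArrayList<>();
for (int i = 0; i < 10; i++) {
    data.add(i);
}
// Splits [0..9] into [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
List<List<Integer>> groups = new ArrayUtil<Integer>().splitGroup(data, 4);
System.out.println(groups);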

Creating the Insert Worker Thread

/**
 * @ClassName: BatchSaveThread
 * @Description: Runs one chunk's insert on a worker thread
 * @date 2019/03/14
 */
class BatchSaveThread implements Runnable {

    // The DAO is passed in explicitly because this Runnable is not a Spring-managed
    // bean; the AdvertisingUserDao type name is assumed from the original field name.
    private final AdvertisingUserDao advertisingUserDao;
    private final List<AdUserGridDo> list;

    public BatchSaveThread(AdvertisingUserDao advertisingUserDao, List<AdUserGridDo> list) {
        this.advertisingUserDao = advertisingUserDao;
        this.list = list;
    }

    @Override
    public void run() {
        if (!list.isEmpty()) {
            advertisingUserDao.batchSaveJDBC(list);
        }
    }

}
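
Since Java 8 the same worker can also be written inline as a lambda, which avoids the extra class (a sketch; chunk and advertisingUserDao are assumed to be in scope):

// Equivalent inline worker: run one chunk's insert on the pool
executor.execute(() -> {
    if (!chunk.isEmpty()) {
        advertisingUserDao.batchSaveJDBC(chunk);
    }
});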

Dispatching the List for Batch Insert

public void batchSave(List<AdUserGridDo> models, int groupSize) {
    if (models == null || models.isEmpty()) {
        return;
    }
    List<List<AdUserGridDo>> groupList = new ArrayUtil<AdUserGridDo>().splitGroup(models, groupSize);

    // One thread per chunk, capped at 4; tune the cap to what the server can handle
    int threadPoolSize = Math.min(groupList.size(), 4);
    logger.info("## address list not empty, chunks: {}, insert threads created: {}",
            groupList.size(), threadPoolSize);
    ExecutorService executor = Executors.newFixedThreadPool(threadPoolSize);
    try {
        for (int i = 0, length = groupList.size(); i < length; i++) {
            logger.info("thread {}: save API", i);
            List<AdUserGridDo> list = groupList.get(i);
            // advertisingUserDao is a field of the enclosing class, handed to each worker
            BatchSaveThread saveThread = new BatchSaveThread(advertisingUserDao, list);
            executor.execute(saveThread);
        }
    } catch (Exception e) {
        throw new RuntimeException(e);
    } finally {
        executor.shutdown();
    }
}
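
Note that executor.shutdown() only stops the pool from accepting new tasks; it does not wait for already-submitted inserts to finish, so batchSave may return while the workers are still writing. If the caller needs to know when all chunks are persisted (for example, to time the whole load), the finally block can wait explicitly (a sketch; the 10-minute timeout is an arbitrary assumption, and java.util.concurrent.TimeUnit must be imported):

executor.shutdown();
try {
    // Block until every insert task has finished, or give up after 10 minutes
    if (!executor.awaitTermination(10, TimeUnit.MINUTES)) {
        executor.shutdownNow();
    }
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    executor.shutdownNow();
}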

Batch Insert in a Spring Project

  • Call it from the Repository
public void batchSaveJDBC(List<AdvertisingEntity> list) {
    BatchSave save = new BatchSave();
    save.batchSaveAdvertising(list);
}
  • Create an inner class that obtains the database connection and performs the insert
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;
import java.util.Properties;

static Properties properties = new Properties();
static {
    // Load JDBC settings (driverClass, jdbcUrl, user, password) from the classpath
    ClassLoader cl = Thread.currentThread().getContextClassLoader();
    InputStream in = new BufferedInputStream(cl.getResourceAsStream("jdbc.properties"));
    try {
        properties.load(in);
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        if (in != null) {
            try {
                in.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

/**
 * @ClassName: BatchSave
 * @Description: JDBC batch insert, to fix the throughput problem
 * @date 2019/03/14
 */
class BatchSave {

    /** JDBC driver class */
    private String JDBC_DRIVER = properties.getProperty("driverClass");

    /** Database connection URL */
    private String JDBC_URL = properties.getProperty("jdbcUrl");

    /** Database user */
    private String JDBC_USER = properties.getProperty("user");

    /** Database password */
    private String JDBC_PASSWORD = properties.getProperty("password");

    private void batchSaveAdvertising(List<AdvertisingEntity> list) {
        Connection connection = null;
        PreparedStatement statement = null;

        try {
            Class.forName(JDBC_DRIVER);
            connection = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD);
            String sql = "insert into t_advertising values(?,?,?,?,?)";
            statement = connection.prepareStatement(sql);

            // Disable auto-commit so the whole chunk commits as one transaction
            connection.setAutoCommit(false);

            AdvertisingEntity value = null;
            for (int i = 0; i < list.size(); i++) {
                value = list.get(i);
                statement.setString(1, value.getName());
                statement.setString(2, value.getLongitude());
                statement.setString(3, value.getLatitude());
                statement.setInt(4, value.getGridX());
                statement.setInt(5, value.getGridY());

                statement.addBatch();
                // Flush every 100 rows; (i + 1) % 100 avoids the original
                // i % 100 == 0 firing on the very first row
                if ((i + 1) % 100 == 0) {
                    statement.executeBatch();
                    statement.clearBatch();
                }
            }
            // Flush the remaining rows, then commit the transaction
            statement.executeBatch();
            connection.commit();
        } catch (Exception e) {
            // Roll back the partially applied chunk before propagating
            if (connection != null) {
                try {
                    connection.rollback();
                } catch (SQLException re) {
                    re.printStackTrace();
                }
            }
            throw new RuntimeException(e);
        } finally {
            try {
                if (statement != null) {
                    statement.close();
                }
                if (connection != null) {
                    connection.close();
                }
            } catch (SQLException e2) {
                throw new RuntimeException(e2);
            }
        }
    }
}
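
One caveat on raw JDBC batching: with MySQL Connector/J, addBatch()/executeBatch() still sends one INSERT per row unless rewriteBatchedStatements=true is set on the JDBC URL, which lets the driver rewrite the batch into multi-row inserts and is usually what makes second-level load times achievable. A sketch of jdbc.properties under the assumption that the target database is MySQL (host, schema, and credentials are placeholders):

driverClass=com.mysql.cj.jdbc.Driver
jdbcUrl=jdbc:mysql://localhost:3306/mydb?rewriteBatchedStatements=true&useSSL=false
user=root
password=secret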

Batch Insert in Spring Boot

import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.jdbc.core.namedparam.SqlParameterSource;
import org.springframework.jdbc.core.namedparam.SqlParameterSourceUtils;
import org.springframework.stereotype.Repository;

@Repository
public class InfoRepository {

    private final Logger logger = LoggerFactory.getLogger(InfoRepository.class);

    @Autowired
    private NamedParameterJdbcTemplate secondaryJdbcTemplate;

    /**
     * @Author Dew
     * @Description Batch insert of user grid data
     * @Param [list the rows to insert]
     * @Date 14:45 2019/4/1
     * @Return void
     **/
    public void batchSaveAdvertising(List<AdUserGridDo> list) {
        long start = System.currentTimeMillis();
        String sql = "insert into info values(:name,:longitude,:latitude)";
        // Each :param is bound from the matching bean property of every list element
        SqlParameterSource[] beanSource = SqlParameterSourceUtils.createBatch(list.toArray());
        secondaryJdbcTemplate.batchUpdate(sql, beanSource);
        long end = System.currentTimeMillis();

        logger.info("batch insert took:\t" + (end - start) + "ms");
    }

}
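
For createBatch to resolve :name, :longitude, and :latitude, each bean must expose matching getters. A minimal sketch of what AdUserGridDo might look like (the field set is inferred from the SQL above):

public class AdUserGridDo {

    private String name;
    private String longitude;
    private String latitude;

    // SqlParameterSourceUtils.createBatch reflects on these getters
    public String getName() { return name; }
    public String getLongitude() { return longitude; }
    public String getLatitude() { return latitude; }

    public void setName(String name) { this.name = name; }
    public void setLongitude(String longitude) { this.longitude = longitude; }
    public void setLatitude(String latitude) { this.latitude = latitude; }
}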

Notes:

Point 1. Parameters are bound by property name via the :name syntax.
Point 2. The million rows are split into chunks of 20,000 and inserted batch by batch, which keeps the pressure on the database manageable.
Point 3. The number of insert threads must be tuned to the server's capacity; pushing past what the server can handle leads to very unpleasant failures.
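
Putting it all together (a sketch; loadRows() is a hypothetical loader for the source data):

// Split ~1,000,000 rows into 20,000-row chunks and insert them on up to 4 threads
List<AdUserGridDo> millionRows = loadRows();
batchSave(millionRows, 20000);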