JAVA中用多线程技术实现大数据导入
- 格式:doc
- 大小:15.21 KB
- 文档页数:20
java多线程实际应用案例Java多线程是一种并发编程的方式,可以使程序同时执行多个任务,提高程序的执行效率和响应速度。
下面列举了十个Java多线程实际应用案例。
1. 电商网站订单处理:在一个电商网站中,订单的处理是一个非常繁琐且耗时的工作,可以使用多线程实现订单的并发处理,提高订单处理的效率。
2. 聊天软件消息发送:在聊天软件中,用户发送消息是一个频繁的操作,可以使用多线程实现消息的并发发送,提高用户体验。
3. 数据库读写操作:在数据库的读写操作中,读操作可以使用多线程并发执行,提高数据的读取速度;写操作可以使用多线程并发执行,提高数据的写入速度。
4. 图像处理:在图像处理中,可以使用多线程实现图像的并行处理,提高图像处理的速度。
5. 视频编解码:在视频编解码中,可以使用多线程实现视频的并行编解码,提高视频的处理速度。
6. 网络爬虫:在网络爬虫中,可以使用多线程实现并发的爬取网页数据,提高爬虫的效率。
7. 游戏开发:在游戏开发中,可以使用多线程实现游戏的并行处理,提高游戏的运行速度和响应速度。
8. 大数据处理:在大数据处理中,可以使用多线程实现并发的数据处理,提高大数据处理的效率。
9. 并发服务器:在服务器开发中,可以使用多线程实现并发的请求处理,提高服务器的并发能力。
10. 并发任务调度:在任务调度中,可以使用多线程实现并发的任务执行,提高任务的执行效率。
在实际应用中,多线程不仅可以提高程序的执行效率和响应速度,还可以充分利用多核处理器的优势,实现并行计算和并发处理。
然而,多线程编程也面临着诸多挑战,如线程安全、死锁、资源竞争等问题,需要设计合理的线程同步和互斥机制,确保程序的正确性和稳定性。
因此,在使用多线程编程时,需要仔细考虑线程间的依赖关系和数据共享问题,合理规划线程的数量和调度策略,确保多线程程序的正确性和性能。
Java语言在大型数据库系统设计与优化中的应用一、引言随着互联网和大数据时代的到来,大型数据库系统的设计和优化变得愈发重要。
在这个背景下,Java作为一种广泛应用的编程语言,其在大型数据库系统设计与优化中的应用也备受关注。
本文将探讨Java语言在大型数据库系统设计与优化中的重要性以及具体应用。
二、Java语言在数据库系统设计中的优势1. 跨平台性Java作为一种跨平台的编程语言,可以在不同操作系统上运行,这为数据库系统的设计和部署提供了更大的灵活性和便利性。
2. 强大的生态系统Java拥有庞大而活跃的开发社区,丰富的第三方库和框架为数据库系统设计带来了更多选择和支持,加快了开发速度。
3. 高性能Java虚拟机(JVM)经过多年的优化和改进,在性能方面已经达到了很高的水准。
通过合理地利用JVM的特性,可以实现高效率、高性能的数据库系统。
三、Java语言在数据库系统优化中的应用1. 内存管理Java提供了自动内存管理机制,通过垃圾回收器(Garbage Collector)对内存进行管理和释放,避免了内存泄漏等问题。
合理地配置内存参数可以提升数据库系统的性能。
2. 多线程支持Java具有良好的多线程支持,可以通过多线程技术实现并发访问数据库,提高系统的吞吐量和响应速度。
3. 数据库连接池通过使用Java中成熟的数据库连接池技术,可以有效地管理数据库连接资源,减少连接创建和销毁的开销,提升系统性能。
4. SQL优化Java程序员可以通过编写高效的SQL语句来优化数据库查询性能,避免全表扫描等低效操作,提高查询速度。
四、案例分析:Java语言在大型数据库系统中的成功应用以某电商平台为例,该平台使用Java语言开发了一个大型电商数据库系统。
通过合理地利用Java技术栈中的各种工具和框架,该系统取得了显著的成绩:数据库读写性能得到显著提升:通过合理配置JVM参数和使用缓存技术,系统读写性能得到了明显改善。
系统稳定性得到保障:利用Java多线程技术实现了并发访问控制,避免了数据竞争等问题。
关于Java多线程处理List数据⼀、背景多线程数量的问题,⼀般情况下,多线程数量要等于机器CPU核数-1。
⼆、实例1、解决问题:如何让n个线程顺序遍历含有n个元素的List集合import java.util.ArrayList;import java.util.List;import ng3.ArrayUtils;public class Test_4 {/*** 多线程处理list** @param data 数据list* @param threadNum 线程数*/public synchronized void handleList(List<String> data, int threadNum) {int length = data.size();int tl = length % threadNum == 0 ? length / threadNum : (length/ threadNum + 1);for (int i = 0; i < threadNum; i++) {int end = (i + 1) * tl;HandleThread thread = new HandleThread("线程[" + (i + 1) + "] ", data, i * tl, end > length ? length : end);thread.start();}}class HandleThread extends Thread {private String threadName;private List<String> data;private int start;private int end;public HandleThread(String threadName, List<String> data, int start, int end) {this.threadName = threadName;this.data = data;this.start = start;this.end = end;}public void run() {List<String> subList = data.subList(start, end)/*.add("^&*")*/;System.out.println(threadName+"处理了"+subList.size()+"条!");}}public static void main(String[] args) {Test_4 test = new Test_4();// 准备数据List<String> data = new ArrayList<String>();for (int i = 0; i < 6666; i++) {data.add("item" + i);}test.handleList(data, 5);System.out.println(ArrayUtils.toString(data));}}2、List多线程并发读取读取现有的list对象//测试读取List的线程类,⼤概34秒package com.thread.list;import java.util.ArrayList;import java.util.HashMap;import java.util.List;import java.util.Map;public class Main {public static void main(String[] args) {List<String> list = new ArrayList<String>();Map<Long,Integer> map = new HashMap<Long,Integer>();for(int i = 0;i<1000;i++){list.add(""+i);}int pcount = Runtime.getRuntime().availableProcessors();long start = System.currentTimeMillis();for(int i=0;i<pcount;i++){Thread t = new MyThread1(list,map);map.put(t.getId(),Integer.valueOf(i));t.start();try {t.join();} catch (InterruptedException e) {e.printStackTrace();}// System.out.println(list.get(i));}System.out.println("----"+(System.currentTimeMillis() - start));}}//线程类package com.thread.list;import java.util.List;import java.util.Map;public class MyThread1 extends Thread {private List<String> list;private Map<Long,Integer> map;public MyThread1(List<String> list,Map<Long,Integer> map){this.list = list;this.map = map;}@Overridepublic void run() {int pcount = Runtime.getRuntime().availableProcessors();int i = map.get(Thread.currentThread().getId());for(;i<list.size();i+=pcount){System.out.println(list.get(i));}}}3、多线程分段处理List集合场景:⼤数据List集合,需要对List集合中的数据同标准库中数据进⾏对⽐,⽣成新增,更新,取消数据。
一、概述随着信息技术的不断发展和应用,数据处理和管理方面的需求也日益增加。
在大数据处理和管理方面,Java作为一种流行且功能强大的编程语言,为开发人员提供了丰富的工具和库。
本文将介绍如何使用Java编程语言来处理和管理大数据,并重点讨论如何写入txt文件的大数据方法。
二、大数据处理的挑战传统的数据处理方法通常无法满足大数据处理的需求,例如内存的限制、并发处理的问题等。
针对大数据处理的挑战,Java提供了多种机制和工具,例如多线程、文件流、缓冲区等。
这些工具和机制可以帮助开发人员更有效地处理大数据。
三、Java写入txt文件的基本方法在Java中,写入txt文件的基本方法通常涉及到以下几个步骤:1. 创建文件对象:首先需要创建一个文件对象来表示要写入的文件。
可以使用Java中的File类来创建文件对象。
2. 创建文件输出流:接下来需要创建一个文件输出流来将数据写入到文件中。
可以使用Java中的FileOutputStream类来创建文件输出流。
3. 写入数据:一旦创建了文件输出流,就可以调用其write()方法来写入数据。
可以使用缓冲输出流来提高写入效率。
四、Java写入大数据到txt文件的方法在处理大数据时,直接使用FileOutputStream的write()方法来写入数据可能会遇到内存溢出的问题。
为了有效地处理大数据,可以考虑以下几种方法:1. 使用缓冲区:可以通过使用缓冲输出流来提高写入大数据的效率。
缓冲输出流可以将数据暂时存储在缓冲区中,然后一次性写入到文件中,避免频繁的文件I/O操作。
2. 使用多线程:可以考虑使用多线程来并发地写入大数据。
通过将数据分割成多个小块,并分配给不同的线程来处理,可以提高写入数据的速度。
3. 使用内存映射文件:Java提供了内存映射文件的机制,可以将文件映射到内存中,然后直接对内存进行操作,从而避免频繁的文件I/O 操作。
五、示例代码以下是一个使用Java写入大数据到txt文件的示例代码:```javaimport java.io.BufferedWriter;import java.io.File;import java.io.FileWriter;import java.io.IOException;public class WriteBigDataToFile {public static void m本人n(String[] args) {File file = new File("bigdata.txt");try (BufferedWriter writer = new BufferedWriter(new FileWriter(file))){for (int i = 0; i < xxx; i++) {writer.write(String.valueOf(i));writer.newLine();}} catch (IOException e) {e.printStackTrace();}}}```六、总结在处理大数据时,选择合适的方法和工具非常重要。
多线程读取数据写入文件的方法一、概述多线程读取数据写入文件是一种高效的数据处理方式,可以在并发环境下同时进行数据读取和写入操作,提高数据处理的效率。
本文将介绍一种基于多线程的数据处理方法,包括其工作原理、实现方式、注意事项等。
二、工作原理多线程读取数据写入文件的基本原理是将数据读取和写入操作分解为多个子任务,然后将这些子任务分配给多个线程同时执行。
每个线程负责一部分数据的读取和写入,最终将所有线程的结果合并起来形成最终的数据结果。
三、实现方式1. 准备工作:首先需要准备好要处理的数据,并将其存储在内存中或临时文件中。
2. 创建线程池:使用线程池来管理多个线程,可以根据系统资源情况和数据处理需求来设置线程池的大小。
3. 分配任务:将数据读取和写入操作分解为多个子任务,并分配给线程池中的线程执行。
4. 合并结果:等待所有线程完成任务后,将各个线程的结果合并起来形成最终的数据结果,并写入目标文件中。
四、代码示例以下是一个简单的多线程读取数据写入文件的示例代码:```pythonimport threadingimport queueimport osdef read_data(data_queue, output_file):with open(output_file, 'wb') as f:while not data_queue.empty():data = data_queue.get()f.write(data)data_queue.task_done()def write_data(data_queue):while not data_queue.empty():data = data_queue.get()with open('temp.txt', 'ab') as f:f.write(data)data_queue.task_done()def multi_thread_process(input_file, output_file, thread_num):data_queue = queue.Queue()threads = []with open(input_file, 'rb') as f:for i in range(thread_num):t = threading.Thread(target=read_data, args=(data_queue, f))t.start()threads.append(t)for t in threads:t.join()with open('temp.txt', 'rb') as f:data = f.read()with open(output_file, 'wb') as f:f.write(data)f.close()os.remove('temp.txt')```上述代码中,`read_data`函数负责从输入文件中读取数据并写入到输出文件中,`write_data`函数负责将数据先写入到临时文件中,以便多个线程同时读取。
Java实现⼤批量数据导⼊导出(100W以上) -(⼆)导出使⽤POI或JXLS导出⼤数据量(百万级)Excel报表常常⾯临两个问题:1. 服务器内存溢出;2. ⼀次从数据库查询出这么⼤数据,查询缓慢。
当然也可以分页查询出数据,分别⽣成多个Excel打包下载,但这种⽣成还是很缓慢。
⼤数据量导⼊请参考:那么如何解决呢?我们可以借助XML格式利⽤模板替换,分页查询出数据从磁盘写⼊XML,最终会以Excel多sheet形式⽣成。
亲测2400万⾏数据,⽣成Excel⽂件4.5G,总耗时1.5分钟。
我利⽤StringTemplate模板解析技术对XML模板进⾏填充。
当然也可以使⽤FreeMarker, Velocity等Java模板技术实现。
⾸先引⼊StringTemplate所需Jar包:使⽤技术为 stringTemplatepom.xml:1 <dependency>2<groupId>antlr</groupId>3<artifactId>antlr</artifactId>4<version>2.7.7</version>5</dependency>67<dependency>8<groupId>org.antlr</groupId>9<artifactId>stringtemplate</artifactId>10<version>3.2.1</version>11</dependency>⾸先准备导出Excel模板,然后打开-》另存为-》选择格式为XML,然后⽤⽂本打开XML,提取XML头模板(head.st可通⽤),数据体模板(boday.st):head.st可通⽤:1<?xml version="1.0"?>2<?mso-application progid="Excel.Sheet"?>3<Workbook xmlns="urn:schemas-microsoft-com:office:spreadsheet"4 xmlns:o="urn:schemas-microsoft-com:office:office"5 xmlns:x="urn:schemas-microsoft-com:office:excel"6 xmlns:ss="urn:schemas-microsoft-com:office:spreadsheet"7 xmlns:html="/TR/REC-html40">8<DocumentProperties xmlns="urn:schemas-microsoft-com:office:office">9<Created>1996-12-17T01:32:42Z</Created>10<LastSaved>2013-08-02T09:21:24Z</LastSaved>11<Version>11.9999</Version>12</DocumentProperties>13<OfficeDocumentSettings xmlns="urn:schemas-microsoft-com:office:office">14<RemovePersonalInformation/>15</OfficeDocumentSettings>16<ExcelWorkbook xmlns="urn:schemas-microsoft-com:office:excel">17<WindowHeight>4530</WindowHeight>18<WindowWidth>8505</WindowWidth>19<WindowTopX>480</WindowTopX>20<WindowTopY>120</WindowTopY>21<AcceptLabelsInFormulas/>22<ProtectStructure>False</ProtectStructure>23<ProtectWindows>False</ProtectWindows>24</ExcelWorkbook>25<Styles>26<Style ss:ID="Default" ss:Name="Normal">27<Alignment ss:Vertical="Bottom"/>28<Borders/>29<Font ss:FontName="宋体" x:CharSet="134" ss:Size="12"/>30<Interior/>31<NumberFormat/>32<Protection/>33</Style>34</Styles>boday.st:1 $worksheet:{2<Worksheet ss:Name="$it.sheet$">3<Table ss:ExpandedColumnCount="$it.columnNum$" ss:ExpandedRowCount="$it.rowNum$" x:FullColumns="1"4 x:FullRows="1" ss:DefaultColumnWidth="54" ss:DefaultRowHeight="14.25">5 $it.rows:{6<Row>7<Cell><Data ss:Type="String">$1$</Data></Cell>8<Cell><Data ss:Type="String">$2$</Data></Cell>9<Cell><Data ss:Type="String">$3$</Data></Cell>10</Row>11 }$12</Table>13</Worksheet>14 }$⽣成⼤数据量Excel类:ExcelGenerator:1package test.exportexcel;23import org.antlr.stringtemplate.StringTemplate;4import org.antlr.stringtemplate.StringTemplateGroup;5import test.exportexcel.bean.Row;6import test.exportexcel.bean.Worksheet;78import java.io.*;9import java.util.ArrayList;10import java.util.List;11import java.util.Random;1213/**14 * 类功能描述:generator big data Excel15 *16 * @author WangXueXing create at 19-4-13 下午10:2317 * @version 1.0.018*/19public class ExcelGenerator {20public static void main(String[] args) throws FileNotFoundException{21 ExcelGenerator template = new ExcelGenerator();22 template.output2();23 }2425/**26 * ⽣成数据量⼤的时候,该⽅法会出现内存溢出27 * @throws FileNotFoundException28*/29public void output1() throws FileNotFoundException{30 StringTemplateGroup stGroup = new StringTemplateGroup("stringTemplate");31 StringTemplate st4 = stGroup.getInstanceOf("test/exportexcel/template/test");32 List<Worksheet> worksheets = new ArrayList<>();3334 File file = new File("/home/barry/data/output.xls");35 PrintWriter writer = new PrintWriter(new BufferedOutputStream(new FileOutputStream(file)));3637for(int i=0;i<30;i++){38 Worksheet worksheet = new Worksheet();39 worksheet.setSheet("第"+(i+1)+"页");40 List<Row> rows = new ArrayList<>();41for(int j=0;j<6000;j++){42 Row row = new Row();43 row.setName1("zhangzehao");44 row.setName2(""+j);45 row.setName3(i+" "+j);46 rows.add(row);47 }48 worksheet.setRows(rows);49 worksheets.add(worksheet);50 }5152 st4.setAttribute("worksheets", worksheets);53 writer.write(st4.toString());54 writer.flush();55 writer.close();56 System.out.println("⽣成excel完成");57 }5859/**60 * 该⽅法不管⽣成多⼤的数据量,都不会出现内存溢出,只是时间的长短61 * 经测试,⽣成2400万数据,2分钟内,4.5G⼤的⽂件,打开⼤⽂件就看内存是否⾜够⼤了62 * 数据量⼩的时候,推荐⽤JXLS的模板技术⽣成excel⽂件,谁⽤谁知道,⼤数据量可以结合该⽅法使⽤63 * @throws FileNotFoundException64*/65public void output2() throws FileNotFoundException{66long startTimne = System.currentTimeMillis();67 StringTemplateGroup stGroup = new StringTemplateGroup("stringTemplate");6869//写⼊excel⽂件头部信息70 StringTemplate head = stGroup.getInstanceOf("test/exportexcel/template/head");71 File file = new File("/home/barry/data/output.xls");72 PrintWriter writer = new PrintWriter(new BufferedOutputStream(new FileOutputStream(file)));73 writer.print(head.toString());74 writer.flush();7576int sheets = 400;77//excel单表最⼤⾏数是6553578int maxRowNum = 60000;7980//写⼊excel⽂件数据信息81for(int i=0;i<sheets;i++){82 StringTemplate body = stGroup.getInstanceOf("test/exportexcel/template/body");83 Worksheet worksheet = new Worksheet();84 worksheet.setSheet(" "+(i+1)+" ");85 worksheet.setColumnNum(3);86 worksheet.setRowNum(maxRowNum);87 List<Row> rows = new ArrayList<>();88for(int j=0;j<maxRowNum;j++){89 Row row = new Row();90 row.setName1(""+new Random().nextInt(100000));91 row.setName2(""+j);92 row.setName3(i+""+j);93 rows.add(row);94 }95 worksheet.setRows(rows);96 body.setAttribute("worksheet", worksheet);97 writer.print(body.toString());98 writer.flush();99 rows.clear();100 rows = null;101 worksheet = null;102 body = null;103 Runtime.getRuntime().gc();104 System.out.println("正在⽣成excel⽂件的 sheet"+(i+1));105 }106107//写⼊excel⽂件尾部108 writer.print("</Workbook>");109 writer.flush();110 writer.close();111 System.out.println("⽣成excel⽂件完成");112long endTime = System.currentTimeMillis();113 System.out.println("⽤时="+((endTime-startTimne)/1000)+"秒");114 }115 }定义JavaBean:WorkSheet.java:1package test.exportexcel.bean;23import java.util.List;45/**6 * 类功能描述:Excel sheet Bean7 *8 * @author WangXueXing create at 19-4-13 下午10:219 * @version 1.0.010*/11public class Worksheet {12private String sheet;13private int columnNum;14private int rowNum;15private List<Row> rows;1617public String getSheet() {18return sheet;19 }20public void setSheet(String sheet) {21this.sheet = sheet;22 }2324public List<Row> getRows() {25return rows;26 }27public void setRows(List<Row> rows) {28this.rows = rows;29 }3031public int getColumnNum() {32return columnNum;33 }34public void setColumnNum(int columnNum) {35this.columnNum = columnNum;36 }3738public int getRowNum() {39return rowNum;40 }41public void setRowNum(int rowNum) {42this.rowNum = rowNum;43 }44 }Row.java:1package test.exportexcel.bean;23/**4 * 类功能描述:Excel row bean5 *6 * @author WangXueXing create at 19-4-13 下午10:227 * @version 1.0.08*/9public class Row {10private String name1;11private String name2;12private String name3;1314public String getName1() {15return name1;16 }17public void setName1(String name1) {1 = name1;19 }2021public String getName2() {22return name2;23 }24public void setName2(String name2) {2 = name2;26 }2728public String getName3() {29return name3;30 }31public void setName3(String name3) {3 = name3;33 }34 }另附实现源码: 此外,⼤数据量并并且Excel列较多时,会出现内存溢出。
Java多线程读取⼤⽂件前⾔ 今天是五⼀假期第⼀天,按理应该是快乐玩耍的⽇⼦,但是作为⼀个北漂到京师的开发⼈员,实在难想出去那玩耍。
好玩的地⽅⽐较远,近处⼜感觉没意思。
于是乎,闲着写篇⽂章,总结下昨天写的程序吧。
昨天下午朋友跟我聊起,他说有个需求,需要把上G的txt⽂件读取写⼊到数据库。
⽤普通的io结果⾃然是OOM了,所以果断⽤NIO技术。
为了提⾼速度,⾃然还得⽤上多线程技术。
接下来就介绍⼀下实现思路以及相关的知识点。
内容 ⼀、对⽂件分区 为了充分利⽤多线程读取,就需要把⽂件划分成多个区域,供每个线程读取。
那么就需要有⼀个算法来计算出每个线程读取的开始位置和结束位置。
那么⾸先根据配置的线程数和⽂件的总长度计,算出每个线程平均分配的读取长度。
但是有⼀点,由于⽂件是纯⽂本⽂件,必须按⾏来处理,如果分割点在某⼀⾏中间,那么这⼀⾏数据就会被分成两部分,分别由两个线程同时处理,这种情况是不能出现的。
所以各个区域的结束点上的字符必须是换⾏符。
第⼀个区域的开始位置是0,结束位置⾸先设为(⽂件长度/线程数),如果结束点位置不是换⾏符,就只能加1,直到是换⾏符位置。
第⼀个区域的结束位置有了,⾃然我们就能求出第⼆个区域的开始位置了,同理根据上边算法求出第⼆个区域的结束位置,然后依次类推第三个、第四个...... 上边的算法中,第⼀个区域的结束位置定了,才能有第⼆个区域的开始位置,第⼆个区域的结束位置定了,才能有第三个区域的开始位置,依次这么下去。
照这种规律,⾃然地想到的是⽤递归来解决。
(详情看) ⼆、内存⽂件映射 简单说⼀下内存⽂件映射:内存⽂件映射,简单地说就是将⽂件映射到内存的某个地址上。
要理解内存⽂件映射,⾸先得明⽩普通⽅式读取⽂件的流程:⾸先内存空间分为内核空间和⽤户空间,在应⽤程序读取⽂件时,底层会发起系统调⽤,由系统调⽤将数据先读⼊到内核空间,然后再将数据拷贝到应⽤程序的⽤户空间供应⽤程序使⽤。
这个过程多了⼀个从内核空间到⽤户空间拷贝的过程。
使⽤java多线程分批处理数据⼯具类最近由于业务需要,数据量⽐较⼤,需要使⽤多线程来分批处理,提⾼处理效率和能⼒,于是就写了⼀个通⽤的多线程处理⼯具,只需要实现⾃⼰的业务逻辑就可以正常使⽤,现在记录⼀下主要是针对⼤数据量list,将list划分多个线程处理ResultBean类:返回结果统⼀beanpackage mon.model;import java.io.Serializable;import com.alibaba.fastjson.JSON;/*** 返回结果统⼀bean** ResultBean<BR>* 创建⼈:wangbeidou <BR>* 时间:2018年4⽉12⽇-下午3:49:46 <BR>* @version 2.0**/public class ResultBean<T> implements Serializable {private static final long serialVersionUID = 1L;// 成功状态public static final int SUCCESS = 1;// 处理中状态public static final int PROCESSING = 0;// 失败状态public static final int FAIL = -1;// 描述private String msg = "success";// 状态默认成功private int code = SUCCESS;// 备注private String remark;// 返回数据private T data;public ResultBean() {super();}public ResultBean(T data) {super();this.data = data;}/*** 使⽤异常创建结果*/public ResultBean(Throwable e) {super();this.msg = e.toString();this.code = FAIL;}/**** 实例化结果默认成功状态<BR>* ⽅法名:newInstance<BR>* 创建⼈:wangbeidou <BR>* 时间:2018年4⽉12⽇-下午3:51:26 <BR>* @return ResultBean<T><BR>* @exception <BR>* @since 2.0*/public static <T> ResultBean<T> newInstance() {ResultBean<T> instance = new ResultBean<T>();//默认返回信息instance.code = SUCCESS;/**** 实例化结果默认成功状态和数据<BR>* ⽅法名:newInstance<BR>* 创建⼈:wangbeidou <BR>* 时间:2018年5⽉10⽇-下午2:13:16 <BR>* @param data* @return ResultBean<T><BR>* @exception <BR>* @since 2.0*/public static <T> ResultBean<T> newInstance(T data) {ResultBean<T> instance = new ResultBean<T>();//默认返回信息instance.code = SUCCESS;instance.msg = "success";instance.data = data;return instance;}/**** 实例化返回结果<BR>* ⽅法名:newInstance<BR>* 创建⼈:wangbeidou <BR>* 时间:2018年4⽉12⽇-下午4:00:53 <BR>* @param code* @param msg* @return ResultBean<T><BR>* @exception <BR>* @since 2.0*/public static <T> ResultBean<T> newInstance(int code, String msg) {ResultBean<T> instance = new ResultBean<T>();//默认返回信息instance.code = code;instance.msg = msg;return instance;}/**** 实例化返回结果<BR>* ⽅法名:newInstance<BR>* 创建⼈:wangbeidou <BR>* 时间:2018年4⽉12⽇-下午4:00:35 <BR>* @param code* @param msg* @param data* @return ResultBean<T><BR>* @exception <BR>* @since 2.0*/public static <T> ResultBean<T> newInstance(int code, String msg, T data) { ResultBean<T> instance = new ResultBean<T>();//默认返回信息instance.code = code;instance.msg = msg;instance.data = data;return instance;}/**** 设置返回数据<BR>* ⽅法名:setData<BR>* 创建⼈:wangbeidou <BR>* 时间:2018年4⽉12⽇-下午3:52:01 <BR>* @param data* @return ResultBean<T><BR>* @exception <BR>* @since 2.0*/public ResultBean<T> setData(T data){this.data = data;return this;}/**** 设置结果描述<BR>* @return ResultBean<T><BR>* @exception <BR>* @since 2.0*/public ResultBean<T> setMsg(String msg){this.msg = msg;return this;}/**** 设置状态<BR>* ⽅法名:setCode<BR>* 创建⼈:wangbeidou <BR>* 时间:2018年4⽉12⽇-下午4:17:56 <BR>* @param code* @return ResultBean<T><BR>* @exception <BR>* @since 2.0*/public ResultBean<T> setCode(int code){this.code = code;return this;}/**** 设置备注)<BR>* ⽅法名:setRemark<BR>* 创建⼈:wangbeidou <BR>* 时间:2018年4⽉12⽇-下午5:47:29 <BR>* @param remark* @return ResultBean<T><BR>* @exception <BR>* @since 2.0*/public ResultBean<T> setRemark(String remark){this.remark = remark;return this;}/**** 设置成功描述和返回数据<BR>* ⽅法名:success<BR>* 创建⼈:wangbeidou <BR>* 时间:2018年4⽉12⽇-下午3:52:58 <BR>* @param msg* @param data* @return ResultBean<T><BR>* @exception <BR>* @since 2.0*/public ResultBean<T> success(String msg, T data){ this.code = SUCCESS;this.data = data;this.msg = msg;return this;}/**** 设置成功返回结果描述<BR>* ⽅法名:success<BR>* 创建⼈:wangbeidou <BR>* 时间:2018年4⽉12⽇-下午3:53:31 <BR>* @param msg* @return ResultBean<T><BR>* @exception <BR>* @since 2.0*/public ResultBean<T> success(String msg){this.code = SUCCESS;this.msg = msg;return this;}/**** 设置处理中描述和返回数据<BR>* @param data* @return ResultBean<T><BR>* @exception <BR>* @since 2.0*/public ResultBean<T> processing(String msg, T data){ this.code = PROCESSING;this.data = data;this.msg = msg;return this;}/**** 设置处理中返回结果描述<BR>* ⽅法名:success<BR>* 创建⼈:wangbeidou <BR>* 时间:2018年4⽉12⽇-下午3:53:31 <BR>* @param msg* @return ResultBean<T><BR>* @exception <BR>* @since 2.0*/public ResultBean<T> processing(String msg){this.code = PROCESSING;this.msg = msg;return this;}/**** 设置失败返回描述和返回数据<BR>* ⽅法名:fail<BR>* 创建⼈:wangbeidou <BR>* 时间:2018年4⽉12⽇-下午3:54:04 <BR>* @param msg* @param data* @return ResultBean<T><BR>* @exception <BR>* @since 2.0*/public ResultBean<T> fail(String msg, T data){this.code = FAIL;this.data = data;this.msg = msg;return this;}/**** 设置失败返回描述<BR>* ⽅法名:fail<BR>* 创建⼈:wangbeidou <BR>* 时间:2018年4⽉12⽇-下午3:54:32 <BR>* @param msg* @return ResultBean<T><BR>* @exception <BR>* @since 2.0*/public ResultBean<T> fail(String msg){this.code = FAIL;this.msg = msg;return this;}public T getData() {return data;}public String getMsg() {return msg;}public int getCode() {return code;}public String getRemark() {return remark;}/**** 时间:2018年4⽉12⽇-下午4:42:28 <BR>* @return String<BR>* @exception <BR>* @since 2.0*/public String json(){return JSON.toJSONString(this);}}View CodeITask接⼝:实现⾃⼰的业务package mon.multi.execute;import java.util.Map;/*** 任务处理接⼝* 具体业务逻辑可实现该接⼝* T 返回值类型* E 传⼊值类型* ITask<BR>* 创建⼈:wangbeidou <BR>* 时间:2018年8⽉4⽇-下午6:12:32 <BR>* @version 2.0**/public interface ITask<T, E> {/**** 任务执⾏⽅法接⼝<BR>* ⽅法名:execute<BR>* 创建⼈:wangbeidou <BR>* 时间:2018年8⽉4⽇-下午6:13:44 <BR>* @param e 传⼊对象* @param params 其他辅助参数* @return T<BR> 返回值类型* @exception <BR>* @since 2.0*/T execute(E e, Map<String, Object> params);}View CodeHandleCallable类:实现Callable接⼝,来处理任务package mon.multi.execute;import java.util.ArrayList;import java.util.List;import java.util.Map;import java.util.concurrent.Callable;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import mon.model.ResultBean;/***** HandleCallable<BR>* 创建⼈:wangbeidou <BR>* 时间:2018年8⽉4⽇-上午11:55:41 <BR>** @version 2.0**/@SuppressWarnings("rawtypes")public class HandleCallable<E> implements Callable<ResultBean> {private static Logger logger = LoggerFactory.getLogger(HandleCallable.class); // 线程名称private String threadName = "";private Map<String, Object> params;// 具体执⾏任务private ITask<ResultBean<String>, E> task;public HandleCallable(String threadName, List<E> data, Map<String, Object> params, ITask<ResultBean<String>, E> task) {this.threadName = threadName;this.data = data;this.params = params;this.task = task;}@Overridepublic ResultBean<List<ResultBean<String>>> call() throws Exception {// 该线程中所有数据处理返回结果ResultBean<List<ResultBean<String>>> resultBean = ResultBean.newInstance();if (data != null && data.size() > 0) {("线程:{},共处理:{}个数据,开始处理......", threadName, data.size());// 返回结果集List<ResultBean<String>> resultList = new ArrayList<>();// 循环处理每个数据for (int i = 0; i < data.size(); i++) {// 需要执⾏的数据E e = data.get(i);// 将数据执⾏结果加⼊到结果集中resultList.add(task.execute(e, params));("线程:{},第{}个数据,处理完成", threadName, (i + 1));}("线程:{},共处理:{}个数据,处理完成......", threadName, data.size()); resultBean.setData(resultList);}return resultBean;}}View CodeMultiThreadUtils类: 多线程⼯具类package mon.multi.execute;import java.util.ArrayList;import java.util.List;import java.util.Map;import pletionService;import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorCompletionService;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import mon.model.ResultBean;/***** MultiThreadUtils<BR>* 创建⼈:wangbeidou <BR>* 时间:2018年8⽉8⽇-下午8:20:42 <BR>* @version 2.0**/public class MultiThreadUtils<T> {private static Logger logger = LoggerFactory.getLogger(MultiThreadUtils.class);// 线程个数,如不赋值,默认为5private int threadCount = 5;// 具体业务任务private ITask<ResultBean<String>, T> task;// 线程池管理器private CompletionService<ResultBean> pool = null;/**** 初始化线程池和线程个数<BR>* ⽅法名:newInstance<BR>* @return MultiThreadUtils<BR>* @exception <BR>* @since 2.0*/public static MultiThreadUtils newInstance(int threadCount) {MultiThreadUtils instance = new MultiThreadUtils();threadCount = threadCount;instance.setThreadCount(threadCount);return instance;}/**** 多线程分批执⾏list中的任务<BR>* ⽅法名:execute<BR>* 创建⼈:wangbeidou <BR>* 时间:2018年8⽉8⽇-下午8:22:31 <BR>* @param data 线程处理的⼤数据量list* @param params 处理数据是辅助参数传递* @param task 具体执⾏业务的任务接⼝* @return ResultBean<BR>* @exception <BR>* @since 2.0*/@SuppressWarnings("rawtypes")public ResultBean execute(List<T> data, Map<String, Object> params, ITask<ResultBean<String>, T> task) { // 创建线程池ExecutorService threadpool = Executors.newFixedThreadPool(threadCount);// 根据线程池初始化线程池管理器pool = new ExecutorCompletionService<ResultBean>(threadpool);// 开始时间(ms)long l = System.currentTimeMillis();// 数据量⼤⼩int length = data.size();// 每个线程处理的数据个数int taskCount = length / threadCount;// 划分每个线程调⽤的数据for (int i = 0; i < threadCount; i++) {// 每个线程任务数据listList<T> subData = null;if (i == (threadCount - 1)) {subData = data.subList(i * taskCount, length);} else {subData = data.subList(i * taskCount, (i + 1) * taskCount);}// 将数据分配给各个线程HandleCallable execute = new HandleCallable<T>(String.valueOf(i), subData, params, task);// 将线程加⼊到线程池pool.submit(execute);}// 总的返回结果集List<ResultBean<String>> result = new ArrayList<>();for (int i = 0; i < threadCount; i++) {// 每个线程处理结果集ResultBean<List<ResultBean<String>>> threadResult;try {threadResult = pool.take().get();result.addAll(threadResult.getData());} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}// 关闭线程池threadpool.shutdownNow();// 执⾏结束时间long end_l = System.currentTimeMillis();("总耗时:{}ms", (end_l - l));return ResultBean.newInstance().setData(result);}public int getThreadCount() {return threadCount;}public void setThreadCount(int threadCount) {this.threadCount = threadCount;View Code测试类TestTaskpackage mon.multi.execute;import java.util.ArrayList;import java.util.HashMap;import java.util.List;import java.util.Map;import mon.model.ResultBean;/**** 具体执⾏业务任务需要实现ITask接⼝在execute中重写业务逻辑* TestTask<BR>* 创建⼈:wangbeidou <BR>* 时间:2018年8⽉8⽇-下午8:40:32 <BR>* @version 2.0**/public class TestTask implements ITask<ResultBean<String>, Integer> {@Overridepublic ResultBean execute(Integer e, Map<String, Object> params) {/*** 具体业务逻辑:将list中的元素加上辅助参数中的数据返回*/int addNum = Integer.valueOf(String.valueOf(params.get("addNum")));e = e + addNum;ResultBean<String> resultBean = ResultBean.newInstance();resultBean.setData(e.toString());return resultBean;}public static void main(String[] args) {// 需要多线程处理的⼤量数据listList<Integer> data = new ArrayList<>(10000);for(int i = 0; i < 10000; i ++){data.add(i + 1);}// 创建多线程处理任务MultiThreadUtils<Integer> threadUtils = MultiThreadUtils.newInstance(5);ITask<ResultBean<String>, Integer> task = new TestTask();// 辅助参数加数Map<String, Object> params = new HashMap<>();params.put("addNum", 4);// 执⾏多线程处理,并返回处理结果ResultBean<List<ResultBean<String>>> resultBean = threadUtils.execute(data, params, task); }}。
Java多个线程从队列中取数据的方法在并发编程中,多线程从队列中取数据是一个常见的需求。
Java提供了多种方式来实现多个线程从队列中取数据的方法,本文将介绍其中的几种常用方法,并对每种方法进行详细的解析。
方法一:使用synchronized关键字public class Queue {private List<Integer> queue = new ArrayList<>();public synchronized void enqueue(Integer item) {queue.add(item);}public synchronized Integer dequeue() {if (queue.isEmpty()) {return null;}return queue.remove(0);}}在这个方法中,我们使用了synchronized关键字来实现线程安全。
通过在enqueue()和dequeue()方法上加上synchronized关键字,我们确保了在同一时刻只能有一个线程访问队列。
这种方式简单易懂,但是在高并发场景下性能较低。
方法二:使用ReentrantLockpublic class Queue {private List<Integer> queue = new ArrayList<>();private ReentrantLock lock = new ReentrantLock();public void enqueue(Integer item) {lock.lock();try {queue.add(item);} finally {lock.unlock();}}public Integer dequeue() {lock.lock();try {if (queue.isEmpty()) {return null;}return queue.remove(0);} finally {lock.unlock();}}}这种方法使用了ReentrantLock来实现线程安全。
java多线程向数据库写⼊数据任务: 从sqlserver中将⼀个表A(约16W条数据)导到mysql中对应的⼀个表B中。
思路:分段获取A表中的数据后,⽤多个线程同时向B表中写⼊。
关键代码//将数据库中的数据条数分段public void division(){//获取要导⼊的总的数据条数String sql3="SELECT count(*) FROM [CMD].[dbo].[mycopy1]";try {pss=cons.prepareStatement(sql3);rss=pss.executeQuery();while(rss.next()){System.out.println("总记录条数:"+rss.getInt(1));sum=rss.getInt(1);}//每30000条记录作为⼀个分割点if(sum>=30000){n=sum/30000;residue=sum%30000;}else{residue=sum;}System.out.println(n+" "+residue);} catch (SQLException e) {// TODO Auto-generated catch blocke.printStackTrace();}}线程类public MyThread(int start,int end) {this.end=end;this.start=start;System.out.println("处理掉余数");try {System.out.println("--------"+Thread.currentThread().getName()+"------------");Class.forName(SQLSERVERDRIVER);System.out.println("加载sqlserver驱动...");cons = DriverManager.getConnection(CONTENTS,UNS,UPS);stas = cons.createStatement();System.out.println("连接SQLServer数据库成功!!");System.out.println("加载mysql驱动.....");Class.forName(MYSQLDRIVER);con = DriverManager.getConnection(CONTENT,UN,UP);sta = con.createStatement();// 关闭事务⾃动提交con.setAutoCommit(false);System.out.println("连接mysql数据库成功!!");} catch (Exception e) {e.printStackTrace();}// TODO Auto-generated constructor stub}public ArrayList<Member> getAll(){Member member;String sql1="select * from (select row_number() over (order by pmcode) as rowNum,*" + " from [CMD].[dbo].[mycopy1]) as t where rowNum between "+start+" and "+end;try {System.out.println("正在获取数据...");allmembers=new ArrayList();rss=stas.executeQuery(sql1);while(rss.next()){member=new Member();member.setAddress1(rss.getString("address1"));member.setBnpoints(rss.getString("bnpoints"));member.setDbno(rss.getString("dbno"));member.setExpiry(rss.getString("expiry"));member.setHispoints(rss.getString("hispoints"));member.setKypoints(rss.getString("kypoints"));member.setLevels(rss.getString("levels"));member.setNames(rss.getString("names"));member.setPmcode(rss.getString("pmcode"));member.setRemark(rss.getString("remark"));member.setSex(rss.getString("sex"));member.setTelephone(rss.getString("telephone"));member.setWxno(rss.getString("wxno"));member.setPmdate(rss.getString("pmdate"));allmembers.add(member);// System.out.println(member.getNames());}System.out.println("成功获取sqlserver数据库数据!");return allmembers;} catch (SQLException e) {// TODO Auto-generated catch blockSystem.out.println("获取sqlserver数据库数据发送异常!");e.printStackTrace();}try {rss.close();stas.close();} catch (SQLException e) {// TODO Auto-generated catch blocke.printStackTrace();}return null;}public void inputAll(ArrayList<Member> allmembers){System.out.println("开始向mysql中写⼊");String sql2="insert into test.mycopy2 values (?,?,?,?,?,?,?,?,?,?,?,?,?,?)";try {ps=con.prepareStatement(sql2);System.out.println("-------------------------等待写⼊数据条数: "+allmembers.size());for(int i=0;i<allmembers.size();i++){ps.setString(1, allmembers.get(i).getPmcode());ps.setString(2, allmembers.get(i).getNames());//System.out.println(allmembers.get(i).getNames());ps.setString(3, allmembers.get(i).getSex());ps.setString(4, allmembers.get(i).getTelephone());ps.setString(5, allmembers.get(i).getAddress1());ps.setString(6, allmembers.get(i).getPmdate());ps.setString(7, allmembers.get(i).getExpiry());ps.setString(8, allmembers.get(i).getLevels());ps.setString(9, allmembers.get(i).getDbno());ps.setString(10, allmembers.get(i).getHispoints());ps.setString(11, allmembers.get(i).getBnpoints());ps.setString(12, allmembers.get(i).getKypoints());ps.setString(13, allmembers.get(i).getWxno());ps.setString(14, allmembers.get(i).getRemark());//插⼊命令列表//ps.addBatch();ps.executeUpdate();}//ps.executeBatch();mit();ps.close();con.close();this.flag=false;System.out.println(Thread.currentThread().getName()+"--->OK"); } catch (SQLException e) {// TODO Auto-generated catch blockSystem.out.println("向mysql中更新数据时发⽣异常!");e.printStackTrace();}}@Overridepublic void run() {// TODO Auto-generated method stubwhile(true&&flag){this.inputAll(getAll());}}测试类:public class Test1 {DbManager dm=null;MyThread my1=null;public Test1(){dm=new DbManager();System.out.println(dm.n+"----"+dm.residue);if(dm.n<1){//数据条数⼩于30000单线程处理my1=new MyThread(1,dm.sum);my1.start=1;my1.end=dm.residue;Thread t1=new Thread(my1);t1.start();}else{//⼤于30000时//起n个线程每个处理30000条数据for (int i = 1; i <=dm.n; i++) {new Thread(new MyThread(i)).start();try {Thread.sleep(1);} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}}//处理掉余数my1=new MyThread(dm.n*30000+1,dm.sum);Thread t1=new Thread(my1);t1.start();}}public static void main(String[] args) {//new Test1();//迁移完数据,⾃动关机try {Runtime.getRuntime().exec("cmd /c Shutdown -t 10");} catch (IOException e) {// TODO Auto-generated catch blocke.printStackTrace();}}经过多次测试:从sqlserver中读取16w条数据并写⼊mysql,耗时15min左右。
JAVA中用多线程技术实现大数据导入朋友让我帮忙写个程序从文本文档中导入数据到oracle 数据库中,技术上没有什么难度,文档的格式都是固定的只要对应数据库中的字段解析就行了,关键在于性能。
数据量很大百万条记录,因此考虑到要用多线程并发执行,在写的过程中又遇到问题,我想统计所有子进程执行完毕总共的耗时,在第一个子进程创建前记录当前时间用System.currentTimeMillis()在最后一个子进程结束后记录当前时间,两次一减得到的时间差即为总共的用时,代码如下Java代码long tStart = System.currentTimeMillis();System.out.println(Thread.currentThread().getName() + "开始");//打印开始标记for (int ii = 0; ii < threadNum; ii++) {//开threadNum个线程Runnable r = new Runnable(){@Overridepublic void run(){System.out.println(Thread.currentThread().getName() + "开始");//做一些事情... ...System.out.println(Thread.currentThread().getName() + "结束.");}}Thread t = new Thread(r);t.start();}System.out.println(Thread.currentThread().getName() + "结束.");//打印结束标记long tEnd = System.currentTimeMillis();System.out.println("总共用时:"+ (tEnd - tStart) + "millions");long tStart = System.currentTimeMillis();System.out.println(Thread.currentThread().getName() + "开始");//打印开始标记for (int ii = 0; ii < threadNum; ii++) {//开threadNum个线程Runnable r = new Runnable(){@Overridepublic void run(){System.out.println(Thread.currentThread().getName() + "开始");//做一些事情... ...System.out.println(Thread.currentThread().getName() + "结束.");}}Thread t = new Thread(r);t.start();}System.out.println(Thread.currentThread().getName() + "结束.");//打印结束标记long tEnd = System.currentTimeMillis();System.out.println("总共用时:"+ (tEnd - tStart) + "millions");结果是几乎在for循环结束的瞬间就执行了主线程打印总共用时的语句,原因是所有的子线程是并发执行的,它们运行时主线程也在运行,这就引出了一个问题即本文标题如何"让主线程等待所有子线程执行完毕"。
试过在每个子线程开始后加上t.join(),结果是所有线程都顺序执行,这就失去了并发的意义了,显然不是我想要的。
网上Google了很久也没有找到解决方案,难道就没有人遇到过这种需求吗?还是这个问题太简单了?无耐只得自己想办法了...最后我的解决办法是,自定义一个ImportThread类继承自ng.Thread,重载run()方法,用一个List属性保存所有产生的线程,这样只要判断这个List是否为空就知道还有没有子线程没有执行完了,类代码如下:Java代码public class ImportThread extends Thread {private static List<Thread> runningThreads = new ArrayList<Thread>();public ImportThread() {}@Overridepublic void run() {regist(this);//线程开始时注册System.out.println(Thread.currentThread().getName() + "开始...");//打印开始标记//做一些事情... ...unRegist(this);//线程结束时取消注册System.out.println(Thread.currentThread().getName() + "结束.");//打印结束标记}public void regist(Thread t){synchronized(runningThreads){runningThreads.add(t);}}public void unRegist(Thread t){synchronized(runningThreads){runningThreads.remove(t);}}public static boolean hasThreadRunning() {return (runningThreads.size() > 0);//通过判断runningThreads是否为空就能知道是否还有线程未执行完}}public class ImportThread extends Thread {private static List<Thread> runningThreads = newArrayList<Thread>();public ImportThread() {}@Overridepublic void run() {regist(this);//线程开始时注册System.out.println(Thread.currentThread().getName() + "开始...");//打印开始标记//做一些事情... ...unRegist(this);//线程结束时取消注册System.out.println(Thread.currentThread().getName() + "结束.");//打印结束标记}public void regist(Thread t){synchronized(runningThreads){runningThreads.add(t);}}public void unRegist(Thread t){synchronized(runningThreads){runningThreads.remove(t);}public static boolean hasThreadRunning() {return (runningThreads.size() > 0);//通过判断runningThreads是否为空就能知道是否还有线程未执行完}}主线程中代码:Java代码long tStart =System.currentTimeMillis();System.out.println(Thread.currentThread().getName() + "开始");//打印开始标记for (int ii = 0; ii < threadNum; ii++) {//开threadNum个线程Thread t = new ImportThread();t.start();}while(true){//等待所有子线程执行完if(!ImportThread.hasThreadRunning()){break;}Thread.sleep(500);System.out.println(Thread.currentThread().getName() + "结束.");//打印结束标记long tEnd = System.currentTimeMillis();System.out.println("总共用时:"+ (tEnd - tStart) + "millions");long tStart = System.currentTimeMillis();System.out.println(Thread.currentThread().getName() + "开始");//打印开始标记for (int ii = 0; ii < threadNum; ii++) {//开threadNum个线程Thread t = new ImportThread();t.start();}while(true){//等待所有子线程执行完if(!ImportThread.hasThreadRunning()){break;}Thread.sleep(500);}System.out.println(Thread.currentThread().getName() + "结束.");//打印结束标记long tEnd = System.currentTimeMillis();System.out.println("总共用时:"+ (tEnd - tStart) + "millions");打印的结果是:main开始Thread-1开始...Thread-5开始...Thread-0开始...Thread-2开始...Thread-3开始...Thread-4开始...Thread-5结束.Thread-4结束.Thread-2结束.Thread-0结束.Thread-3结束.Thread-1结束.main结束.总共用时:20860millions可以看到main线程是等所有子线程全部执行完后才开始执行的。