Java Concurrent Programming (4): Basic Methods of Concurrent Computation


Finding Concurrency Points

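The first step in concurrent computation is to find the places in a program that can be made concurrent, i.e. changed from serial to concurrent execution. These places are called concurrency points.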

Divide and Conquer

Concurrency Through Data Partitioning

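Suppose we want to count how many of 1,000,000 objects satisfy a condition. A single for loop over all of them can be slow, especially when the per-element check is expensive. With data partitioning, we split the input according to some rule (for example, into equal parts) into several smaller subsets and let a separate worker thread process each subset.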

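In essence, data partitioning produces several identical (homogeneous) worker threads: each one runs the same processing logic on a different part of the data.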

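In general, the following questions need to be considered:

  • Is the number of threads reasonable? More threads also mean more context-switching overhead.
  • If one thread throws an exception while processing, should the other threads be stopped?
  • How are the partial results merged? If the output is a file or a byte stream, the parts must be written back in the original order.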
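The three questions above can be handled together with `Callable` and `Future`. The sketch below is one possible approach, not the author's code: `PartitionedFilter` and `filterEven` are hypothetical names, and "keep even numbers" stands in for an arbitrary condition. Futures are collected in submission order, so concatenating their results preserves the input order even though slices may finish in any order; if one slice fails, the remaining tasks are cancelled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PartitionedFilter {

    // Splits data into `threads` contiguous slices, filters each slice in its own task,
    // and concatenates the partial results in the original slice order.
    static List<Integer> filterEven(List<Integer> data, int threads) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        List<Future<List<Integer>>> futures = new ArrayList<>();
        int slice = data.size() / threads;
        try {
            for (int t = 0; t < threads; t++) {
                int from = t * slice;
                // The last slice also takes the remainder when size is not divisible by threads
                int to = (t == threads - 1) ? data.size() : from + slice;
                futures.add(pool.submit(() -> {
                    List<Integer> part = new ArrayList<>();
                    for (int i = from; i < to; i++) {
                        if (data.get(i) % 2 == 0) {
                            part.add(data.get(i));
                        }
                    }
                    return part;
                }));
            }
            List<Integer> merged = new ArrayList<>();
            // Futures are read in submission order, so the merged list keeps the input order
            for (Future<List<Integer>> f : futures) {
                try {
                    merged.addAll(f.get());
                } catch (ExecutionException e) {
                    // One slice failed: cancel the other tasks instead of waiting for them
                    futures.forEach(other -> other.cancel(true));
                    throw new IllegalStateException("slice failed", e.getCause());
                }
            }
            return merged;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<Integer> data = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            data.add(i);
        }
        System.out.println(filterEven(data, 3)); // prints [0, 2, 4, 6, 8]
    }
}
```

Whether a failure should cancel the other slices depends on the job; here it does, because a partial merged result would be wrong anyway.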

Concurrency Through Task Partitioning

Split the original task into several subtasks and process them concurrently.


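For example, when a document has to be distributed to several systems, we can give each system its own thread instead of sending the document to A first, then to B, then to C.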
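A minimal sketch of the distribution example, assuming `CompletableFuture.supplyAsync` with a dedicated pool. `DocumentDispatcher`, `dispatch` and `send` are hypothetical; `send` only builds a string where a real system would make a remote call. Each system gets its own subtask, and the results are joined afterwards.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DocumentDispatcher {

    // Hypothetical stand-in for a call to a downstream system
    static String send(String system, String document) {
        return system + " received " + document;
    }

    // Sends the same document to every system at once instead of A, then B, then C
    static List<String> dispatch(String document, List<String> systems) {
        ExecutorService pool = Executors.newFixedThreadPool(systems.size());
        try {
            List<CompletableFuture<String>> futures = new ArrayList<>();
            for (String system : systems) {
                // The subtasks are independent, so each system gets its own task
                futures.add(CompletableFuture.supplyAsync(() -> send(system, document), pool));
            }
            List<String> acks = new ArrayList<>();
            for (CompletableFuture<String> f : futures) {
                // join() waits for this subtask and rethrows its failure as a CompletionException
                acks.add(f.join());
            }
            return acks;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        dispatch("archive-001", Arrays.asList("A", "B", "C")).forEach(System.out::println);
    }
}
```

Unlike data partitioning, the subtasks here could just as well run different logic per system; the total time is bounded by the slowest system rather than the sum of all of them.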

Choosing a Suitable Number of Threads

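Whether multithreading is worth using at all depends mainly on how large a share of the total running time the parallelizable logic accounts for. If that share is small, concurrency brings little benefit.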
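This rule of thumb is what Amdahl's law quantifies (a standard result, named here for reference rather than taken from the original text). If a fraction p of the running time can be parallelized and n processors work on it, the best possible speedup S is:

```latex
S(n) = \frac{1}{(1-p) + \frac{p}{n}}, \qquad
\lim_{n \to \infty} S(n) = \frac{1}{1-p}

% worked examples with n = 12:
% p = 0.10:  S = 1 / (0.90 + 0.10/12) = 1 / 0.9083 \approx 1.10
% p = 0.95:  S = 1 / (0.05 + 0.95/12) = 1 / 0.1292 \approx 7.74
```

With only 10% of the work parallelizable, even unlimited threads cannot reach a speedup of 1.12, which is why a small share makes concurrency not worth the effort.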

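Too few threads cannot make full use of the processors, while too many threads can cause frequent context switches that actually lower performance, so the goal is a roughly suitable count. Several articles online suggest a thread count between the number of CPUs and twice the number of CPUs. The number of CPUs available to the JVM can be obtained with: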

Runtime.getRuntime().availableProcessors()
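The "between N and 2N" heuristic above can be expressed as a simple clamp. This is a sketch only: `PoolSizing` and `cpuBoundThreads` are hypothetical names, and the bounds come straight from the rule of thumb rather than from any measurement. Note that `availableProcessors()` reports the processors available to the JVM, which is not necessarily the physical core count.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PoolSizing {

    // Clamps a requested thread count into the rule-of-thumb range [N, 2N],
    // where N is the processor count passed in.
    static int cpuBoundThreads(int requested, int processors) {
        return Math.max(processors, Math.min(requested, 2 * processors));
    }

    public static void main(String[] args) {
        int n = Runtime.getRuntime().availableProcessors();
        // Asking for 100 threads gets capped at 2N
        int threads = cpuBoundThreads(100, n);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        System.out.println("processors=" + n + ", pool size=" + threads);
        pool.shutdown();
    }
}
```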

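For I/O-intensive work, the I/O operations themselves cause context switches, so a single thread is the first choice, to keep context switching to a minimum. However, a thread that is waiting for an I/O result does not occupy the processor, so if one thread is not enough the count can be increased, ideally staying below twice the number of CPUs.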
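The point that a waiting thread does not occupy the processor can be checked with a small experiment. In this sketch `Thread.sleep` simulates waiting on I/O (an assumption: real socket or disk waits behave similarly with respect to CPU use, but not identically), and `IoBoundSizing` and `run` are hypothetical names. It starts from one thread and doubles the pool size up to the 2N ceiling described above.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class IoBoundSizing {

    // Runs `tasks` simulated I/O calls on a pool of `threads` threads
    // and returns the elapsed wall-clock time in milliseconds.
    static long run(int threads, int tasks, long ioMillis) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        long start = System.nanoTime();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(ioMillis); // a sleeping thread, like one blocked on I/O, uses no CPU
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) throws InterruptedException {
        int max = 2 * Runtime.getRuntime().availableProcessors();
        // Start from 1 thread and double until the 2N ceiling
        for (int threads = 1; threads <= max; threads *= 2) {
            System.out.println(threads + " thread(s): " + run(threads, max, 50) + " ms");
        }
    }
}
```

With one thread the run should take roughly `max × 50` ms, and each doubling should roughly halve it, because the sleeping threads leave the CPU idle for others.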

Test Code

package com.concurrency.demo;

import java.util.ArrayList;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author : zhengyao3@郑瑶
 * @date : 2020/3/24 23:35
 * @Description: traversing an ArrayList of 1,000,000 (100W) elements
 */
public class ArrayList100W {
    private static int LIST_LENGTH = 1000000;
    // number of threads
    private static int THREAD_NUMBER = 100;
    // number of list elements each thread reads
    private static int SLICE_LENGTH = LIST_LENGTH/THREAD_NUMBER;

    public static void main(String[] args) throws InterruptedException {
        ArrayList<MyObject> myObjectArrayList = new ArrayList<>(LIST_LENGTH);
        for (int i = 0; i < LIST_LENGTH; i++) {
            myObjectArrayList.add(new MyObject());
        }

        // Method 1: plain sequential traversal
        long start = System.currentTimeMillis();
        int numberEquals2 = 0;
        for (int i = 0; i < LIST_LENGTH; i++) {
            if (myObjectArrayList.get(i).type == 2) {
                numberEquals2 ++;
            }
        }
        long end = System.currentTimeMillis();
        System.out.println("Threads: 1 (sequential loop), time: "+(end-start)+" milliseconds, "+"count of type == 2: "+numberEquals2);

        // Method 2: 100 threads, each scanning one slice; the shared counter is an AtomicInteger
        start = System.currentTimeMillis();
        AtomicInteger atomicNumberEquals2 = new AtomicInteger(0);
        ExecutorService pool = Executors.newFixedThreadPool(THREAD_NUMBER);
        for (int i = 0; i < THREAD_NUMBER; i++) {
            final int from = i * SLICE_LENGTH;
            // the last slice also covers the remainder when LIST_LENGTH is not divisible by THREAD_NUMBER
            final int to = (i == THREAD_NUMBER - 1) ? LIST_LENGTH : (i + 1) * SLICE_LENGTH;
            pool.execute(() -> {
                for (int j = from; j < to; j++) {
                    if (myObjectArrayList.get(j).type == 2) {
                        atomicNumberEquals2.incrementAndGet();
                    }
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.DAYS);
        end = System.currentTimeMillis();
        System.out.println("Threads: "+THREAD_NUMBER+" (AtomicInteger), time: "+(end-start)+" milliseconds, "+"count of type == 2: "+atomicNumberEquals2.get());

        // Method 3: 100 threads; each thread counts into its own array slot, and the slots are summed afterwards
        start = System.currentTimeMillis();
        int[] result = new int[THREAD_NUMBER];
        ExecutorService pool2 = Executors.newFixedThreadPool(THREAD_NUMBER);
        for (int i = 0; i < THREAD_NUMBER; i++) {
            final int threadNumber = i;
            final int from = i * SLICE_LENGTH;
            // the last slice also covers the remainder when LIST_LENGTH is not divisible by THREAD_NUMBER
            final int to = (i == THREAD_NUMBER - 1) ? LIST_LENGTH : (i + 1) * SLICE_LENGTH;
            pool2.execute(() -> {
                for (int j = from; j < to; j++) {
                    if (myObjectArrayList.get(j).type == 2) {
                        result[threadNumber]++; // no other thread writes this slot
                    }
                }
            });
        }
        pool2.shutdown();
        pool2.awaitTermination(1, TimeUnit.DAYS);
        numberEquals2 = 0;
        for (int i = 0; i < THREAD_NUMBER; i++) {
            numberEquals2 += result[i];
        }
        end = System.currentTimeMillis();
        System.out.println("Threads: "+THREAD_NUMBER+" (per-thread array slots), time: "+(end-start)+" milliseconds, "+"count of type == 2: "+numberEquals2);

        // Method 4: thread count = number of CPU cores * 2
        THREAD_NUMBER =  Runtime.getRuntime().availableProcessors()*2;
        SLICE_LENGTH = LIST_LENGTH/THREAD_NUMBER;
        start = System.currentTimeMillis();
        int[] result2 = new int[THREAD_NUMBER];
        ExecutorService pool3 = Executors.newFixedThreadPool(THREAD_NUMBER);
        for (int i = 0; i < THREAD_NUMBER; i++) {
            final int threadNumber = i;
            final int from = i * SLICE_LENGTH;
            // the last slice also covers the remainder when LIST_LENGTH is not divisible by THREAD_NUMBER
            final int to = (i == THREAD_NUMBER - 1) ? LIST_LENGTH : (i + 1) * SLICE_LENGTH;
            pool3.execute(() -> {
                for (int j = from; j < to; j++) {
                    if (myObjectArrayList.get(j).type == 2) {
                        result2[threadNumber]++;
                    }
                }
            });
        }
        pool3.shutdown();
        pool3.awaitTermination(1, TimeUnit.DAYS);
        numberEquals2 = 0;
        for (int i = 0; i < THREAD_NUMBER; i++) {
            numberEquals2 += result2[i];
        }
        end = System.currentTimeMillis();
        System.out.println("Threads: "+THREAD_NUMBER+" (CPU cores * 2), time: "+(end-start)+" milliseconds, "+"count of type == 2: "+numberEquals2);
        // Method 5: thread count = number of CPU cores + 1
        THREAD_NUMBER =  Runtime.getRuntime().availableProcessors() + 1;
        SLICE_LENGTH = LIST_LENGTH/THREAD_NUMBER;
        start = System.currentTimeMillis();
        int[] result3 = new int[THREAD_NUMBER];
        ExecutorService pool4 = Executors.newFixedThreadPool(THREAD_NUMBER);
        for (int i = 0; i < THREAD_NUMBER; i++) {
            final int threadNumber = i;
            final int from = i * SLICE_LENGTH;
            // the last slice also covers the remainder when LIST_LENGTH is not divisible by THREAD_NUMBER
            final int to = (i == THREAD_NUMBER - 1) ? LIST_LENGTH : (i + 1) * SLICE_LENGTH;
            pool4.execute(() -> {
                for (int j = from; j < to; j++) {
                    if (myObjectArrayList.get(j).type == 2) {
                        result3[threadNumber]++;
                    }
                }
            });
        }
        pool4.shutdown();
        pool4.awaitTermination(1, TimeUnit.DAYS);
        numberEquals2 = 0;
        for (int i = 0; i < THREAD_NUMBER; i++) {
            numberEquals2 += result3[i];
        }
        end = System.currentTimeMillis();
        System.out.println("Threads: "+THREAD_NUMBER+" (CPU cores + 1), time: "+(end-start)+" milliseconds, "+"count of type == 2: "+numberEquals2);
        // Method 6: parallelStream (runs on the common ForkJoinPool by default)
        start = System.currentTimeMillis();
        long count = myObjectArrayList.parallelStream().filter(item -> item.type == 2).count();
        end = System.currentTimeMillis();
        System.out.println("parallelStream, time: "+(end-start)+" milliseconds, "+"count of type == 2: "+count);
        main2(myObjectArrayList);
    }
    private static void main2(ArrayList<MyObject> myObjectArrayList) throws InterruptedException {
        System.out.println("Expensive predicate (judge()) ----------------------------------------");
        THREAD_NUMBER = 100;
        SLICE_LENGTH = LIST_LENGTH/THREAD_NUMBER;
        // Method 1: plain sequential traversal
        long start = System.currentTimeMillis();
        int numberEquals2 = 0;
        for (int i = 0; i < LIST_LENGTH; i++) {
            if (myObjectArrayList.get(i).judge()) {
                numberEquals2 ++;
            }
        }
        long end = System.currentTimeMillis();
        System.out.println("Threads: 1 (sequential loop), time: "+(end-start)+" milliseconds, "+"matching count: "+numberEquals2);

        // Method 2: 100 threads, each scanning one slice; the shared counter is an AtomicInteger
        start = System.currentTimeMillis();
        AtomicInteger atomicNumberEquals2 = new AtomicInteger(0);
        ExecutorService pool = Executors.newFixedThreadPool(THREAD_NUMBER);
        for (int i = 0; i < THREAD_NUMBER; i++) {
            final int from = i * SLICE_LENGTH;
            // the last slice also covers the remainder when LIST_LENGTH is not divisible by THREAD_NUMBER
            final int to = (i == THREAD_NUMBER - 1) ? LIST_LENGTH : (i + 1) * SLICE_LENGTH;
            pool.execute(() -> {
                for (int j = from; j < to; j++) {
                    if (myObjectArrayList.get(j).judge()) {
                        atomicNumberEquals2.incrementAndGet();
                    }
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.DAYS);
        end = System.currentTimeMillis();
        System.out.println("Threads: "+THREAD_NUMBER+" (AtomicInteger), time: "+(end-start)+" milliseconds, "+"matching count: "+atomicNumberEquals2.get());

        // Method 3: 100 threads; each thread counts into its own array slot, and the slots are summed afterwards
        start = System.currentTimeMillis();
        int[] result = new int[THREAD_NUMBER];
        ExecutorService pool2 = Executors.newFixedThreadPool(THREAD_NUMBER);
        for (int i = 0; i < THREAD_NUMBER; i++) {
            final int threadNumber = i;
            final int from = i * SLICE_LENGTH;
            // the last slice also covers the remainder when LIST_LENGTH is not divisible by THREAD_NUMBER
            final int to = (i == THREAD_NUMBER - 1) ? LIST_LENGTH : (i + 1) * SLICE_LENGTH;
            pool2.execute(() -> {
                for (int j = from; j < to; j++) {
                    if (myObjectArrayList.get(j).judge()) {
                        result[threadNumber]++; // no other thread writes this slot
                    }
                }
            });
        }
        pool2.shutdown();
        pool2.awaitTermination(1, TimeUnit.DAYS);
        numberEquals2 = 0;
        for (int i = 0; i < THREAD_NUMBER; i++) {
            numberEquals2 += result[i];
        }
        end = System.currentTimeMillis();
        System.out.println("Threads: "+THREAD_NUMBER+" (per-thread array slots), time: "+(end-start)+" milliseconds, "+"matching count: "+numberEquals2);

        // Method 4: thread count = number of CPU cores * 2
        THREAD_NUMBER =  Runtime.getRuntime().availableProcessors()*2;
        SLICE_LENGTH = LIST_LENGTH/THREAD_NUMBER;
        start = System.currentTimeMillis();
        int[] result2 = new int[THREAD_NUMBER];
        ExecutorService pool3 = Executors.newFixedThreadPool(THREAD_NUMBER);
        for (int i = 0; i < THREAD_NUMBER; i++) {
            final int threadNumber = i;
            final int from = i * SLICE_LENGTH;
            // the last slice also covers the remainder when LIST_LENGTH is not divisible by THREAD_NUMBER
            final int to = (i == THREAD_NUMBER - 1) ? LIST_LENGTH : (i + 1) * SLICE_LENGTH;
            pool3.execute(() -> {
                for (int j = from; j < to; j++) {
                    if (myObjectArrayList.get(j).judge()) {
                        result2[threadNumber]++;
                    }
                }
            });
        }
        pool3.shutdown();
        pool3.awaitTermination(1, TimeUnit.DAYS);
        numberEquals2 = 0;
        for (int i = 0; i < THREAD_NUMBER; i++) {
            numberEquals2 += result2[i];
        }
        end = System.currentTimeMillis();
        System.out.println("Threads: "+THREAD_NUMBER+" (CPU cores * 2), time: "+(end-start)+" milliseconds, "+"matching count: "+numberEquals2);
        // Method 5: thread count = number of CPU cores + 1
        THREAD_NUMBER =  Runtime.getRuntime().availableProcessors() + 1;
        SLICE_LENGTH = LIST_LENGTH/THREAD_NUMBER;
        start = System.currentTimeMillis();
        int[] result3 = new int[THREAD_NUMBER];
        ExecutorService pool4 = Executors.newFixedThreadPool(THREAD_NUMBER);
        for (int i = 0; i < THREAD_NUMBER; i++) {
            final int threadNumber = i;
            final int from = i * SLICE_LENGTH;
            // the last slice also covers the remainder when LIST_LENGTH is not divisible by THREAD_NUMBER
            final int to = (i == THREAD_NUMBER - 1) ? LIST_LENGTH : (i + 1) * SLICE_LENGTH;
            pool4.execute(() -> {
                for (int j = from; j < to; j++) {
                    if (myObjectArrayList.get(j).judge()) {
                        result3[threadNumber]++;
                    }
                }
            });
        }
        pool4.shutdown();
        pool4.awaitTermination(1, TimeUnit.DAYS);
        numberEquals2 = 0;
        for (int i = 0; i < THREAD_NUMBER; i++) {
            numberEquals2 += result3[i];
        }
        end = System.currentTimeMillis();
        System.out.println("Threads: "+THREAD_NUMBER+" (CPU cores + 1), time: "+(end-start)+" milliseconds, "+"matching count: "+numberEquals2);
        // Method 6: parallelStream (runs on the common ForkJoinPool by default)
        start = System.currentTimeMillis();
        long count = myObjectArrayList.parallelStream().filter(MyObject::judge).count();
        end = System.currentTimeMillis();
        System.out.println("parallelStream, time: "+(end-start)+" milliseconds, "+"matching count: "+count);
    }
}

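class MyObject{
    // random value in [0, 100), so about 1% of the objects have type == 2
    int type;
    MyObject(){
        type = new Random().nextInt(100);
    }
    // deliberately CPU-heavy condition (100 Math.sin calls), used by the second round of tests
    public boolean judge(){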
        double count = type;
        for (int i = 0; i < 100; i++) {
            count += Math.sin(count * i);
        }
        return count < 0.1;
    }
}

Results (on a machine where availableProcessors() returns 12, hence the 24- and 13-thread runs):

Threads: 1 (sequential loop), time: 9 milliseconds, count of type == 2: 10057
Threads: 100 (AtomicInteger), time: 110 milliseconds, count of type == 2: 10057
Threads: 100 (per-thread array slots), time: 45 milliseconds, count of type == 2: 10057
Threads: 24 (CPU cores * 2), time: 35 milliseconds, count of type == 2: 10057
Threads: 13 (CPU cores + 1), time: 33 milliseconds, count of type == 2: 10057
parallelStream, time: 33 milliseconds, count of type == 2: 10057
Expensive predicate (judge()) ----------------------------------------
Threads: 1 (sequential loop), time: 6837 milliseconds, matching count: 40189
Threads: 100 (AtomicInteger), time: 759 milliseconds, matching count: 40189
Threads: 100 (per-thread array slots), time: 731 milliseconds, matching count: 40189
Threads: 24 (CPU cores * 2), time: 920 milliseconds, matching count: 40189
Threads: 13 (CPU cores + 1), time: 750 milliseconds, matching count: 40189
parallelStream, time: 767 milliseconds, matching count: 40189