两个线程进行数据交换的Exchanger

简介

Exchanger(交换者)是一个用于线程间协作的工具类。Exchanger用于进行线程间的数据交换。它提供一个同步点,在这个同步点两个线程可以交换彼此的数据。这两个线程通过exchange方法交换数据,

Exchanger(交换者)是一个用于线程间协作的工具类。Exchanger用于进行线程间的数据交换。它提供一个同步点,在这个同步点两个线程可以交换彼此的数据。这两个线程通过exchange方法交换数据, 如果第一个线程先执行exchange方法,它会一直等待第二个线程也执行exchange,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。因此使用Exchanger的重点是成对的线程使用exchange()方法,当有一对线程达到了同步点,就会进行交换数据。因此该工具类的线程对象是成对的。

Exchanger类提供了两个方法,String exchange(V x):用于交换,启动交换并等待另一个线程调用exchange;String exchange(V x,long timeout,TimeUnit unit):用于交换,启动交换并等待另一个线程调用exchange,并且设置最大等待时间,当等待时间超过timeout便停止等待。

Exchanger的应用场景

Exchanger可以用于遗传算法,遗传算法里需要选出两个人作为交配对象,这时候会交换两人的数据,并使用交叉规则得出2个交配结果。
Exchanger也可以用于校对工作。比如我们需要将纸制银流通过人工的方式录入成电子银行流水,为了避免错误,采用AB岗两人进行录入,录入到Excel之后,系统需要加载这两个Excel,并对这两个Excel数据进行校对,看看是否录入的一致。代码如下:

package cn.iigrowing.threads.study.Exchanger;

import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExchangerTest {
 private static final Exchanger<String> exgr = new Exchanger<String>();

 private static ExecutorService threadPool = Executors.newFixedThreadPool(2);

 public static void main(String[] args) {
 threadPool.execute(new Runnable() {
 // @Override
 public void run() {
 try {
 String A = "银行流水A";// A录入银行流水数据
 String B = exgr.exchange(A);
 System.out.println("Im in a thread A:"
 + Thread.currentThread().getId() + " B is:" + B);
 } catch (InterruptedException e) {
 }
 }
 });
 threadPool.execute(new Runnable() {
 // @Override
 public void run() {
 try {
 String B = "银行流水B";// B录入银行流水数据
 // String A = exgr.exchange("B");
 String A = exgr.exchange(B);

 System.out.println("A和B数据是否一致:" + A.equals(B) + ",A录入的是:"
 + A + ",B录入是:" + B);

 System.out.println("Im in a thread B"
 + Thread.currentThread().getId() + " A is:" + A);

 } catch (InterruptedException e) {
 }
 }
 });
 threadPool.shutdown();
 }
}
执行结果

其他方法

如果两个线程有一个没有到达exchange方法,则会一直等待,如果担心有特殊情况发生,避免一直等待,可以使用exchange(V x, long timeout, TimeUnit unit)设置最大等待时长。

 

2.实例讲解

通过以上的原理,可以知道使用Exchanger类的核心便是exchange()方法的使用,接下来通过一个例子来使的该工具类的用途更加清晰。该例子主要讲解的是前段时间NBA交易截止日的交易。

package cn.iigrowing.threads.study.Exchanger;

import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExchangerDemo {
public static void main(String[] args) {
ExecutorService executor = Executors.newCachedThreadPool();
final Exchanger exchanger = new Exchanger();
executor.execute(new Runnable() {
String data1 = "克拉克森,小拉里南斯";

// @Override
public void run() {
nbaTrade(data1, exchanger);
}
});
executor.execute(new Runnable() {
String data1 = "格里芬";

// @Override
public void run() {
nbaTrade(data1, exchanger);
}
});
executor.execute(new Runnable() {
String data1 = "哈里斯";

// @Override
public void run() {
nbaTrade(data1, exchanger);
}
});
executor.execute(new Runnable() {
String data1 = "以赛亚托马斯,弗莱";

// @Override
public void run() {
nbaTrade(data1, exchanger);
}
});
executor.shutdown();
}

private static void nbaTrade(String data1, Exchanger exchanger) {
try {
System.out.println(Thread.currentThread().getName() + "在交易截止之前把 "
+ data1 + " 交易出去");
Thread.sleep((long) (Math.random() * 1000));
String data2 = (String) exchanger.exchange(data1);
System.out.println(Thread.currentThread().getName() + "交易得到"
+ data2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

 

运行程序,得到如下结果:

pool-1-thread-1在交易截止之前把 克拉克森,小拉里南斯 交易出去
pool-1-thread-2在交易截止之前把 格里芬 交易出去
pool-1-thread-3在交易截止之前把 哈里斯 交易出去
pool-1-thread-4在交易截止之前把 以赛亚托马斯,弗莱 交易出去
pool-1-thread-1交易得到以赛亚托马斯,弗莱
pool-1-thread-4交易得到克拉克森,小拉里南斯
pool-1-thread-2交易得到哈里斯
pool-1-thread-3交易得到格里芬

以上例子可以看出两个都调用exchange()方法的线程会进行交换数据。接下来假设线程数目只有奇数个,观察情况:

如以下代码,将第四个线程注释掉。

package cn.iigrowing.threads.study.Exchanger;

import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.*;

public class ExchangerDemo2 {
public static void main(String[] args) {
ExecutorService executor = Executors.newCachedThreadPool();
final Exchanger exchanger = new Exchanger();
executor.execute(new Runnable() {
String data1 = "克拉克森,小拉里南斯";

// @Override
public void run() {
nbaTrade(data1, exchanger);
}
});
executor.execute(new Runnable() {
String data1 = "格里芬";

// @Override
public void run() {
nbaTrade(data1, exchanger);
}
});

executor.execute(new Runnable() {
String data1 = "哈里斯";

// @Override
public void run() {
nbaTrade(data1, exchanger);
}
});
// executor.execute(new Runnable() {
// String data1 = "以赛亚托马斯,弗莱";
//
// @Override
// public void run() {
// nbaTrade(data1, exchanger);
// }
// });
executor.shutdown();
}

private static void nbaTrade(String data1, Exchanger exchanger) {

try {
System.out.println(Thread.currentThread().getName() + "在交易截止之前把 "
+ data1 + " 交易出去");
Thread.sleep((long) (Math.random() * 1000));
String data2 = (String) exchanger.exchange(data1);
System.out.println(Thread.currentThread().getName() + "交易得到"
+ data2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

 

运行程序,得到如下结果:

pool-1-thread-1在交易截止之前把 克拉克森,小拉里南斯 交易出去
pool-1-thread-2在交易截止之前把 格里芬 交易出去
pool-1-thread-3在交易截止之前把 哈里斯 交易出去
pool-1-thread-3交易得到克拉克森,小拉里南斯
pool-1-thread-1交易得到哈里斯

由结果可知,线程2和线程3进行了交换数据,而线程1一直等待与它交换数据的线程调用exchange,但是只有3个线程,所以会一直等待。

因此,当两个线程之间出现数据交换的情况,可以使用Exchanger工具类实现数据交换。注意exchange方法的含义,以及触发数据交换的条件。