Flink定制输入输出源

本地输入输出

代码

package com.abeffect.blink;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCount {

    /**
     * Batch word-count example: tokenizes a few hard-coded lines of text and
     * counts the occurrences of each word via groupBy + sum.
     */
    public static void main(String[] args) throws Exception {

        // Obtain the batch execution environment.
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // In-memory input: four lines from Hamlet's soliloquy.
        DataSet<String> text = env.fromElements(
                "To be, or not to be,--that is the question:--",
                "Whether 'tis nobler in the mind to suffer",
                "The slings and arrows of outrageous fortune",
                "Or to take arms against a sea of troubles,");

        // Tokenize into (word, 1) pairs, then aggregate counts per word.
        DataSet<Tuple2<String, Integer>> counts = text
                .flatMap(new LineSplitter())
                .groupBy(0)   // key on the word (tuple field 0)
                .sum(1);      // sum the per-word counts (tuple field 1)

        // Trigger execution and print the result to stdout.
        counts.print();
    }

    /**
     * Splits a line into lower-cased words and emits one (word, 1) pair per word.
     */
    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // Lower-case first, then split on runs of non-word characters.
            for (String word : value.toLowerCase().split("\\W+")) {
                // Splitting can yield empty tokens (e.g. leading punctuation); skip them.
                if (!word.isEmpty()) {
                    out.collect(new Tuple2<String, Integer>(word, 1));
                }
            }
        }
    }
}

package com.abeffect.blink;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCount {

    /**
     * Batch word-count example: tokenizes a few hard-coded lines of text and
     * counts the occurrences of each word via groupBy + sum.
     */
    public static void main(String[] args) throws Exception {

        // Obtain the batch execution environment.
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // In-memory input: four lines from Hamlet's soliloquy.
        DataSet<String> text = env.fromElements(
                "To be, or not to be,--that is the question:--",
                "Whether 'tis nobler in the mind to suffer",
                "The slings and arrows of outrageous fortune",
                "Or to take arms against a sea of troubles,");

        // Tokenize into (word, 1) pairs, then aggregate counts per word.
        DataSet<Tuple2<String, Integer>> counts = text
                .flatMap(new LineSplitter())
                .groupBy(0)   // key on the word (tuple field 0)
                .sum(1);      // sum the per-word counts (tuple field 1)

        // Trigger execution and print the result to stdout.
        counts.print();
    }

    /**
     * Splits a line into lower-cased words and emits one (word, 1) pair per word.
     */
    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // Lower-case first, then split on runs of non-word characters.
            for (String word : value.toLowerCase().split("\\W+")) {
                // Splitting can yield empty tokens (e.g. leading punctuation); skip them.
                if (!word.isEmpty()) {
                    out.collect(new Tuple2<String, Integer>(word, 1));
                }
            }
        }
    }
}
kafka 输入, stdout 输出
代码
输出类 StdoutSink.java
package com.abeffect.blink;

import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/**
 * Minimal sink that prints the single string field of each incoming
 * Tuple1 record to standard output.
 */
public class StdoutSink extends RichSinkFunction<Tuple1<String>> {

    @Override
    public void invoke(Tuple1<String> value) throws Exception {
        // f0 is the tuple's only field.
        final String field = value.f0;
        System.out.println(field);
    }
}

package com.abeffect.blink;

import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/**
 * Minimal sink that prints the single string field of each incoming
 * Tuple1 record to standard output.
 */
public class StdoutSink extends RichSinkFunction<Tuple1<String>> {

    @Override
    public void invoke(Tuple1<String> value) throws Exception {
        // f0 is the tuple's only field.
        final String field = value.f0;
        System.out.println(field);
    }
}
执行类 KafkaCount.java
package com.abeffect.blink;

import org.apache.commons.lang.StringUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.util.Properties;

public class KafkaCount {

    /**
     * Streams messages from the Kafka topic "fw-blink-test", keeps non-blank
     * lines, extracts the text before the first ':' and writes it to stdout
     * via {@link StdoutSink}.
     */
    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka consumer configuration: only the broker address is set here.
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "localhost:9092");

        // Raw string stream read from the Kafka topic.
        DataStream<String> rawStream = env.addSource(
                new FlinkKafkaConsumer010<>("fw-blink-test", new SimpleStringSchema(), kafkaProps));

        // Drop blank messages, then keep only the part before the first ':'.
        DataStream<Tuple1<String>> keyStream = rawStream
                .filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String value) throws Exception {
                        return StringUtils.isNotBlank(value);
                    }
                })
                .map(new MapFunction<String, Tuple1<String>>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple1<String> map(String value) throws Exception {
                        // split(":") yields the whole string when no ':' is present,
                        // so parts[0] is always safe for non-blank input.
                        String[] parts = value.split(":");
                        return new Tuple1<String>(parts[0]);
                    }
                });

        keyStream.addSink(new StdoutSink());
        env.execute("data to stdout start");
    }
}
package com.abeffect.blink;

import org.apache.commons.lang.StringUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.util.Properties;

public class KafkaCount {

    /**
     * Streams messages from the Kafka topic "fw-blink-test", keeps non-blank
     * lines, extracts the text before the first ':' and writes it to stdout
     * via {@link StdoutSink}.
     */
    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka consumer configuration: only the broker address is set here.
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "localhost:9092");

        // Raw string stream read from the Kafka topic.
        DataStream<String> rawStream = env.addSource(
                new FlinkKafkaConsumer010<>("fw-blink-test", new SimpleStringSchema(), kafkaProps));

        // Drop blank messages, then keep only the part before the first ':'.
        DataStream<Tuple1<String>> keyStream = rawStream
                .filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String value) throws Exception {
                        return StringUtils.isNotBlank(value);
                    }
                })
                .map(new MapFunction<String, Tuple1<String>>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple1<String> map(String value) throws Exception {
                        // split(":") yields the whole string when no ':' is present,
                        // so parts[0] is always safe for non-blank input.
                        String[] parts = value.split(":");
                        return new Tuple1<String>(parts[0]);
                    }
                });

        keyStream.addSink(new StdoutSink());
        env.execute("data to stdout start");
    }
}

kafka 输入测试

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic fw-blink-test

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic fw-blink-test
结果查看

$ tailf flink-abeffect-jobmanager-0-fox.local.out
3
1
2
3
11
12
13

$ tailf flink-abeffect-jobmanager-0-fox.local.out
3
1
2
3
11
12
13
kafka 输入, mysql 输出
代码
输出类 MySQLSink.java
package com.abeffect.blink;

import java.sql.DriverManager;
import java.sql.Connection;
import java.sql.PreparedStatement;

import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/**
 * Sink that inserts the single string field of each record into the
 * blink_test.sink0 table.
 *
 * Fix over the original: the Connection and PreparedStatement are now managed
 * with try-with-resources, so they are closed even when executeUpdate() throws
 * (the original leaked the connection on any SQL error). The JDBC objects were
 * also held in instance fields of a serializable function although they were
 * only used per-invocation; they are locals now.
 *
 * NOTE(review): a connection is still opened per record, matching the original
 * behavior; for real workloads it should be created once in open() and reused.
 */
public class MySQLSink extends RichSinkFunction<Tuple1<String>> {

    private static final long serialVersionUID = 1L;

    // Connection settings; in production these should come from configuration.
    String username = "root";
    String password = "toor";
    String drivername = "com.mysql.jdbc.Driver";
    String dburl = "jdbc:mysql://localhost:3306/blink_test";

    @Override
    public void invoke(Tuple1<String> value) throws Exception {
        // Load the JDBC driver class (a no-op after the first call).
        Class.forName(drivername);
        // Parameterized statement keeps message content out of the SQL text.
        String sql = "insert into sink0 (`key`) values (?)";
        try (Connection connection = DriverManager.getConnection(dburl, username, password);
             PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
            preparedStatement.setString(1, value.f0);
            preparedStatement.executeUpdate();
        }
    }
}

package com.abeffect.blink;

import java.sql.DriverManager;
import java.sql.Connection;
import java.sql.PreparedStatement;

import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/**
 * Sink that inserts the single string field of each record into the
 * blink_test.sink0 table.
 *
 * Fix over the original: the Connection and PreparedStatement are now managed
 * with try-with-resources, so they are closed even when executeUpdate() throws
 * (the original leaked the connection on any SQL error). The JDBC objects were
 * also held in instance fields of a serializable function although they were
 * only used per-invocation; they are locals now.
 *
 * NOTE(review): a connection is still opened per record, matching the original
 * behavior; for real workloads it should be created once in open() and reused.
 */
public class MySQLSink extends RichSinkFunction<Tuple1<String>> {

    private static final long serialVersionUID = 1L;

    // Connection settings; in production these should come from configuration.
    String username = "root";
    String password = "toor";
    String drivername = "com.mysql.jdbc.Driver";
    String dburl = "jdbc:mysql://localhost:3306/blink_test";

    @Override
    public void invoke(Tuple1<String> value) throws Exception {
        // Load the JDBC driver class (a no-op after the first call).
        Class.forName(drivername);
        // Parameterized statement keeps message content out of the SQL text.
        String sql = "insert into sink0 (`key`) values (?)";
        try (Connection connection = DriverManager.getConnection(dburl, username, password);
             PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
            preparedStatement.setString(1, value.f0);
            preparedStatement.executeUpdate();
        }
    }
}
执行类 KafkaCount.java
package com.abeffect.blink;

import org.apache.commons.lang.StringUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.util.Properties;

public class KafkaCount {

    /**
     * Streams messages from the Kafka topic "fw-blink-test", keeps non-blank
     * lines, extracts the text before the first ':' and writes it to MySQL
     * via {@link MySQLSink}.
     */
    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka consumer configuration: only the broker address is set here.
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "localhost:9092");

        // Raw string stream read from the Kafka topic.
        DataStream<String> rawStream = env.addSource(
                new FlinkKafkaConsumer010<>("fw-blink-test", new SimpleStringSchema(), kafkaProps));

        // Drop blank messages, then keep only the part before the first ':'.
        DataStream<Tuple1<String>> keyStream = rawStream
                .filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String value) throws Exception {
                        return StringUtils.isNotBlank(value);
                    }
                })
                .map(new MapFunction<String, Tuple1<String>>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple1<String> map(String value) throws Exception {
                        // split(":") yields the whole string when no ':' is present,
                        // so parts[0] is always safe for non-blank input.
                        String[] parts = value.split(":");
                        return new Tuple1<String>(parts[0]);
                    }
                });

        keyStream.addSink(new MySQLSink());
        env.execute("data to mysql start");
    }
}
package com.abeffect.blink;

import org.apache.commons.lang.StringUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.util.Properties;

public class KafkaCount {

    /**
     * Streams messages from the Kafka topic "fw-blink-test", keeps non-blank
     * lines, extracts the text before the first ':' and writes it to MySQL
     * via {@link MySQLSink}.
     */
    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka consumer configuration: only the broker address is set here.
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "localhost:9092");

        // Raw string stream read from the Kafka topic.
        DataStream<String> rawStream = env.addSource(
                new FlinkKafkaConsumer010<>("fw-blink-test", new SimpleStringSchema(), kafkaProps));

        // Drop blank messages, then keep only the part before the first ':'.
        DataStream<Tuple1<String>> keyStream = rawStream
                .filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String value) throws Exception {
                        return StringUtils.isNotBlank(value);
                    }
                })
                .map(new MapFunction<String, Tuple1<String>>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple1<String> map(String value) throws Exception {
                        // split(":") yields the whole string when no ':' is present,
                        // so parts[0] is always safe for non-blank input.
                        String[] parts = value.split(":");
                        return new Tuple1<String>(parts[0]);
                    }
                });

        keyStream.addSink(new MySQLSink());
        env.execute("data to mysql start");
    }
}

kafka 输入测试

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic fw-blink-test

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic fw-blink-test
结果查看

mysql> select * from sink0;
+----+------+
| id | key  |
+----+------+
|  1 | 000  |
|  2 | a2   |
|  3 | a3   |
|  4 | b1   |
|  5 | b2   |
|  6 | b3   |
+----+------+

mysql> select * from sink0;
+----+------+
| id | key  |
+----+------+
|  1 | 000  |
|  2 | a2   |
|  3 | a3   |
|  4 | b1   |
|  5 | b2   |
|  6 | b3   |
+----+------+

------

Flink Streaming中实现多路文件输出(MultipleTextOutputFormat)

有时候我们需要根据记录的类别分别写到不同的文件中去,正如本博客的 《Hadoop多文件输出:MultipleOutputFormat和MultipleOutputs深究(一)》《Hadoop多文件输出:MultipleOutputFormat和MultipleOutputs深究(二)》以及《Spark多文件输出(MultipleOutputFormat)》等文章提到的类似。那么如何在Flink Streaming实现类似于《Spark多文件输出(MultipleOutputFormat)》文章中提到的功能呢?很遗憾,Flink内置并不提供相应的API接口来实现这种功能,我们需要自己实现多路文件输出。

在Flink Streaming的DataStream类中可以发现,DataStream的print函数、printToErr函数以及writeAsText函数都是封装了一个称为Sink的对象;而这些Sink都是实现了org.apache.flink.streaming.api.functions.sink.RichSinkFunction或者org.apache.flink.streaming.api.functions.sink.SinkFunction接口的,所以我们也可以自己实现上述两个接口从而达到文件的多路输出功能。

我们还发现,Flink中有一个org.apache.flink.api.java.io.TextOutputFormat类,此类通过调用FSDataOutputStream对象将记录写入到HDFS(当然也可以是其他Hadoop支持的文件系统),所以我们可以封装TextOutputFormat,然后根据record类别的不一样创建不一样的TextOutputFormat对象,从而实现文件的多路输出,根据上面的思路我实现了一个名为MultipleTextOutputFormatSinkFunction类,具体实现如下:

package com.iteblog

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.io.CleanupWhenUnsuccessful
import org.apache.flink.api.java.io.TextOutputFormat
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.slf4j.LoggerFactory

import scala.collection.mutable

/**
 * Streaming sink that demultiplexes (key, value) records into one output file
 * per key under `descPath`, by lazily creating and caching a
 * [[org.apache.flink.api.java.io.TextOutputFormat]] per distinct key.
 *
 * Originally published at https://www.iteblog.com on 2016/5/9.
 */
class MultipleTextOutputFormatSinkFunction[IN](descPath: String) extends RichSinkFunction[IN] {
  // One open TextOutputFormat per key; reused across invoke() calls.
  val map = mutable.Map[String, TextOutputFormat[String]]()
  // Ensures cleanup() runs its error-cleanup pass at most once.
  var cleanupCalled = false
  val LOG = LoggerFactory.getLogger(classOf[MultipleTextOutputFormatSinkFunction[_]])
  // Captured in open() so formats created lazily in invoke() can be configured.
  var parameters: Configuration = null;

  override def open(parameters: Configuration) {
    this.parameters = parameters
  }

  /**
   * Writes one record to the file matching its key, opening a new
   * TextOutputFormat at <descPath>/<key> on first sight of that key.
   *
   * NOTE(review): assumes every record is really a (String, String) tuple;
   * the cast below fails at runtime otherwise — confirm upstream operators.
   */
  override def invoke(item: IN): Unit = {
    val tuple = item.asInstanceOf[(String, String)]
    val key = tuple._1
    val value = tuple._2
    // Reuse the cached format for this key, or create, configure and open one.
    val result = map.get(key)
    val format = if (result.isDefined) {
      result.get
    } else {
      val textOutputFormat = new TextOutputFormat[String](new Path(descPath, key))
      textOutputFormat.configure(parameters)
      // Subtask index / parallelism are needed by open() to pick the split file.
      val context: RuntimeContext = getRuntimeContext
      val indexInSubtaskGroup: Int = context.getIndexOfThisSubtask
      val currentNumberOfSubtasks: Int = context.getNumberOfParallelSubtasks
      textOutputFormat.open(indexInSubtaskGroup, currentNumberOfSubtasks)
      map.put(key, textOutputFormat)
      textOutputFormat
    }
    try {
      format.writeRecord(value)
    }
    catch {
      case ex: Exception => {
        // Best-effort removal of partial output before propagating the failure.
        cleanup()
        throw ex
      }
    }

  }

  /** Closes every per-key format; on failure tries to clean up partial files. */
  override def close() {
    try {
      // NOTE(review): if one close() throws, later formats are not closed —
      // only cleanup() is attempted for them.
      map.foreach(_._2.close())
    } catch {
      case ex: Exception => {
        cleanup()
        throw ex
      }
    } finally {
      map.clear()
    }
  }

  /**
   * Invokes tryCleanupOnError() on each cached format at most once overall;
   * errors here are logged, not rethrown, so the original failure wins.
   * NOTE(review): casts each format to CleanupWhenUnsuccessful — holds for
   * TextOutputFormat, but the cast is unchecked here.
   */
  private def cleanup() {
    try {
      if (!cleanupCalled) {
        cleanupCalled = true
        map.foreach(item => item._2.asInstanceOf[CleanupWhenUnsuccessful].tryCleanupOnError())
      }
    }
    catch {
      case t: Throwable => {
        LOG.error("Cleanup on error failed.", t)
      }
    }
  }
}
MultipleTextOutputFormatSinkFunction类实现了org.apache.flink.streaming.api.functions.sink.RichSinkFunction接口,并实现了def invoke(item: IN): Unit方法;在里面我们根据记录的key值创建不同的TextOutputFormat,然后缓存到mutable.Map[String, TextOutputFormat[String]]中,以便下次可以直接根据key值获取。那么如何在Flink Streaming中使用呢?如下操作:

// Consume raw strings from the Kafka topic "iteblog".
val stream = env.addSource(new FlinkKafkaConsumer08[String]("iteblog", new SimpleStringSchema(), properties))
stream.map{
// apply the business-logic transformation here; it must yield (key, value) pairs
}.addSink(new MultipleTextOutputFormatSinkFunction[(String, String)]("hdfs:///user/iteblog/outputs/"))

// Launch the streaming job.
env.execute("FlinkKafkaStreaming")
运行这个Streaming程序,我们可以在hdfs:///user/iteblog/outputs/路径下看到产生了很多文件,如下:

[iteblog@www.iteblog.com ~]$ hadoop fs -ls -h /user/iteblog/outputs/
-rw-r--r-- 3 iteblog supergroup 1.7 M 2016-05-10 14:57 /user/iteblog/outputs/A
-rw-r--r-- 3 iteblog supergroup 2.5 M 2016-05-10 14:57 /user/iteblog/outputs/B
-rw-r--r-- 3 iteblog supergroup 1.9 M 2016-05-10 14:57 /user/iteblog/outputs/C
-rw-r--r-- 3 iteblog supergroup 3.1 M 2016-05-10 14:57 /user/iteblog/outputs/D
可以看到,已经根据记录的类型写入到不同的文件中了。

但是有几点需要注意:
1、这个MultipleTextOutputFormatSinkFunction只有当文件的大小达到了HDFS的块大小才能看到文件的大小,否则你看到的文件大小会一直为0,这是因为TextOutputFormat类就是这么实现的(我们应该可以对其进行扩展,当写到一定batch数量时,对文件进行刷新);有人可能会问,为什么你上面的文件不是块大小就显示了?那是因为我已经关掉了这个Flink Streaming程序,所以记录都写入到各个文件中了;

2、更好的做法应该是扩展org.apache.flink.api.common.io.FileOutputFormat类(TextOutputFormat类就是扩展这个类的),然后可以这么使用:

val mtof = new MultipleTextOutputFormat[(String,String)](new Path(bashPath))
stream.writeUsingOutputFormat(mtof)
上面的做法很类似于Hadoop中的MultipleTextOutputFormat,由于时间和精力有限,所以就不介绍如何在Flink中实现MultipleTextOutputFormat了;

3、如果你使用Flink Batch模式,实现一个MultipleTextOutputFormat应该很容易,因为我们可以直接使用Hadoop中的MultipleTextOutputFormat,具体如何使用我将在后面的文章中进行介绍;

4、如果你有更好的想法,欢迎分享。