- 异步算子源码分析
- 异步算子为啥能够保证有序性
- flinksql中怎么自定义使用异步lookup join
import java.io.Serializable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Simulates a slow asynchronous lookup (demo code adapted from an online example).
 *
 * <p>Each call to {@link #pullData(String)} waits for a configurable delay on a worker thread
 * and then completes the returned future with {@code "Output value: " + source}.
 *
 * <p>Instances only carry the delay, so they serialize cleanly. The worker pool is static and
 * therefore neither serialized nor re-created per instance.
 */
public class AsyncIODemo implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Delay used by the no-arg constructor, in milliseconds. */
    private static final long DEFAULT_DELAY_MILLIS = 5000L;

    /**
     * Worker pool shared by all instances. Daemon threads are used so that idle workers never
     * keep the JVM alive once the job has finished.
     */
    private static final ExecutorService EXECUTOR =
            Executors.newFixedThreadPool(
                    4,
                    runnable -> {
                        Thread worker = new Thread(runnable, "async-io-demo");
                        worker.setDaemon(true);
                        return worker;
                    });

    /** How long every simulated request takes, in milliseconds. */
    private final long delayMillis;

    /** Creates a demo client whose requests take five seconds, as in the original example. */
    public AsyncIODemo() {
        this(DEFAULT_DELAY_MILLIS);
    }

    /**
     * Creates a demo client with a custom simulated latency.
     *
     * @param delayMillis time each request sleeps before completing, in milliseconds
     * @throws IllegalArgumentException if {@code delayMillis} is negative
     */
    public AsyncIODemo(long delayMillis) {
        if (delayMillis < 0) {
            throw new IllegalArgumentException(
                    "delayMillis must not be negative: " + delayMillis);
        }
        this.delayMillis = delayMillis;
    }

    /**
     * Starts one simulated asynchronous request.
     *
     * @param source the input value to echo back
     * @return a future completed with {@code "Output value: " + source} after the delay, or
     *     completed exceptionally with the {@link InterruptedException} if the worker thread is
     *     interrupted while waiting
     */
    public CompletableFuture<String> pullData(final String source) {
        final CompletableFuture<String> completableFuture = new CompletableFuture<>();
        EXECUTOR.execute(
                () -> {
                    try {
                        Thread.sleep(delayMillis);
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag and report the failure instead of
                        // pretending the request succeeded.
                        Thread.currentThread().interrupt();
                        completableFuture.completeExceptionally(e);
                        return;
                    }
                    // The simulated work is done: publish the result string.
                    completableFuture.complete("Output value: " + source);
                });
        return completableFuture;
    }
}
import org.apache.flink.streaming.api.datastream.AsyncDataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.async.AsyncFunction; import org.apache.flink.streaming.api.functions.async.ResultFuture; import java.util.Arrays; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; /** * 网上copy的代码 */ public class AsyncTest { public static void main(String[] args) throws Exception { /** * 获取Flink执行环境并设置并行度为1,方便后面观测 */ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); /** * 构造一个DataStreamSource的序列 */ DataStreamSource stream = env.fromElements("11", "22", "33", "44"); /** * 使用AsyncDataStream构造一个异步顺序流,这里异步顺序流从名字就可以看出来虽然是异步的,但是却可以保持顺序, * 这个后面源码分析可以知道原因 */ SingleOutputStreamOperator asyncStream = AsyncDataStream.orderedWait(stream, new AsyncFunction<String, String>() { @Override public void asyncInvoke(String input, ResultFuture<String> resultFuture) throws Exception { /** * 这里调用模拟的获取异步请求结果,并返回一个CompletableFuture */ CompletableFuture<String> future = new AsyncIODemo().pullData(input); /** * 注册一个future处理完成的回调,当future处理完成拿到结果后,调用resultFuture的 * complete方法真正吐出数据 */ future.whenCompleteAsync((d,t) ->{ resultFuture.complete(Arrays.asList(d)); }); } // 设置最长异步调用超时时间为10秒 }, 10, TimeUnit.SECONDS); asyncStream.print(); env.execute(); } }
package org.apache.flink.streaming.api.datastream; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.Utils; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.streaming.api.functions.async.AsyncFunction; import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator; import org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorFactory; import java.util.concurrent.TimeUnit; /** * 用于将AsyncFunction应用到数据流的一个helper类 * * <pre>{@code * DataStream<String> input = ... * AsyncFunction<String, Tuple<String, String>> asyncFunc = ... * * AsyncDataStream.orderedWait(input, asyncFunc, timeout, TimeUnit.MILLISECONDS, 100); * }</pre> */ @PublicEvolving public class AsyncDataStream { /** 异步操作的输出模式,有序或者无序. */ public enum OutputMode { ORDERED, UNORDERED } private static final int DEFAULT_QUEUE_CAPACITY = 100; /** * flag_2,添加一个AsyncWaitOperator. * * @param in The {@link DataStream} where the {@link AsyncWaitOperator} will be added. * @param func {@link AsyncFunction} wrapped inside {@link AsyncWaitOperator}. * @param timeout for the asynchronous operation to complete * @param bufSize The max number of inputs the {@link AsyncWaitOperator} can hold inside. * @param mode Processing mode for {@link AsyncWaitOperator}. * @param <IN> Input type. * @param <OUT> Output type. 
* @return A new {@link SingleOutputStreamOperator} */ private static <IN, OUT> SingleOutputStreamOperator<OUT> addOperator( DataStream<IN> in, AsyncFunction<IN, OUT> func, long timeout, int bufSize, OutputMode mode) { TypeInformation<OUT> outTypeInfo = TypeExtractor.getUnaryOperatorReturnType( func, AsyncFunction.class, 0, 1, new int[] {1, 0}, in.getType(), Utils.getCallLocationName(), true); /** 这里生成了一个AsyncWaitOperatorFactory */ AsyncWaitOperatorFactory<IN, OUT> operatorFactory = new AsyncWaitOperatorFactory<>( in.getExecutionEnvironment().clean(func), timeout, bufSize, mode); return in.transform("async wait operator", outTypeInfo, operatorFactory); } /** * 添加一个AsyncWaitOperator。输出流无顺序。 * * @param in Input {@link DataStream} * @param func {@link AsyncFunction} * @param timeout for the asynchronous operation to complete * @param timeUnit of the given timeout * @param capacity The max number of async i/o operation that can be triggered * @param <IN> Type of input record * @param <OUT> Type of output record * @return A new {@link SingleOutputStreamOperator}. */ public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait( DataStream<IN> in, AsyncFunction<IN, OUT> func, long timeout, TimeUnit timeUnit, int capacity) { return addOperator(in, func, timeUnit.toMillis(timeout), capacity, OutputMode.UNORDERED); } /** * 添加一个AsyncWaitOperator。输出流无顺序。 * @param in Input {@link DataStream} * @param func {@link AsyncFunction} * @param timeout for the asynchronous operation to complete * @param timeUnit of the given timeout * @param <IN> Type of input record * @param <OUT> Type of output record * @return A new {@link SingleOutputStreamOperator}. 
*/ public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait( DataStream<IN> in, AsyncFunction<IN, OUT> func, long timeout, TimeUnit timeUnit) { return addOperator( in, func, timeUnit.toMillis(timeout), DEFAULT_QUEUE_CAPACITY, OutputMode.UNORDERED); } /** * flag_1,添加一个AsyncWaitOperator。处理输入记录的顺序保证与输入记录的顺序相同 * * @param in Input {@link DataStream} * @param func {@link AsyncFunction} * @param timeout for the asynchronous operation to complete * @param timeUnit of the given timeout * @param capacity The max number of async i/o operation that can be triggered * @param <IN> Type of input record * @param <OUT> Type of output record * @return A new {@link SingleOutputStreamOperator}. */ public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait( DataStream<IN> in, AsyncFunction<IN, OUT> func, long timeout, TimeUnit timeUnit, int capacity) { return addOperator(in, func, timeUnit.toMillis(timeout), capacity, OutputMode.ORDERED); } /** * 添加一个AsyncWaitOperator。处理输入记录的顺序保证与输入记录的顺序相同 * @param in Input {@link DataStream} * @param func {@link AsyncFunction} * @param timeout for the asynchronous operation to complete * @param timeUnit of the given timeout * @param <IN> Type of input record * @param <OUT> Type of output record * @return A new {@link SingleOutputStreamOperator}. */ public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait( DataStream<IN> in, AsyncFunction<IN, OUT> func, long timeout, TimeUnit timeUnit) { return addOperator( in, func, timeUnit.toMillis(timeout), DEFAULT_QUEUE_CAPACITY, OutputMode.ORDERED); } }
如上,从测试代码开始的调用链为:AsyncDataStream.orderedWait -> addOperator,而addOperator中new了一个AsyncWaitOperatorFactory。静态地读代码到这里就可以告一段落了,没有必要继续往下跟,这个时候需要猜一下:类名叫XXFactory的一般都是工厂模式,工厂生产的就是XX,所以这里就是生成AsyncWaitOperator对象的工厂。于是我们可以直接在AsyncWaitOperator类的构造方法第一行打个断点,看看什么时候会进去。为什么要这样做?因为我们看到的Flink源码其实并不是一个线性的执行过程(这里只是把算子工厂挂到了作业图上,算子对象要等任务真正运行时才会被创建),架构图如下
public AsyncWaitOperator( @Nonnull AsyncFunction<IN, OUT> asyncFunction, long timeout, int capacity, @Nonnull AsyncDataStream.OutputMode outputMode, @Nonnull ProcessingTimeService processingTimeService, @Nonnull MailboxExecutor mailboxExecutor) { super(asyncFunction); setChainingStrategy(ChainingStrategy.ALWAYS); Preconditions.checkArgument( capacity > 0, "The number of concurrent async operation should be greater than 0."); this.capacity = capacity; this.outputMode = Preconditions.checkNotNull(outputMode, "outputMode"); this.timeout = timeout; this.processingTimeService = Preconditions.checkNotNull(processingTimeService); this.mailboxExecutor = mailboxExecutor; }
/**
 * Sets up the operator: delegates to the superclass, then creates the stream-element
 * serializer, the element queue matching {@code outputMode}, and the timestamped collector.
 *
 * @param containingTask the task passed through to {@code super.setup}
 * @param config the stream config passed through to {@code super.setup}
 * @param output the output passed through to {@code super.setup}
 * @throws IllegalStateException if {@code outputMode} is neither ORDERED nor UNORDERED
 */
@Override
public void setup(
        StreamTask<?, ?> containingTask,
        StreamConfig config,
        Output<StreamRecord<OUT>> output) {
    super.setup(containingTask, config, output);

    // Serializer for stream elements, built on the type serializer of the first input.
    this.inStreamElementSerializer =
            new StreamElementSerializer<>(
                    getOperatorConfig().<IN>getTypeSerializerIn1(getUserCodeClassloader()));

    // The output mode decides which queue implementation holds the in-flight elements;
    // both are created with the configured capacity.
    switch (outputMode) {
        case ORDERED:
            queue = new OrderedStreamElementQueue<>(capacity);
            break;
        case UNORDERED:
            queue = new UnorderedStreamElementQueue<>(capacity);
            break;
        default:
            throw new IllegalStateException("Unknown async mode: " + outputMode + '.');
    }

    // Collector wrapping the operator's output; used when completed results are emitted.
    this.timestampedCollector = new TimestampedCollector<>(super.output);
}
一眼望去就发现下面switch case那里比较有用,根据名字可以知道,这里根据outputMode判断分别实例化有序的队列和无序的队列,联想到AsyncDataStream类里的几个orderedWait和unorderedWait方法,很快就能想到是否有序这个队列就是关键了。好了没什么可以留恋了,继续执行到下一个断点吧!
/**
 * Opens the operator and, if {@code recoveredStreamElements} is set, feeds every recovered
 * element back through the matching {@code process*} method before clearing the reference.
 *
 * @throws IllegalStateException if a recovered element is neither a record, a watermark nor a
 *     latency marker
 * @throws Exception if the superclass or any of the {@code process*} calls fails
 */
@Override
public void open() throws Exception {
    super.open();
    this.isObjectReuseEnabled = getExecutionConfig().isObjectReuseEnabled();

    // Nothing to replay (presumably: the operator was not restored from state).
    if (recoveredStreamElements == null) {
        return;
    }

    for (StreamElement recovered : recoveredStreamElements.get()) {
        if (recovered.isRecord()) {
            processElement(recovered.<IN>asRecord());
            continue;
        }
        if (recovered.isWatermark()) {
            processWatermark(recovered.asWatermark());
            continue;
        }
        if (recovered.isLatencyMarker()) {
            processLatencyMarker(recovered.asLatencyMarker());
            continue;
        }
        throw new IllegalStateException(
                "Unknown record type "
                        + recovered.getClass()
                        + " encountered while opening the operator.");
    }

    // All recovered elements have been handed over; drop the reference.
    recoveredStreamElements = null;
}
@Override public void processElement(StreamRecord<IN> record) throws Exception { StreamRecord<IN> element; // copy the element avoid the element is reused if (isObjectReuseEnabled) { //noinspection unchecked element = (StreamRecord<IN>) inStreamElementSerializer.copy(record); } else { element = record; } // add element first to the queue final ResultFuture<OUT> entry = addToWorkQueue(element); final ResultHandler resultHandler = new ResultHandler(element, entry); // register a timeout for the entry if timeout is configured if (timeout > 0L) { resultHandler.registerTimeout(getProcessingTimeService(), timeout); } userFunction.asyncInvoke(element.getValue(), resultHandler); }
/**
 * Adds the given stream element to the operator's stream element queue. The call does not
 * return until the element has been accepted; while the queue rejects it, control is yielded
 * to the mailbox executor.
 *
 * @param streamElement the element to enqueue
 * @return the queue entry through which the element's result is later completed
 * @throws InterruptedException if yielding to the mailbox is interrupted
 */
private ResultFuture<OUT> addToWorkQueue(StreamElement streamElement)
        throws InterruptedException {
    Optional<ResultFuture<OUT>> slot = queue.tryPut(streamElement);
    while (!slot.isPresent()) {
        // The queue refused the element; let the mailbox run a pending mail, then retry.
        mailboxExecutor.yield();
        slot = queue.tryPut(streamElement);
    }
    return slot.get();
}
/**
 * Callback through which the outcome of one asynchronous operation is reported: either a
 * collection of results or an error.
 *
 * @param <OUT> type of the result elements
 */
@PublicEvolving
public interface ResultFuture<OUT> {
    /**
     * Completes the future with all results gathered in one collection, which is then emitted.
     *
     * @param result the results to emit
     */
    void complete(Collection<OUT> result);

    /**
     * Completes the future by reporting an exception.
     *
     * @param error the error to report
     */
    void completeExceptionally(Throwable error);
}
@Internal public final class OrderedStreamElementQueue<OUT> implements StreamElementQueue<OUT> { private static final Logger LOG = LoggerFactory.getLogger(OrderedStreamElementQueue.class); /** Capacity of this queue. */ private final int capacity; /** Queue for the inserted StreamElementQueueEntries. */ private final Queue<StreamElementQueueEntry<OUT>> queue; public OrderedStreamElementQueue(int capacity) { Preconditions.checkArgument(capacity > 0, "The capacity must be larger than 0."); this.capacity = capacity; this.queue = new ArrayDeque<>(capacity); } @Override public boolean hasCompletedElements() { return !queue.isEmpty() && queue.peek().isDone(); } @Override public void emitCompletedElement(TimestampedCollector<OUT> output) { if (hasCompletedElements()) { final StreamElementQueueEntry<OUT> head = queue.poll(); head.emitResult(output); } } @Override public List<StreamElement> values() { List<StreamElement> list = new ArrayList<>(this.queue.size()); for (StreamElementQueueEntry e : queue) { list.add(e.getInputElement()); } return list; } @Override public boolean isEmpty() { return queue.isEmpty(); } @Override public int size() { return queue.size(); } @Override public Optional<ResultFuture<OUT>> tryPut(StreamElement streamElement) { if (queue.size() < capacity) { StreamElementQueueEntry<OUT> queueEntry = createEntry(streamElement); queue.add(queueEntry); LOG.debug( "Put element into ordered stream element queue. 
New filling degree " + "({}/{}).", queue.size(), capacity); return Optional.of(queueEntry); } else { LOG.debug( "Failed to put element into ordered stream element queue because it " + "was full ({}/{}).", queue.size(), capacity); return Optional.empty(); } } private StreamElementQueueEntry<OUT> createEntry(StreamElement streamElement) { if (streamElement.isRecord()) { return new StreamRecordQueueEntry<>((StreamRecord<?>) streamElement); } if (streamElement.isWatermark()) { return new WatermarkQueueEntry<>((Watermark) streamElement); } throw new UnsupportedOperationException("Cannot enqueue " + streamElement); } }
@Internal class StreamRecordQueueEntry<OUT> implements StreamElementQueueEntry<OUT> { @Nonnull private final StreamRecord<?> inputRecord; private Collection<OUT> completedElements; StreamRecordQueueEntry(StreamRecord<?> inputRecord) { this.inputRecord = Preconditions.checkNotNull(inputRecord); } @Override public boolean isDone() { return completedElements != null; } @Nonnull @Override public StreamRecord<?> getInputElement() { return inputRecord; } @Override public void emitResult(TimestampedCollector<OUT> output) { output.setTimestamp(inputRecord); for (OUT r : completedElements) { output.collect(r); } } @Override public void complete(Collection<OUT> result) { this.completedElements = Preconditions.checkNotNull(result); } }
// 首先将元素添加到队列中 final ResultFuture<OUT> entry = addToWorkQueue(element); final ResultHandler resultHandler = new ResultHandler(element, entry); // 如果配置了timeout,则为条目注册一个超时,这里的timeout也就是测试代码里的10s if (timeout > 0L) { resultHandler.registerTimeout(getProcessingTimeService(), timeout); } userFunction.asyncInvoke(element.getValue(), resultHandler);
/** * 使用AsyncDataStream构造一个异步顺序流,这里异步顺序流从名字就可以看出来虽然是异步的,但是却可以保持顺序, * 这个后面源码分析可以知道原因 */ SingleOutputStreamOperator asyncStream = AsyncDataStream.orderedWait(stream, new AsyncFunction<String, String>() { @Override public void asyncInvoke(String input, ResultFuture<String> resultFuture) throws Exception { /** * 这里调用模拟的获取异步请求结果,并返回一个CompletableFuture */ CompletableFuture<String> future = new AsyncIODemo().pullData(input); /** * 注册一个future处理完成的回调,当future处理完成拿到结果后,调用resultFuture的 * complete方法真正吐出数据 */ future.whenCompleteAsync((d,t) ->{ resultFuture.complete(Arrays.asList(d)); }); } // 设置最长异步调用超时时间为10秒 }, 10, TimeUnit.SECONDS);
/**
 * Emits one completed element from the queue, if there is one, and schedules itself again as
 * a mail while the queue still reports completed elements.
 */
private void outputCompletedElement() {
    // Ask the queue whether it has a completed element. For the OrderedStreamElementQueue
    // shown in this article that is
    //     return !queue.isEmpty() && queue.peek().isDone();
    // so only the head entry is inspected, and StreamRecordQueueEntry.isDone() is
    //     return completedElements != null;
    // i.e. it is true once complete(...) has stored a result in that entry.
    if (queue.hasCompletedElements()) {
        // Let the queue emit. OrderedStreamElementQueue.emitCompletedElement does
        //     if (hasCompletedElements()) {
        //         final StreamElementQueueEntry<OUT> head = queue.poll();
        //         head.emitResult(output);
        //     }
        // removing the head entry and calling its emitResult, which for a record entry is
        //     output.setTimestamp(inputRecord);
        //     for (OUT r : completedElements) { output.collect(r); }
        // This loop over collect(...) is where the data is really sent downstream.
        queue.emitCompletedElement(timestampedCollector);
        // if there are more completed elements, emit them with subsequent mails
        if (queue.hasCompletedElements()) {
            try {
                mailboxExecutor.execute(
                        this::outputCompletedElement,
                        "AsyncWaitOperator#outputCompletedElement");
            } catch (RejectedExecutionException mailboxClosedException) {
                // This exception can only happen if the operator is cancelled which means all
                // pending records can be safely ignored since they will be processed one more
                // time after recovery.
                LOG.debug(
                        "Attempt to complete element is ignored since the mailbox rejected the execution.",
                        mailboxClosedException);
            }
        }
    }
}

/**
 * A handler for the results of a specific input record.
 *
 * <p>NOTE(review): this excerpt shows neither {@code completeExceptionally(Throwable)}, which
 * {@code ResultFuture} declares, nor {@code registerTimeout(...)}, which
 * {@code processElement} calls on this class. Presumably both were left out of the quotation.
 */
private class ResultHandler implements ResultFuture<OUT> {
    /** Optional timeout timer used to signal the timeout to the AsyncFunction. */
    private ScheduledFuture<?> timeoutTimer;

    /** Record for which this result handler exists. Used only to report errors. */
    private final StreamRecord<IN> inputRecord;

    /**
     * The handle received from the queue to update the entry. Should only be used to inject the
     * result; exceptions are handled here.
     */
    private final ResultFuture<OUT> resultFuture;

    /**
     * A guard against ill-written AsyncFunction. Additional (parallel) invokations of {@link
     * #complete(Collection)} or {@link #completeExceptionally(Throwable)} will be ignored. This
     * guard also helps for cases where proper results and timeouts happen at the same time.
     */
    private final AtomicBoolean completed = new AtomicBoolean(false);

    ResultHandler(StreamRecord<IN> inputRecord, ResultFuture<OUT> resultFuture) {
        this.inputRecord = inputRecord;
        this.resultFuture = resultFuture;
    }

    /**
     * Accepts the results for the record. Only the first call has any effect; it forwards the
     * results for processing via the mailbox executor.
     *
     * @param results the results to emit; must not be null (use an empty collection to emit
     *     nothing)
     */
    @Override
    public void complete(Collection<OUT> results) {
        Preconditions.checkNotNull(
                results, "Results must not be null, use empty collection to emit nothing");

        // Atomically flip completed from false to true; if that fails, a completion was
        // already accepted, so return.
        if (!completed.compareAndSet(false, true)) {
            return;
        }

        processInMailbox(results);
    }

    /** Submits {@link #processResults(Collection)} for the given results as a mail. */
    private void processInMailbox(Collection<OUT> results) {
        // move further processing into the mailbox thread
        mailboxExecutor.execute(
                () -> processResults(results),
                "Result in AsyncWaitOperator of input %s",
                results);
    }

    /** Cancels the timeout timer if set, stores the results in the queue entry, then emits. */
    private void processResults(Collection<OUT> results) {
        // If a timeout timer exists, cancel it: this runs on the completion path, so the
        // record already has its result and its timeout task is no longer needed.
        if (timeoutTimer != null) {
            // canceling in mailbox thread avoids
            // https://issues.apache.org/jira/browse/FLINK-13635
            timeoutTimer.cancel(true);
        }

        // Calls complete(...) on the queue entry; for a StreamRecordQueueEntry this assigns
        // its completedElements field (see that class).
        resultFuture.complete(results);

        // Continue with outputCompletedElement(), defined just before this class.
        outputCompletedElement();
    }
}
- 取消当前ResultHandler对象的超时定时任务
- 调用StreamRecordQueueEntry的complete方法将成员变量completedElements赋值
- 判断OrderedStreamElementQueue队列的队头元素StreamRecordQueueEntry的completedElements成员变量是不是不为空
- 第3步不为空,则调用OrderedStreamElementQueue队列的emitCompletedElement方法移除队列的头元素StreamElementQueueEntry并调用emitResult方法真正向下游吐出数据
通常flinksql使用外部的数据源都需要引入一个flinksql-connector-xx这种jar包,比如我们想以kafka为流表join一个redis的维表,那么这时候查询redis的维表,通常使用的就是lookup join。但是网上提供的例子基本都是同步的lookup join,在有些场景下为了提高吞吐就需要使用异步的lookup join。详细实现可以直接看代码:https://gitee.com/rongdi/flinksql-connector-redis