Flutter和Dart的异步编程

Flutter和Dart的异步编程

参考:

使用Flutter和Dart开发APP或者应用时,通常需要访问DB、API、或其他资源,都需要异步处理进行性能提升。

作为Flutter开发组件库,提供了两个组件直接支持异步的UI构建处理(https://flutter.io/widgets/async/)。

  • FutureBuilder: 基于Future的最新数据来构建Widget。
  • StreamBuilder: 基于数据流Stream的数据来构建Widget(可以支持多个数据持续更新构建,BLOC的架构就是依次模式构建)。

Dart Asynchronous 基础

Future表示一个数据还没有存在:

1
2
3
4
5
6
abstract class Future<T> {
Future<R> then<R>(FutureOr<R> onValue(T value), {Function onError});
Future<T> catchError(Function onError, {bool test(Object error)});
Future<T> whenComplete(FutureOr action());
...
}

Stream标识一个异步的迭代对象,有顺序数据、可以操作和过滤数据;迭代是拉(Pull)模式访问数据,Stream是推(Push)模式访问数据。

1
2
3
4
5
6
7
8
abstract class Stream<E>{
listen(void onData(E data), ...);

Stream map(Function f);
Stream<E> where(Function f);
Future<E> get first;
void forEach(void f(E element));
}

Stream & Future例子:

1
2
3
4
5
6
7
8
9
runServer(){
var future = HttpServer.bind('127.0.0.1', 4040);
future.then((HttpServer server){ // a Stream
server.listen((HttpRequest request){
request.reponse.write('Hello, world!');
request.response.close();
});
});
}

问题是使用了太多的Callback级联,通过使用await语法糖可以简化,清晰逻辑。

过渡方案,使用for循环优化例子:

1
2
3
4
5
6
7
runServer(){
var server = HttpServer.bind('127.0.0.1', 4040);
for (HttpRequest request in server.requests) {
request.reponse.write('Hello, world!');
request.response.close();
});
}

await优化的例子:

1
2
3
4
5
6
7
runServer() async{
var server = await HttpServer.bind('127.0.0.1', 4040);
server.listen((HttpRequest request){
request.reponse.write('Hello, world!');
request.response.close();
});
}

await for继续优化:

1
2
3
4
5
6
7
runServer() async{
var server = await HttpServer.bind('127.0.0.1', 4040);
await for (HttpRequest request in server.requests) {
request.reponse.write('Hello, world!');
request.response.close();
});
}

关于Await的用法,对于Future进行等待期执行结果,在代码块(方法)内,await把Future的异步处理变成了同步调用关系(Process);同时await返回Future的泛型结果数据。

1
2
3
4
API:
Future<List<String>> File.readAsLines();
Async use:
List<String> lines = await file.readAsLines();

await也可以使用在非Future对象上的任意变量上,等价于Future的value包装。

1
2
3
await 998;
==
await new Future.value(988);

闭包的使用:

1
2
foo(() async => ...);
foo(() async {...});

Bodywrap与microtask:

1
2
3
Future runServer() async {...}
==
Future runServer() => new Future.miscrotask((){...});

Bodywrap的运行结果和影响(Consequences):

  • 在代码块中的错误会被自行捕获,并返回到Future中;
  • Functions yield at entry (???)
  • 返回的Futures可以被链条处理(chained);
  • 返回结果是Future类型

异步返回多个数据,需要使用Stream,一个数据的时候使用Future。同步的时候使用Iterable返回多个数据。

同步多个数据迭代处理(Iterable & sync*):

1
2
3
4
5
6
Iterable range(int from, int to) sync* {
for (init i = from; i < to; i++) {
yield i;
}
}
range(3, 6).forEach(print); //Prints 3 4 5

yield推送一个数据,yeild*在另外一个迭代之间建立管道,推送数据到上层迭代。

1
2
3
4
5
6
Iterable recRange(int from, int to){
if (from < to){
yield from;
yield* recRange(from + 1, to);
}
}

异步返回多个数据到Streams中,使用async*:

1
2
3
4
5
6
Stream bigFiles(List<String> fileNames) async* {
for (var fileName in fileNames) {
var stat = await new File(fileName).stat();
if (stat.size > 100000) yield fileName;
}
}

Async*的内部细节:

  • 函数直到listen方法调用的时候才会开始执行;
  • cancel方法可以终结async函数;
  • Yield通过event loop返回数据。
1
while (true) { yield 988 }

异步编程的问题:

  • Debug调试困难
    • 处理交错执行
    • 堆栈帧丢失
  • 当Debug时,stacktrace有太多的Noice,解决办法,使用stack_trace包简化调用处理:
1
2
3
4
5
6
7
8
9
10
11
12
13
import 'package:stack_trace/stack_trace.dart';

foo() async{
await 'asynchronous wait';
throw 'error';
}
bar() => foo();
gee() => bar();
main(){
Chain.capture(gee, onError: (e, chain){
print(chain.terse);
});
}

Dart Asynchronous Processing

Dart语言支持异步处理流程,通常的用法有如下7种模式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import 'dart:async';

main() async{
// ① 使用Future和Then级联同步调用
asyncFunc1().then((_) => asyncFunc2()).then((result) {
print(result);
});

// ② 使用await同步调用两个异步方法
var asyncResult1 = await asyncFunc1();
var asyncResult2 = await asyncFunc2();
print(asyncResult2);

// ③ 使用await Future.wait同步等待两个异步调用并发执行结束
var list = await Future.wait([asyncFunc1(), asyncFunc2()]);
print(list[1]);

// ④ 把Future转换成Stream模式调用
asyncFunc1().asStream().asyncMap((_) => asyncFunc2()).listen((result) {
print(result);
});

// ⑤ 把Future转换成Stream,然后使用await for循环Stream的结果同步处理
var stream = asyncFunc1().asStream().asyncMap((_) => asyncFunc2());
await for (var result in stream) {
print(result);
}

// ⑥ 把Future转换成Stram模式调用
asyncFunc1()
.asStream()
.asyncExpand((_) => asyncFunc2().asStream())
.listen((result) {
print(result);
});

// ⑦ 把Future转换成Stream并用yield方式级联
asyncFunc1().asStream().asyncExpand((_) async* {
yield await asyncFunc2();
}).listen((result) {
print(result);
});
}

Future<String> asyncFunc1() async {
return "async return1";
}

Future<String> asyncFunc2() async {
return "async return2";
}

关于什么时候使用Future,什么时候使用Stream。

对于多个数据的异步处理流程,只能使用Stream,对于一般异步处理使用Future和Then更加有利于代码逻辑组织。

1
2
3
4
5
6
7
8
9
10
11
12
Future<String> asyncFunc() async {
return "async return";
}
var asyncResult = await asyncFunc();
print(asyncResult);
// vs
Stream<String> streamFunc() async* {
yield "stream return";
}
streamFunc().listen((result) {
print(result);
});

This allows you to choose whether to use FutureBuilder or StreamBuilder depending on your situation.

“await for” 和 “listen()”

在大多数场景,使用 wait for会应该更清晰表达逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import 'dart:async';
import 'dart:io';
import 'dart:math';

main() async {
await for (var tick in endlessCountDown()) {
print("coutDown $tick");
}
// endlessCountDown().listen((tick) {
// print("coutDown $tick");
// });
}

Stream<int> endlessCountDown() async* {
var i = pow(2, 30) - 1;
while (true) {
sleep(new Duration(seconds: 1));
yield i--;
}
}

但是使用await和listen还是有区别,一个是同步流程,一个是异步流程。例如下面这个例子,使用await,后一个countUp永远也不会被执行到。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import 'dart:async';
import 'dart:io';
import 'dart:math';

main() async {
await for (var tick in endlessCountDown()) {
print("coutDown $tick");
}
await for (var line in endlessCountUp()) {
print("countUp $line");
}
}

Stream<int> endlessCountUp() async* {
var i = 0;
while (true) {
sleep(new Duration(seconds: 1));
yield i++;
}
}

Stream<int> endlessCountDown() async* {
var i = pow(2, 30) - 1;
while (true) {
sleep(new Duration(seconds: 1));
yield i--;
}
}

当然,单独的处理封装成异步方法,再调用,就可以把await的操作又异步化组合一起了,但是有这个必要吗?这个时候也许使用listen更好一下吧,具体看情况而定。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import 'dart:async';
import 'dart:io';
import 'dart:math';

main() {
listenCountDown();
listenCountUp();
}

listenCountDown() async {
await for (var tick in endlessCountDown()) {
print("coutDown $tick");
}
}

listenCountUp() async {
await for (var tick in endlessCountUp()) {
print("coutUp $tick");
}
}
...

这是用listen的例子,是不是在这种状况下会更好一些。

1
2
3
4
5
6
7
8
9
10
11
12
13
import 'dart:async';
import 'dart:io';
import 'dart:math';

main() {
endlessCountDown().listen((tick) {
print("coutDown $tick");
});
endlessCountUp().listen((tick) {
print("coutUp $tick");
});
}
...

注意 在Stream上使用await for的时候,一定要清晰认识到你是否需要等待所有的Stream数据都到达处理完后再执行后续操作。这个在很多场合都是不适用的,例如侦听Dom的事件,如果对Dom两个节点进行侦听事件的处理,就不能适用await for,而应该使用listen异步侦听模式。

简单的说,await for是同步拆解出所有的stream数据并处理,在特定场景需要这样做,另外一些场景缺不能这样做。

“await” 、 “then()” 和 “await Future.wait()”

1
2
3
4
5
6
7
8
9
10
11
12
13
// ① 使用Future和Then级联同步调用
asyncFunc1().then((_) => asyncFunc2()).then((result) {
print(result);
});

// ② 使用await同步调用两个异步方法
var asyncResult1 = await asyncFunc1();
var asyncResult2 = await asyncFunc2();
print(asyncResult2);

// ③ 使用await Future.wait同步等待两个异步调用并发执行结束
var list = await Future.wait([asyncFunc1(), asyncFunc2()]);
print(list[1]);

同上,await是同步等待数据并执行,then是异步侦听模式;而await Future.wait()是多个异步并发执行,并等待所有异步操作执行返回。

下面使用Stopwatch来比较各个处理流程需要的时间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// ②
Stopwatch stopwatch1 = new Stopwatch()..start();
var asyncResult1 = await asyncFunc1();
var asyncResult2 = await asyncFunc2();
print(asyncResult2);
print('await() executed in ${stopwatch1.elapsed}');

// ③
Stopwatch stopwatch2 = new Stopwatch()..start();
var list = await Future.wait([asyncFunc1(), asyncFunc2()]);
print(list[1]);
print('Future.wait() executed in ${stopwatch2.elapsed}');

Future<String> asyncFunc1() async {
return new Future.delayed(const Duration(seconds: 1), () => "async return1");
}

Future<String> asyncFunc2() async {
return new Future.delayed(const Duration(seconds: 1), () => "async return2");
}

根据以上的例子,②用的时间为2秒多些,③用的时间为1秒多些,③更好的并行使用了线程的时间。

因此如果业务场景适合,使用Future.wait会有更好的性能输出。

最后,一个例子展示如何使用FutureBuilder

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
import 'dart:async';
import 'dart:convert';

import 'package:flutter/material.dart';
import 'package:github_flutter/access_token.dart';
import 'package:github_flutter/model/event.dart';
import 'package:github_flutter/model/user.dart';
import 'package:http/http.dart' as http;

void main() => runApp(new MyApp());

class MyApp extends StatelessWidget {
@override
Widget build(BuildContext context) {
return new MaterialApp(
title: 'Flutter Demo',
theme: new ThemeData(
primarySwatch: Colors.deepPurple,
),
home: new FutureBuilder(future: new Future(() async {
var token = await AccessToken.getInstance().getOrCreate();
var user = await fetchUser(token);
return await fetchEvents(user, token);
}), builder: (BuildContext context, AsyncSnapshot<EventList> feedState) {
if (feedState.error != null) {
// TODO: error handling
}
if (feedState.data == null) {
return new Center(child: new CircularProgressIndicator());
}
return new MyHomePage(title: 'GitHub Events', events: feedState.data);
}),
);
}

Future<User> fetchUser(String token) async {
var userResponse =
await http.get('https://api.github.com/user?access_token=' + token);
return new User.fromJson(json.decode(userResponse.body));
}

Future<EventList> fetchEvents(User user, String token) async {
var response = await http.get('https://api.github.com/users/${user
.login}/events?access_token=' +
token);
print(response.body);
final responseJson = json.decode(response.body);
return new EventList.fromJson(responseJson);
}
}

class MyHomePage extends StatefulWidget {
MyHomePage({Key key, this.title, this.events}) : super(key: key);

final String title;
final EventList events;

@override
_MyHomePageState createState() => new _MyHomePageState();
}

class _MyHomePageState extends State<MyHomePage> {
void _incrementCounter() {
setState(() {});
}

@override
Widget build(BuildContext context) {
var notifiationList = widget.events.events;
return new Scaffold(
appBar: new AppBar(
title: new Text(widget.title),
),
body: new ListView.builder(
itemCount: notifiationList.length,
itemBuilder: (context, index) {
return new ListTile(
onTap: _launchURL(notifiationList[index]
.url
.replaceFirst("api.github.com/repos", "github.com")),
title: new Text('${notifiationList[index]
.repoFullName} : ${notifiationList[index].type}'),
);
}),
floatingActionButton: new FloatingActionButton(
onPressed: _incrementCounter,
tooltip: 'Increment',
child: new Icon(Icons.add),
));
}

_launchURL(String url) {
return () async {
if (await canLaunch(url)) {
await launch(url);
} else {
throw 'Could not launch $url';
}
};
}
}