注意
本文最后更新于 2023-11-13,文中内容可能已过时。
Akka Grpc 的简单入门
简介
Akka Grpc
是一个 rpc
框架。 基于akka-http
,google protobuf
构建而成。
一些快速的链接。
详细内容
本篇是一个中文的使用流程把。 使用Java
语言编写代码。
scala
语言和其他类型的编译工具的代码可以在官方文档中找到。
协议文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
option java_multiple_files = true;
option java_package = "example.myapp.helloworld.grpc";
option java_outer_classname = "HelloWorldProto";
package helloworld;
// The request message containing the user's name.
message HelloRequest {
string name = 1;
}
// The response message containing the greetings
message HelloReply {
string message = 1;
}
////////////////////////////////////// The greeting service definition.
service GreeterService {
//////////////////////
// Sends a greeting //
////////*****/////////
// HELLO //
////////*****/////////
rpc SayHello (HelloRequest) returns (HelloReply) {}
// Comment spanning
// on several lines
rpc ItKeepsTalking (stream HelloRequest) returns (HelloReply) {}
/*
* C style comments
*/
rpc ItKeepsReplying (HelloRequest) returns (stream HelloReply) {}
/* C style comments
* on several lines
* with non-empty heading/trailing line */
rpc StreamHellos (stream HelloRequest) returns (stream HelloReply) {}
}
|
简单的说明
message
定义数据交互格式 。
service
生成rpc 调用 使用的 api
格式为:
rpc 方法名 (参数) returns (返回类型) {}
参数和返回类型可以添加 stream
关键字, 表示这是一个数据流 。(持续输入, 或者持续输出)
生成java代码
我使用 Maven
工具作为依赖管理工具。
项目的 pom.xml
大致内容如下。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
<project>
<modelVersion>4.0.0</modelVersion>
<name>Project name</name>
<groupId>com.example</groupId>
<artifactId>my-grpc-app</artifactId>
<version>0.1-SNAPSHOT</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<akka.grpc.version>1.0.0-M1</akka.grpc.version>
<grpc.version>1.29.0</grpc.version>
<project.encoding>UTF-8</project.encoding>
</properties>
<dependencies>
<dependency>
<groupId>com.lightbend.akka.grpc</groupId>
<artifactId>akka-grpc-runtime_2.12</artifactId>
<version>${akka.grpc.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>com.lightbend.akka.grpc</groupId>
<artifactId>akka-grpc-maven-plugin</artifactId>
<version>${akka.grpc.version}</version>
<configuration>
<generatorSettings>
<serverPowerApis>true</serverPowerApis>
</generatorSettings>
</configuration>
<executions>
<execution>
<goals>
<goal>generate</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
|
这里添加一个叫akka-grpc-maven-plugin
的 Maven插件。 这个插件的generate
行为可以生成出java 代码。
单独执行生成的话, 可以试试运行下面的命令生成代码。
mvn akka-grpc:generate
编写 server 端代码
在server
端 需要实现 GreeterService
接口。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
package example.myapp.helloworld;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
import akka.NotUsed;
import akka.stream.Materializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import example.myapp.helloworld.grpc.*;
public class GreeterServiceImpl implements GreeterService {
private final Materializer mat;
public GreeterServiceImpl(Materializer mat) {
this.mat = mat;
}
@Override
public CompletionStage<HelloReply> sayHello(HelloRequest in) {
System.out.println("sayHello to " + in.getName());
HelloReply reply = HelloReply.newBuilder().setMessage("Hello, " + in.getName()).build();
return CompletableFuture.completedFuture(reply);
}
@Override
public CompletionStage<HelloReply> itKeepsTalking(Source<HelloRequest, NotUsed> in) {
System.out.println("sayHello to in stream...");
return in.runWith(Sink.seq(), mat)
.thenApply(elements -> {
String elementsStr = elements.stream().map(elem -> elem.getName())
.collect(Collectors.toList()).toString();
return HelloReply.newBuilder().setMessage("Hello, " + elementsStr).build();
});
}
@Override
public Source<HelloReply, NotUsed> itKeepsReplying(HelloRequest in) {
System.out.println("sayHello to " + in.getName() + " with stream of chars");
List<Character> characters = ("Hello, " + in.getName())
.chars().mapToObj(c -> (char) c).collect(Collectors.toList());
return Source.from(characters)
.map(character -> {
return HelloReply.newBuilder().setMessage(String.valueOf(character)).build();
});
}
@Override
public Source<HelloReply, NotUsed> streamHellos(Source<HelloRequest, NotUsed> in) {
System.out.println("sayHello to stream...");
return in.map(request -> HelloReply.newBuilder().setMessage("Hello, " + request.getName()).build());
}
}
|
实现GreeterService
接口, 完成自己的需求即可。
启动类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
package example.myapp.helloworld;
import akka.actor.ActorSystem;
import akka.http.javadsl.*;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import example.myapp.helloworld.grpc.*;
import java.util.concurrent.CompletionStage;
class GreeterServer {
public static void main(String[] args) throws Exception {
// important to enable HTTP/2 in ActorSystem's config
Config conf = ConfigFactory.parseString("akka.http.server.preview.enable-http2 = on")
.withFallback(ConfigFactory.defaultApplication());
// Akka ActorSystem Boot
ActorSystem sys = ActorSystem.create("HelloWorld", conf);
run(sys).thenAccept(binding -> {
System.out.println("gRPC server bound to: " + binding.localAddress());
});
// ActorSystem threads will keep the app alive until `system.terminate()` is called
}
public static CompletionStage<ServerBinding> run(ActorSystem sys) throws Exception {
Materializer mat = ActorMaterializer.create(sys);
// Instantiate implementation
GreeterService impl = new GreeterServiceImpl(mat);
return Http.get(sys).bindAndHandleAsync(
GreeterServiceHandlerFactory.create(impl, sys),
ConnectHttp.toHost("127.0.0.1", 8080),
mat);
}
}
|
需要在配置文件里面指定下面的参数
akka.http.server.preview.enable-http2 = on
编写 client 代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
|
package example.myapp.helloworld;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.time.Duration;
import io.grpc.StatusRuntimeException;
import akka.Done;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.Source;
import akka.grpc.GrpcClientSettings;
import example.myapp.helloworld.grpc.*;
class GreeterClient {
public static void main(String[] args) throws Exception {
String serverHost = "127.0.0.1";
int serverPort = 8080;
ActorSystem system = ActorSystem.create("HelloWorldClient");
Materializer materializer = ActorMaterializer.create(system);
// Configure the client by code:
GrpcClientSettings settings = GrpcClientSettings.connectToServiceAt("127.0.0.1", 8080, system);
// Or via application.conf:
// GrpcClientSettings settings = GrpcClientSettings.fromConfig(GreeterService.name, system);
GreeterServiceClient client = null;
try {
client = GreeterServiceClient.create(settings, materializer, system.dispatcher());
singleRequestReply(client);
streamingRequest(client);
streamingReply(client, materializer);
streamingRequestReply(client, materializer);
} catch (StatusRuntimeException e) {
System.out.println("Status: " + e.getStatus());
} catch (Exception e) {
System.out.println(e);
} finally {
if (client != null) client.close();
system.terminate();
}
}
private static void singleRequestReply(GreeterService client) throws Exception {
HelloRequest request = HelloRequest.newBuilder().setName("Alice").build();
CompletionStage<HelloReply> reply = client.sayHello(request);
System.out.println("got single reply: " + reply.toCompletableFuture().get(5, TimeUnit.SECONDS));
}
private static void streamingRequest(GreeterService client) throws Exception {
List<HelloRequest> requests = Arrays.asList("Alice", "Bob", "Peter")
.stream().map(name -> HelloRequest.newBuilder().setName(name).build())
.collect(Collectors.toList());
CompletionStage<HelloReply> reply = client.itKeepsTalking(Source.from(requests));
System.out.println("got single reply for streaming requests: " +
reply.toCompletableFuture().get(5, TimeUnit.SECONDS));
}
private static void streamingReply(GreeterService client, Materializer mat) throws Exception {
HelloRequest request = HelloRequest.newBuilder().setName("Alice").build();
Source<HelloReply, NotUsed> responseStream = client.itKeepsReplying(request);
CompletionStage<Done> done =
responseStream.runForeach(reply ->
System.out.println("got streaming reply: " + reply.getMessage()), mat);
done.toCompletableFuture().get(60, TimeUnit.SECONDS);
}
private static void streamingRequestReply(GreeterService client, Materializer mat) throws Exception {
Duration interval = Duration.ofSeconds(1);
Source<HelloRequest, NotUsed> requestStream = Source
.tick(interval, interval, "tick")
.zipWithIndex()
.map(pair -> pair.second())
.map(i -> HelloRequest.newBuilder().setName("Alice-" + i).build())
.take(10)
.mapMaterializedValue(m -> NotUsed.getInstance());
Source<HelloReply, NotUsed> responseStream = client.streamHellos(requestStream);
CompletionStage<Done> done =
responseStream.runForeach(reply ->
System.out.println("got streaming reply: " + reply.getMessage()), mat);
done.toCompletableFuture().get(60, TimeUnit.SECONDS);
}
}
|
主要是构造一个GreeterServiceClient
对象, 用于进行 rpc
调用即可。
基本上使用 就是上面的内容。 官方文档还提供一种添加 Metadata
参数的方法。 有兴趣的可以看看官方文档,作者回复issue
的频率很高, 有问题可以去问问。
生产者/消费者模式
我的经验告诉我, 这个rpc库不能像常规的socket那样互发消息, 比如服务端要推送消息给客户端的时候就不太好用。
如果服务端不能推送消息给客户端的话, 那么就只能让客户端不停的轮询服务器,这里是比较浪费流量和性能的。 🐤🐤
我考虑的解决方法是 生产者/消费者模式 😺
Source
类可以通过一个叫fromPublisher 的方法把一个Publisher
转成Source
。
Usage
给一段代码 应该就可以明白怎么使用了。
1
2
3
4
|
Publisher publisher = new SubmissionPublisher<HelloReply>();
// publisher.offer(xx) 即可发送消息
return JavaFlowSupport.Source.fromPublisher(publisher);
|
Source.fromPublisher()
方法需要使用akka-stream
库,有兴趣的朋友可以试着研究下。
FAQ
akka 2.6.0
生成代码后报错。
使用更高的版本, 比如 akka 2.6.4
即可以解决问题。
如果非要使用 2.6.0 的话, 需要自己手动修改生成出来的代码, 不推荐这种方式。
如果我完全不懂akka
我可以使用akka-grpc
吗
可以是可以。。 但是难度不小, 这个库是基于 akka
,akka-stream
,akka-http
的。 其中异步响应式流框架akka-stream
有很多API, 在akka-grpc
里面应该都是可用的。
所以想好好使用这个框架的话, 可能也需要花时间去学习另外一个框架。