Apollo GraphQL and Kotlin: create a back-to-back subscription system

What is Apollo?

Why back to back?

Architecture

Server node

GraphQL schema

const typeDefs = `
type Subscription {
id_user: ID!
data: [Message]!
}
type Message {
title: String!
message: String!
}
input MessageInput {
title: String!
message: String!
}
type Query {
subscriptions: [Subscription]
}
type Mutation {
addSubscription(id_user: ID!, data: [MessageInput]!): Subscription
updateSubscription(id_user: ID!, data: [MessageInput]!): Subscription
deleteSubscription(id_user: ID!): Subscription
}
type Subscription {
SubscriptionUpdated: Subscription
}
schema {
query: Query
mutation: Mutation
subscription: Subscription
}
`;
module.exports = {
typeDefs
};

Resolvers

const {PubSub} = require('graphql-yoga');
const pubsub = new PubSub();
const resolvers = {
Query: {
subscriptions: () => {
return service.subscriptions()
},
},
Mutation: {
addSubscription: (root, args) => {
let data = {
data: args.data
};
service.creatSubscription( args.id_user, data );
data.id_user = args.id_user;
return data;
},
updateSubscription: (root, args) => {
let data = {
data: args.data
};
service.updateSubscription( args.id_user, data );
data.id_user = args.id_user;
pubsub.publish('SubscriptionUpdated', {SubscriptionUpdated: data});
console.log('Data updated', data);
return data;
},
deleteSubscription: (root, args) => {
service.deleteSubscription( args.id_user );
console.log('Data deleted', args.id_user);
return {
id_user: args.id_user
};
}
},
Subscription: {
SubscriptionUpdated: {
subscribe: () => pubsub.asyncIterator('SubscriptionUpdated'),
}
}
};
module.exports = {
resolvers
};

Index Schema

const { typeDefs } = require('./typeDefs');
const { resolvers } = require('./resolvers');
const { makeExecutableSchema } = require('graphql-tools');

// Bind the SDL type definitions to their resolvers to obtain an executable schema.
const schema = makeExecutableSchema({ typeDefs, resolvers });

module.exports = { schema };

Node.js server index

// Server entry point: wires the executable schema into a graphql-yoga server.
const express = require('express');
const bodyParser = require("body-parser");
const service = require('./service');
const { GraphQLServer, PubSub } = require('graphql-yoga');
// Presumably resolves to the module exporting `{ schema }` - confirm the file name.
const schema = require('./index');

const pubsub = new PubSub();
const app = new GraphQLServer(schema);

// The resolvers read `service` as a global, so expose it before serving requests.
global.service = service;

// HTTP port and the routes for queries/mutations, web-socket subscriptions and the playground.
const options = {
  port: process.env.PORT || 3000,
  endpoint: '/graphql',
  subscriptions: '/subscriptions',
  playground: '/playground'
};

app.use(bodyParser.json());

app.start(options, ({ port }) => {
  console.log('App listening on port ' + options.port + '!');
});

Client Kotlin Server

Android project

# Download the schema from the running GraphQL server. The Node server listens
# on port 3000 unless the PORT environment variable overrides it.
apollo-codegen download-schema http://localhost:3000/graphql --output schema.json
# Lists every registered subscription with its messages.
# Only fields declared on the Message type (title, message) are selected.
query Subscriptions {
  subscriptions {
    id_user
    data {
      title
      message
    }
  }
}
# Registers the messages of a user and returns the stored subscription.
# Only fields declared on the Message type (title, message) are selected.
mutation Subscription($id_user: ID!, $data: [MessageInput]!) {
  addSubscription(id_user: $id_user, data: $data) {
    id_user
    data {
      title
      message
    }
  }
}
# Receives a subscription each time the server publishes an update for it.
# Only fields declared on the Message type (title, message) are selected.
subscription SubscriptionUpdated {
  SubscriptionUpdated {
    id_user
    data {
      title
      message
    }
  }
}
dependencies {
    // Android build tooling
    classpath('com.android.tools.build:gradle:3.0.1')
    // Apollo Gradle plugin (applied in the module build file as 'com.apollographql.android')
    classpath('com.apollographql.apollo:apollo-gradle-plugin:0.4.4')
    // Kotlin compiler plugin; the version comes from the kotlin_version property
    classpath("org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version")
}
// One plugin per statement: the statements must not be fused on a single line.
apply plugin: 'com.android.application'
apply plugin: 'com.apollographql.android'
apply plugin: 'kotlin-android'
apply plugin: 'kotlin-android-extensions'

Generated code

Apollo Instance and WebSocket

import com.apollographql.apollo.ApolloCall
import com.apollographql.apollo.ApolloClient
import com.apollographql.apollo.ApolloSubscriptionCall
import com.apollographql.apollo.rx2.Rx2Apollo
import com.apollographql.apollo.subscription.WebSocketSubscriptionTransport
import fr.client.apollo.mutations.SubscriptionMutation
import fr.client.apollo.queries.SubscriptionsQuery
import fr.client.apollo.subscriptions.SubscriptionUpdatedSubscription
import fr.apollo.type.MessageInput
import okhttp3.OkHttpClient
import okhttp3.logging.HttpLoggingInterceptor
import java.util.concurrent.TimeUnit
/**
 * Singleton that owns the Apollo client (HTTP endpoint plus web-socket subscription
 * transport) and the prebuilt GraphQL operations.
 *
 * On initialization it builds the client and immediately subscribes a
 * [SubscriptionSubscriber] to the `SubscriptionUpdated` subscription.
 */
object ApolloClient {

    private val BASE_URL_GRAPHQL = "http://localhost:3000/graphql"
    private val BASE_URL_SUBSCRIPTIONS = "ws://localhost:3000/subscriptions"

    private val apolloClient: ApolloClient
    private val subscriptionQueryClient: SubscriptionsQuery
    private val subscriptionSubscriptionClient: SubscriptionUpdatedSubscription

    init {
        // Log complete request and response bodies.
        val bodyLogger = HttpLoggingInterceptor().apply {
            setLevel(HttpLoggingInterceptor.Level.BODY)
        }
        val httpClient = OkHttpClient.Builder()
            .addInterceptor(bodyLogger)
            .pingInterval(30, TimeUnit.SECONDS)
            .build()
        // The same OkHttp client backs both the HTTP calls and the web socket.
        val webSocketFactory = WebSocketSubscriptionTransport.Factory(BASE_URL_SUBSCRIPTIONS, httpClient)

        apolloClient = ApolloClient.builder()
            .serverUrl(BASE_URL_GRAPHQL)
            .okHttpClient(httpClient)
            .subscriptionTransportFactory(webSocketFactory)
            .build()
        subscriptionQueryClient = SubscriptionsQuery.builder().build()
        subscriptionSubscriptionClient = SubscriptionUpdatedSubscription.builder().build()

        // Start listening for server-side updates as soon as the singleton is created.
        Rx2Apollo.from(getSubscriptionSubscriptionCall()).subscribeWith(SubscriptionSubscriber())
    }

    /** Returns the shared Apollo client. */
    fun getApolloClient(): ApolloClient = apolloClient

    /** Returns the prebuilt `Subscriptions` query. */
    fun getSubscriptionQueryClient(): SubscriptionsQuery = subscriptionQueryClient

    /** Builds the `Subscription` mutation for [id_user] carrying the messages in [data]. */
    fun getSubscriptionMutationClient(id_user: String, data: MutableList<MessageInput>): SubscriptionMutation =
        SubscriptionMutation.builder()
            .also { mutation ->
                mutation.id_user(id_user)
                mutation.data(data)
            }
            .build()

    /** Returns the prebuilt `SubscriptionUpdated` subscription. */
    fun getSubscriptionSubscriptionClient(): SubscriptionUpdatedSubscription = subscriptionSubscriptionClient

    /** Creates a call for the `Subscriptions` query. */
    fun getSubscriptionQueryCall(): ApolloCall<SubscriptionsQuery.Data> =
        apolloClient.query(subscriptionQueryClient)

    /** Creates a call for the given, already built, mutation. */
    fun getSubscriptionMutationCall(mutationBuilded: SubscriptionMutation): ApolloCall<SubscriptionMutation.Data> =
        apolloClient.mutate(mutationBuilded)

    /** Creates the subscription call for `SubscriptionUpdated`. */
    fun getSubscriptionSubscriptionCall(): ApolloSubscriptionCall<SubscriptionUpdatedSubscription.Data> =
        apolloClient.subscribe(subscriptionSubscriptionClient)
}
import com.apollographql.apollo.api.Response
import fr.client.apollo.subscriptions.SubscriptionUpdatedSubscription
import io.reactivex.subscribers.DisposableSubscriber
import mu.KotlinLogging
/**
 * Minimal subscriber for the `SubscriptionUpdated` subscription: it only logs
 * each received value, any error and the completion of the stream.
 */
class SubscriptionSubscriber : DisposableSubscriber<Response<SubscriptionUpdatedSubscription.Data>>() {

    val logger = KotlinLogging.logger {}

    /** Logs the payload of each update (prints "null" when the response has no data). */
    override fun onNext(subscriptionUpdated: Response<SubscriptionUpdatedSubscription.Data>) {
        logger.info { "Value received" + subscriptionUpdated.data()?.SubscriptionUpdated().toString() }
    }

    /** Logs the error reported by the stream. */
    override fun onError(t: Throwable) {
        logger.info { "Error web socket $t" }
    }

    /** Logs that the stream completed. */
    override fun onComplete() {
        logger.info { "Complete Called " }
    }
}
import com.apollographql.apollo.api.Response
import fr.vsct.tock.bot.connector.messenger.MessengerConnector
import fr.vsct.tock.bot.connector.messenger.model.send.*
import fr.vsct.tock.bot.engine.action.ActionMetadata
import fr.vsct.tock.bot.engine.action.ActionNotificationType
import fr.vsct.tock.bot.engine.action.ActionPriority
import fr.vsct.tock.bot.engine.action.SendSentence
import fr.vsct.tock.bot.engine.user.PlayerId
import fr.vsct.tock.bot.engine.user.PlayerType
import fr.vsct.tock.bot.open.data.MessengerConfiguration.pageId
import fr.client.apollo.subscriptions.SubscriptionUpdatedSubscription
import fr.vsct.tock.bot.open.data.openBot
import io.reactivex.subscribers.DisposableSubscriber
import mu.KotlinLogging
/**
 * Subscriber for the `SubscriptionUpdated` subscription that forwards each update
 * to the concerned user through the Tock Messenger connector.
 *
 * For every update with at least one message it sends a text sentence followed by
 * a generic template built from the messages.
 */
class SubscriptionSubscriber : DisposableSubscriber<Response<SubscriptionUpdatedSubscription.Data>>() {

    val logger = KotlinLogging.logger {}

    /**
     * Pushes the messages carried by [subscriptionUpdated] to the user they belong to.
     * Updates without payload, without messages, or received while no connector is
     * registered for the page are logged and skipped instead of throwing.
     */
    override fun onNext(subscriptionUpdated: Response<SubscriptionUpdatedSubscription.Data>) {
        logger.info { "Value received in socket" }

        // A response may carry no data; do not let an NPE escape from onNext.
        val update = subscriptionUpdated.data()?.SubscriptionUpdated()
        if (update == null) {
            logger.info { "Subscription update received without payload, nothing to push" }
            return
        }

        val userId = update.id_user()
        val data = update.data()
        logger.info { "Push data to user $userId" }
        logger.info { "With messages $data" }
        if (data.isEmpty()) {
            return
        }

        val messengerConnector = MessengerConnector.getConnectorByPageId(pageId)
        if (messengerConnector == null) {
            logger.info { "No messenger connector found for page $pageId, nothing pushed" }
            return
        }

        val sentence1 = SendSentence(
            PlayerId(openBot.botId, PlayerType.bot),
            pageId,
            PlayerId(userId, PlayerType.user),
            "Re bonjour :) j'ai une information urgente à vous communiquer :",
            mutableListOf(),
            metadata = urgentMetadata()
        )
        val sentence2 = SendSentence(
            PlayerId(openBot.botId, PlayerType.bot),
            pageId,
            PlayerId(userId, PlayerType.user),
            null,
            mutableListOf(
                AttachmentMessage(
                    Attachment(
                        AttachmentType.template,
                        GenericPayload(
                            // One element per distinct message text, at most four elements.
                            data.distinctBy { it.message() }.take(4).map { message ->
                                Element(
                                    message.title(),
                                    fr.vsct.tock.bot.open.data.OpenDataConfiguration.image,
                                    message.message()
                                )
                            }
                        )
                    )
                )
            ),
            metadata = urgentMetadata()
        )

        // Best effort: a failed push is logged and does not terminate the subscription.
        try {
            messengerConnector.sendEvent(sentence1)
            messengerConnector.sendEvent(sentence2)
        } catch (e: Exception) {
            logger.info { "Fail to push data to user $e" }
        }
    }

    /** Logs the error reported by the stream. */
    override fun onError(t: Throwable) {
        logger.info { "Error web socket $t" }
    }

    /** Logs that the stream completed. */
    override fun onComplete() {
        logger.info { "Complete Called " }
    }

    /** Metadata shared by both sentences: high priority, transportation-update notification. */
    private fun urgentMetadata(): ActionMetadata = ActionMetadata(
        priority = ActionPriority.high,
        notificationType = ActionNotificationType.transportationUpdate
    )
}

Manual tests with playground

Deployment

// HTTP client with the logging interceptor and a 30-second ping interval.
val okHttpClient = OkHttpClient.Builder().apply {
    addInterceptor(logging)
    pingInterval(30, TimeUnit.SECONDS)
}.build()

Theoretical limits

Amazon implementation

Conclusion

Fullstack software engineer @Oui.sncf. I love discovering new technologies. Tech is as important as its usage.