Skip to main content

Source Connector

There are only TWO requirements when it comes to writing a valid Nakji Network Source Connector:

  1. Write protobuf (proto3) messages to Nakji Message Queue (Kafka) using transactions.
  2. Manifest file that contains connector metadata (eg title, author, version).

There are no DSLs (domain specific languages) you have to learn, no weird structures you have to abide by, and no awkward terminology you have to remember.

Just Send Data.

Example

This is an example of a barebones Ethereum connector which ingests Block and Transaction data into Nakji, written in Golang.

Check out the repo if you prefer to dive directly in the code :)

info

This is beta software! Bug reports and suggestions are highly encouraged :)

Prerequisites

More details in the Readme

Protobuf

First, a protobuf definition file needs to be created. Protobuf is the platform-neutral data format for almost all the data in motion within Nakji, whether it's into or out of the Nakji Message Queue. Make sure to use Version 3 of the Protobuf spec (proto3 documentation).

/ethereum.proto
syntax = "proto3";

import "google/protobuf/timestamp.proto";

package ethereum;

option go_package = "github.com/nakji-network/connector/examples/ethereum_connector";

// to convert addresses from bytes to hex address, https://github.com/ethereum/go-ethereum/blob/4b2ff1457ac28fb2894485194e0e344e84c2bcd7/common/types.go#L210
message Transaction {
google.protobuf.Timestamp ts = 1; //uint64
bytes From = 2;
string Hash = 3;
double Size = 4;
uint64 AccountNonce = 5; // uint64
uint64 Price = 6; // big.int
uint64 GasLimit = 7; // uint64
bytes Recipient = 8;
uint64 Amount = 9; // big.int
bytes Payload = 10;
uint64 V = 11; // big.int
uint64 R = 12; // big.int
uint64 S = 13; // big.int
}

message Block {
google.protobuf.Timestamp ts = 1; //uint64
string Hash = 2;
uint64 Difficulty = 3; //bigint
uint64 Number = 4; //bigint
uint64 GasLimit = 5; // uint64
uint64 GasUsed = 6; // uint64
uint64 Nonce = 7; //[8]byte .Uint64()
}

The proto definitions can be compiled by the protoc compiler into any language it supports (currently 7) to enable seamless cross-platform/cross-service communication. For purposes of this example, follow the Protobuf for Golang guide here to get set up.

Manifest

Create a file named manifest.yaml in the project root with the following fields.

/manifest.yaml
package: ethereum
owner: nakji
version: 0.0.0

Executable

The cmd/ethereum/main.go file prepares all the config variables and Kafka connections.

/cmd/ethereum/main.go
// This connector ingests real time data from any EVM compatible chain.
package main

import (
"strings"

"github.com/nakji-network/connector"
"github.com/nakji-network/connector/examples/ethereum"
)

func init() {
conf.SetDefault("rpcs.ethereum.full", []string{"wss://mainnet.infura.io/ws/v3/<api_key>"})
}

func main() {
c := connector.NewConnector()

// Get config variables using functions from Viper (https://pkg.go.dev/github.com/spf13/viper#readme-getting-values-from-viper)
RPCURLs := c.Config.GetStringSlice("rpcs.ethereum.full")

// For the purposes of this example, we'll just grab one of the websocket RPCs
var RPCURL string
for _, u := range RPCURLs {
if strings.HasPrefix(u, "ws") {
RPCURL = u
break
}
}

ethConnector := ethereum.EthereumConnector{
Connector: c,
RPCURL: RPCURL,
}

ethConnector.Start()
}

Main connector code

The ethereum.go file connects to the data source (in this case, an Ethereum RPC). When data is received, it is cleaned and shaped into the Protobuf format defined above, and sent to the correct Topics in the Nakji Message Queue (Kafka).

/ethereum.go
// ethereum package follows https://goethereumbook.org/block-subscribe/ to
// subscribe to new Blocks and Transactions and writes the results to Nakji.
package ethereum

import (
"context"
"fmt"
"os"
"os/signal"

"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/nakji-network/connector"
"github.com/rs/zerolog/log"
)

type EthereumConnector struct {
*connector.Connector // embed Nakji connector.Connector into your custom connector to get access to all its methods

RPCURL string
}

func (c *EthereumConnector) Start() {
// Listen for interrupt in order to cleanly close connections later
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)

// connect to Ethereum RPC websockets
log.Info().Str("url", c.RPCURL).Msg("connecting to Ethereum RPC")
client, err := ethclient.DialContext(context.Background(), c.RPCURL)
if err != nil {
log.Fatal().Err(err).Msg("Ethereum RPC connection error")
}
defer client.Close()

// Subscribe to headers
headers := make(chan *types.Header)
sub, err := client.SubscribeNewHead(context.Background(), headers)
if err != nil {
log.Fatal().Err(err)
}

// Main loop to process errors and headers
go func() {
for {
select {
case err := <-sub.Err():
log.Fatal().Err(err)
case header := <-headers:
block, err := client.BlockByHash(context.Background(), header.Hash())
if err != nil {
log.Fatal().Err(err).Msg("BlockByHash error")
}

PrintBlock(block)

// EthBlock -> Block -> Protobuf -> kafka
var blockData Block
blockData.UnmarshalEthBlock(block)
err = c.ProduceMessage("ethereum", "ethereum", &blockData)
if err != nil {
log.Error().Err(err).Msg("Kafka write proto")
}

// EthTransaction -> Transaction -> Protobuf -> Kafka
for _, tx := range block.Transactions() {
txData := Transaction{}
txData.UnmarshalEthTransaction(tx)
txData.Ts = blockData.Ts // Timestamp isn't in the raw transaction from geth

err = c.ProduceMessage("ethereum", "ethereum", &txData)
if err != nil {
log.Error().Err(err).Msg("Kafka write proto")
}
}

// Commit Kafka Transaction
err = c.Producer.CommitTransaction(nil)
if err != nil {
log.Error().Err(err).Msg("Processor: Failed to commit transaction")

err = c.Producer.AbortTransaction(nil)
if err != nil {
log.Fatal().Err(err).Msg("")
}
}
// Start a new transaction
err = c.BeginTransaction()
if err != nil {
log.Fatal().Err(err).Msg("")
}

}
}
}()

for {
select {
case <-interrupt:
log.Debug().Msg("interrupt")

// Cleanly close the connection by sending a close message and then
// waiting (with timeout) for the server to close the connection.
client.Close()
c.Producer.Close()
return
}
}
}

func PrintBlock(block *types.Block) {
fmt.Printf("hash: %s\n", block.Hash().Hex()) // 0xbc10defa8dda384c96a17640d84de5578804945d347072e091b4e5f390ddea7f
fmt.Printf("num: %v\n", block.Number().Uint64()) // 3477413
fmt.Printf("time: %v\n", block.Time()) // 1529525947
fmt.Printf("nonce: %v\n", block.Nonce()) // 130524141876765836
fmt.Printf("#tx: %v\n", len(block.Transactions())) // 7
fmt.Printf("gaslim: %v\n", block.GasLimit()) // 1529525947
fmt.Printf("gasuse: %v\n", block.GasUsed()) // 1529525947
fmt.Printf("diff: %v\n", block.Difficulty()) // 1529525947
}

The full code for this example is here and contains some of the helper functions used above. You can find other examples here.

Advanced

In some cases, a source connector may use input data from the Nakji message queue instead of an external source. This reduces the need for every connector to make a new RPC request, at the cost of slightly increasing latency. For example:

47cc7e8b4a4ba447afff0e10d5cf51b7

This is a rare use case.

Support

Feel free to reach out to the team if you run into issues or need assistance!