turbine

package module
Version: v1.0.1 Latest
Warning

This package is not in the latest version of its module.

Published: Oct 24, 2022 License: None detected Imports: 0 Imported by: 0

README

Turbine

Turbine is a data application framework for building server-side applications that are event-driven, respond to data in real-time, and scale using cloud-native best practices.

The benefits of using Turbine include:

  • Native Developer Tooling: Turbine doesn't come with any bespoke DSL or patterns. Write software like you normally would!

  • Fits into Existing DevOps Workflows: Build, test, and deploy. Turbine encourages best practices from the start. Don't test your data app in production ever again.

  • Local Development mirrors Production: When running locally, you'll immediately see how your app reacts to data. What you get there will be exactly what happens in production but with scale and speed.

  • Available in many different programming languages: Turbine started out in Go but is available in other languages too.

Getting Started

To get started, you'll need to download the Meroxa CLI. Once it is downloaded and installed, go back to your terminal and initialize a new project:

$ meroxa apps init testapp --lang golang

The CLI will create a new folder called testapp located in the directory where the command was issued. If you want to initialize the app somewhere else, you can append the --path flag to the command (meroxa apps init testapp --lang golang --path ~/anotherdir). Once you enter the testapp directory, the contents will look like this:

$ tree testapp/
testapp
├── README.md
├── app.go
├── app.json
├── app_test.go
└── fixtures
	├── demo-cdc.json
	└── demo-no-cdc.json

This will be a full-fledged Turbine app that can run. You can even run the tests using the command meroxa apps run in the root of the app directory. It provides just enough to show you what you need to get started.

app.go

This file is where you begin your Turbine journey. Any time a Turbine app runs, this is the entry point for the entire application. When the project is created, the file will look like this:

package main

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
	"log"

	"github.com/meroxa/turbine-go"
	"github.com/meroxa/turbine-go/runner"
)

func main() {
	runner.Start(App{})
}

var _ turbine.App = (*App)(nil)

type App struct{}

func (a App) Run(v turbine.Turbine) error {
	source, err := v.Resources("source_name")
	if err != nil {
		return err
	}

	rr, err := source.Records("collection_name", nil)
	if err != nil {
		return err
	}

	// Process returns a single Records value (see the Turbine interface).
	res := v.Process(rr, Anonymize{})

	dest, err := v.Resources("destination_name")
	if err != nil {
		return err
	}

	err = dest.Write(res, "collection_archive")
	if err != nil {
		return err
	}

	return nil
}

type Anonymize struct{}

// Process matches the turbine.Function interface: []Record in, []Record out.
func (f Anonymize) Process(stream []turbine.Record) []turbine.Record {
	for i, r := range stream {
		e := fmt.Sprintf("%s", r.Payload.Get("customer_email"))
		if e == "" {
			// Printf, not Println, so that the %d verb is formatted.
			log.Printf("unable to find customer_email value in record %d", i)
			break
		}
		hashedEmail := consistentHash(e)
		err := r.Payload.Set("customer_email", hashedEmail)
		if err != nil {
			log.Println("error setting value: ", err)
			break
		}
		stream[i] = r
	}
	return stream
}

func consistentHash(s string) string {
	h := md5.Sum([]byte(s))
	return hex.EncodeToString(h[:])
}

Let's talk about the important parts of this code. Turbine apps have five functions that comprise the entire DSL. Outside of these functions, you can write whatever code you want to accomplish your tasks:

func (a App) Run(v turbine.Turbine) error

Run is the main entry point for the application. This is where you initialize the Turbine framework. When you deploy your Turbine app to Meroxa, Meroxa also calls Run to boot up the application.
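The scaffold's `var _ turbine.App = (*App)(nil)` line is a compile-time check that the app type implements Run with the expected signature. The sketch below shows that idiom in isolation. It is only an illustration: `Turbine` and `App` are trimmed local stand-ins for the real interfaces, and `MyApp`, `fakeTurbine`, and the secret name `API_KEY` are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// Turbine is a trimmed, local stand-in for turbine.Turbine (assumption:
// only RegisterSecret is kept so the sketch stays short).
type Turbine interface {
	RegisterSecret(name string) error
}

// App mirrors the single-method turbine.App interface.
type App interface {
	Run(Turbine) error
}

// MyApp is a hypothetical application type.
type MyApp struct{}

// Compile-time assertion: the build fails if *MyApp stops satisfying App.
var _ App = (*MyApp)(nil)

// Run is the entry point; it returns the first error it encounters.
func (MyApp) Run(t Turbine) error {
	if err := t.RegisterSecret("API_KEY"); err != nil {
		return fmt.Errorf("register secret: %w", err)
	}
	return nil
}

// fakeTurbine is a hypothetical test double that records registered names.
type fakeTurbine struct {
	secrets []string
	fail    bool
}

func (f *fakeTurbine) RegisterSecret(name string) error {
	if f.fail {
		return errors.New("secret not set")
	}
	f.secrets = append(f.secrets, name)
	return nil
}

func main() {
	ft := &fakeTurbine{}
	if err := (MyApp{}).Run(ft); err != nil {
		fmt.Println("run failed:", err)
		return
	}
	fmt.Println("registered:", ft.secrets)
}
```

Because Run only depends on an interface, the same method can be driven by a double like `fakeTurbine` locally and by the real framework after deployment.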

source, err := v.Resources("source_name")

The Resources function identifies the upstream or downstream system that you want your code to work with. The source_name is the string identifier of the particular system. The string should map to an associated identifier in your app.json to configure what's being connected to. For more details, see the app.json section.

rr, err := source.Records("collection_name", nil)

Once you've got Resources set up, you can now stream records from it, but you need to identify what records you want. The Records function identifies the records or events you want to stream into your data app.

res := v.Process(rr, Anonymize{})

The Process function tells Turbine to run the incoming records through a function that you supply. Once your app is deployed on Meroxa, Meroxa takes each record or event that is streamed to your app and runs your code against it. This allows Meroxa to scale out your processing relative to the velocity of the records streaming in.
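To make the shape of a processing function concrete, here is a minimal sketch that follows the `Function` interface listed in the package index (`Process(r []Record) []Record`). Everything in it is a local stand-in: `Record` uses a plain map for its payload rather than the real `Payload` type, and `LowercaseEmail` and `applyLocally` are hypothetical names used only for this illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// Record is a simplified stand-in for turbine.Record. Assumption: the payload
// is a plain map here; the real Payload is a []byte with Get/Set helpers.
type Record struct {
	Key     string
	Payload map[string]interface{}
}

// Function mirrors the shape of turbine.Function from the package index.
type Function interface {
	Process(r []Record) []Record
}

// LowercaseEmail is a hypothetical function that normalizes customer_email.
type LowercaseEmail struct{}

func (LowercaseEmail) Process(stream []Record) []Record {
	for i, r := range stream {
		email, ok := r.Payload["customer_email"].(string)
		if !ok {
			continue // leave records without a string email untouched
		}
		r.Payload["customer_email"] = strings.ToLower(email)
		stream[i] = r
	}
	return stream
}

// applyLocally is a hypothetical helper that calls the function directly,
// in place of the platform running it against streamed records.
func applyLocally(rr []Record, f Function) []Record {
	return f.Process(rr)
}

func main() {
	in := []Record{
		{Key: "1", Payload: map[string]interface{}{"customer_email": "Ada@Example.COM"}},
		{Key: "2", Payload: map[string]interface{}{"id": 7}},
	}
	for _, r := range applyLocally(in, LowercaseEmail{}) {
		fmt.Println(r.Key, r.Payload)
	}
}
```

Skipping a bad record with `continue` keeps the rest of the batch flowing; whether to skip, stop, or log is a choice each function has to make for itself.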

err = dest.Write(res, "collection_archive")

The Write function is optional. It takes any records given to it and streams them to the downstream system. In many cases, you might not need to stream data to another system, but this gives you an easy way to do so.

app.json

This file contains all of the options for configuring a Turbine app. Upon initialization of an app, the CLI will scaffold the file for you with available options:

{
  "name": "testapp",
  "language": "golang",
  "environment": "common",
  "resources": {
    "source_name": "fixtures/path"
  }
}
  • name - The name of your application. This should not change after app initialization.
  • language - Tells Meroxa what language the app is upon deployment.
  • environment - "common" is the only available environment. Meroxa has the ability to create isolated environments, but this feature is currently in beta.
  • resources - These are the named integrations that you'll use in your application. The name needs to match the name of the resource that you'll set up in Meroxa using the meroxa resources create command or via the Dashboard. You can point to the path in the fixtures that'll be used to mock the resource when you run meroxa apps run.
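As a rough illustration of how these options fit together, the sketch below decodes the scaffolded app.json and looks up the fixture path for a named resource. It is not the framework's own loader: `appConfig`, `parseAppConfig`, and `fixturePath` are hypothetical names, and the rule that `name` must be non-empty is an assumption made for the example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// appConfig is a local stand-in using the same JSON keys as the scaffold.
type appConfig struct {
	Name        string            `json:"name"`
	Language    string            `json:"language"`
	Environment string            `json:"environment"`
	Resources   map[string]string `json:"resources"`
}

// parseAppConfig decodes app.json content.
// Assumption: an empty name is treated as invalid in this sketch.
func parseAppConfig(b []byte) (appConfig, error) {
	var ac appConfig
	if err := json.Unmarshal(b, &ac); err != nil {
		return appConfig{}, fmt.Errorf("decode app.json: %w", err)
	}
	if ac.Name == "" {
		return appConfig{}, fmt.Errorf("app.json: name is required")
	}
	return ac, nil
}

// fixturePath returns the path configured for a named resource.
func fixturePath(ac appConfig, resource string) (string, error) {
	p, ok := ac.Resources[resource]
	if !ok {
		return "", fmt.Errorf("resource %q is not declared in app.json", resource)
	}
	return p, nil
}

// scaffold is the example app.json shown above.
const scaffold = `{
  "name": "testapp",
  "language": "golang",
  "environment": "common",
  "resources": {
    "source_name": "fixtures/path"
  }
}`

func main() {
	ac, err := parseAppConfig([]byte(scaffold))
	if err != nil {
		fmt.Println(err)
		return
	}
	p, err := fixturePath(ac, "source_name")
	fmt.Println(ac.Name, p, err)
}
```

The lookup makes the naming rule visible: a resource requested in code but missing from the `resources` map is reported as an error instead of silently resolving to an empty path.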
Fixtures

Fixtures are JSON-formatted samples of data records you can use while locally developing your Turbine app. Whether CDC or non-CDC-formatted data records, fixtures adhere to the following structure:

{
  "collection_name": [
    {
      "key": "1",
      "value": {
        "schema": {
          ...
        },
        "payload": {
          ...
        }
      }
    }
  ]
}
  • collection_name — Identifies the name of the records or events you are streaming to your data app.
  • key — Denotes one or more sample records within a fixture file. key is always a string.
  • value — Holds the schema and payload of the sample data record.
  • schema — Comes as part of your sample data record. schema describes the record or event structure.
  • payload — Comes as part of your sample data record. payload describes what about the record or event changed.

Your newly created data app should have a demo-cdc.json and a demo-no-cdc.json in the fixtures directory as examples to follow.
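The fixture structure can be modelled with a few small Go types, which is handy for checking a hand-written fixture before running the app. The sketch below is an assumption-laden illustration: `fixture`, `fixtureRecord`, and `parseFixture` are hypothetical names, and the schema and payload contents in `sample` are made-up placeholders.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// fixtureRecord models one entry in a fixture collection. Schema and payload
// are kept as raw JSON because their contents vary per record.
type fixtureRecord struct {
	Key   string `json:"key"`
	Value struct {
		Schema  json.RawMessage `json:"schema"`
		Payload json.RawMessage `json:"payload"`
	} `json:"value"`
}

// fixture maps a collection name to its sample records.
type fixture map[string][]fixtureRecord

// parseFixture decodes fixture JSON and rejects records without a key.
func parseFixture(b []byte) (fixture, error) {
	var f fixture
	if err := json.Unmarshal(b, &f); err != nil {
		return nil, fmt.Errorf("decode fixture: %w", err)
	}
	for name, records := range f {
		for i, r := range records {
			if r.Key == "" {
				return nil, fmt.Errorf("collection %q: record %d has no key", name, i)
			}
		}
	}
	return f, nil
}

// sample is made-up data; the schema and payload values are placeholders.
const sample = `{
  "collection_name": [
    {
      "key": "1",
      "value": {
        "schema": {"type": "struct"},
        "payload": {"customer_email": "a@example.com"}
      }
    }
  ]
}`

func main() {
	f, err := parseFixture([]byte(sample))
	if err != nil {
		fmt.Println(err)
		return
	}
	for name, records := range f {
		fmt.Printf("%s: %d record(s)\n", name, len(records))
	}
}
```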

Testing

Testing should follow standard Go development practices.
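As one example of what that can look like, here is a table-driven test for the `consistentHash` helper from the scaffolded app.go. It is a sketch, not the contents of the generated app_test.go: the test cases are made up, and the `main` function is included only so the snippet builds as a single file.

```go
package main

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
	"testing"
)

// consistentHash is copied from the scaffolded app.go.
func consistentHash(s string) string {
	h := md5.Sum([]byte(s))
	return hex.EncodeToString(h[:])
}

// TestConsistentHash is a table-driven test in the usual Go style.
func TestConsistentHash(t *testing.T) {
	tests := []struct {
		name, a, b string
		wantEqual  bool
	}{
		{"same input", "a@example.com", "a@example.com", true},
		{"different input", "a@example.com", "b@example.com", false},
	}
	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			got := consistentHash(tc.a) == consistentHash(tc.b)
			if got != tc.wantEqual {
				t.Errorf("equal = %v, want %v", got, tc.wantEqual)
			}
			// An MD5 digest is 16 bytes, so its hex form is 32 characters.
			if n := len(consistentHash(tc.a)); n != 32 {
				t.Errorf("hash length = %d, want 32", n)
			}
		})
	}
}

// main exists only so this single-file sketch builds on its own; in a real
// project the test lives in app_test.go and main lives in app.go.
func main() {
	fmt.Println(consistentHash("a@example.com"))
}
```

Placed in app_test.go (without the `main` function), a test like this runs with `go test` from the app directory.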

Documentation && Reference

The most comprehensive documentation for Turbine and how to work with Turbine apps is on the Meroxa site: https://docs.meroxa.com/

For the Go Reference, check out https://pkg.meroxa.io/badge/github.com/meroxa/turbine-go.

Contributing

Check out the /docs/ folder for more information on how to contribute.

Documentation

Index

Constants

This section is empty.

Variables

var ReadAppConfig = func(appName, appPath string) (AppConfig, error) {
	if appPath == "" {
		exePath, err := os.Executable()
		if err != nil {
			log.Fatalf("unable to locate executable path; error: %s", err)
		}
		appPath = path.Dir(exePath)
	}

	b, err := os.ReadFile(appPath + "/" + "app.json")
	if err != nil {
		return AppConfig{}, err
	}

	var ac AppConfig
	err = json.Unmarshal(b, &ac)
	if err != nil {
		return AppConfig{}, err
	}

	if appName != "" {
		ac.Name = appName
	}
	err = ac.validateAppConfig()
	if err != nil {
		return AppConfig{}, err
	}

	ac.setPipelineName()
	return ac, nil
}
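Because ReadAppConfig is declared as a variable holding a function, it can be reassigned, for example to avoid reading app.json from disk in a test. The sketch below shows that general Go pattern with local stand-ins; `readAppConfig`, `describe`, and `withStub` are hypothetical names, and `AppConfig` is a trimmed copy of the real struct.

```go
package main

import (
	"errors"
	"fmt"
)

// AppConfig is a trimmed local stand-in for the package's AppConfig.
type AppConfig struct {
	Name      string
	Resources map[string]string
}

// readAppConfig is a hypothetical function-valued variable, declared the same
// way as ReadAppConfig so that it can be reassigned.
var readAppConfig = func(appName, appPath string) (AppConfig, error) {
	return AppConfig{}, errors.New("no app.json available in this sketch")
}

// describe depends on readAppConfig only through the variable.
func describe(appName string) (string, error) {
	ac, err := readAppConfig(appName, "")
	if err != nil {
		return "", err
	}
	return fmt.Sprintf("%s (%d resources)", ac.Name, len(ac.Resources)), nil
}

// withStub swaps in a replacement for the duration of fn, then restores the
// original so later callers are unaffected.
func withStub(stub func(string, string) (AppConfig, error), fn func()) {
	orig := readAppConfig
	readAppConfig = stub
	defer func() { readAppConfig = orig }()
	fn()
}

func main() {
	stub := func(name, _ string) (AppConfig, error) {
		return AppConfig{
			Name:      name,
			Resources: map[string]string{"source_name": "fixtures/path"},
		}, nil
	}
	withStub(stub, func() {
		s, err := describe("testapp")
		fmt.Println(s, err)
	})
}
```

Restoring the original in a `defer` matters: a package-level variable is shared state, so a stub that leaks out of one test can change the behavior of the next.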

Functions

This section is empty.

Types

type App

type App interface {
	Run(Turbine) error
}

type AppConfig

type AppConfig struct {
	Name        string            `json:"name"`
	Environment string            `json:"environment"`
	Pipeline    string            `json:"pipeline"` // TODO: Eventually remove support for providing a pipeline if we need to
	Resources   map[string]string `json:"resources"`
}

type ConnectionOption

type ConnectionOption struct {
	Field string
	Value string
}

type ConnectionOptions

type ConnectionOptions []ConnectionOption

func (ConnectionOptions) ToMap

func (cfg ConnectionOptions) ToMap() map[string]interface{}
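The index lists only the signature of ToMap, not its body. A plausible sketch, assuming each option's Value is stored under its Field, is shown below. The lowercase `toMap` is a hypothetical re-implementation on local copies of the types, and the option names in `main` are invented for the example.

```go
package main

import "fmt"

// ConnectionOption and ConnectionOptions are local copies of the listed types.
type ConnectionOption struct {
	Field string
	Value string
}

type ConnectionOptions []ConnectionOption

// toMap is a guess at what a ToMap-style helper might do: flatten the slice
// into a map keyed by Field. It is not the library's implementation.
func (cfg ConnectionOptions) toMap() map[string]interface{} {
	m := make(map[string]interface{}, len(cfg))
	for _, opt := range cfg {
		m[opt.Field] = opt.Value // a repeated Field overwrites the earlier value
	}
	return m
}

func main() {
	// Hypothetical option names, used only for illustration.
	opts := ConnectionOptions{
		{Field: "batch.size", Value: "100"},
		{Field: "mode", Value: "append"},
	}
	fmt.Println(opts.toMap())
}
```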

type Function

type Function interface {
	Process(r []Record) []Record
}

type Payload

type Payload []byte

func (*Payload) Delete

func (p *Payload) Delete(path string) error

func (Payload) Get

func (p Payload) Get(path string) interface{}

func (Payload) Map

func (p Payload) Map() (map[string]interface{}, error)

func (*Payload) Set

func (p *Payload) Set(path string, value interface{}) error

type Record

type Record struct {
	Key       string
	Payload   Payload
	Timestamp time.Time
}

func GetRecords

func GetRecords(r Records) []Record

func (Record) JSONSchema

func (r Record) JSONSchema() bool

JSONSchema returns true if the record is formatted with JSON Schema, false otherwise

func (Record) OpenCDC

func (r Record) OpenCDC() bool

OpenCDC returns true if the record is formatted with OpenCDC schema, false otherwise

type RecordWithError

type RecordWithError struct {
	Error error
	Record
}

type Records

type Records struct {
	Stream string
	// contains filtered or unexported fields
}

func NewRecords

func NewRecords(rr []Record) Records

type Resource

type Resource interface {
	Records(collection string, cfg ConnectionOptions) (Records, error)
	Write(records Records, collection string) error
	WriteWithConfig(records Records, collection string, cfg ConnectionOptions) error
}

type ResourceConfig deprecated

type ResourceConfig = ConnectionOption

Deprecated: Use ConnectionOption instead

type ResourceConfigs deprecated

type ResourceConfigs = ConnectionOptions

Deprecated: Use ConnectionOptions instead

type Turbine

type Turbine interface {
	Resources(string) (Resource, error)
	Process(Records, Function) Records
	RegisterSecret(string) error
}

Directories

Path Synopsis
v2
