GrainiteClient API
API for Clients and Producers
This API allows clients to access Topics, Tables, and Grains, enabling functionality such as pushing events to topics, invoking Grain action handlers, scanning Topics and Tables, and accessing Grain state directly.
GrainiteClient is used to connect to Grainite. The Grainite object it returns can then be used to interact with Topics, Tables, and Grains.
Java
import com.grainite.api.Grainite;
import com.grainite.api.GrainiteClient;

public class Client {
  public static void main(String[] args) {
    Grainite grainite = GrainiteClient.getClient("localhost", 5056);
  }
}
An optional clientID parameter in the getClient() method allows for creating and re-using connections to the server. This is useful when a single client wants to create multiple connections to the same server.
Python
from grainite_cl.api import grainite
from grainite_cl.api import grainite_client

grainite = grainite_client.get_client("localhost", 5056)
Topics are defined in the YAML configuration for an application and loaded into Grainite using gx load. Using Grainite, we can access these topics and perform various actions on them. The following example gets a reference to a topic called "orders_topic" in an app called "food_app".
Java
Grainite grainite = GrainiteClient.getClient("localhost", 5056);
Topic topic = grainite.getTopic("food_app", "orders_topic");
Python
grainite = grainite_client.get_client("localhost", 5056)
topic = grainite.get_topic("food_app", "orders_topic")
Events can be appended to topics using append(Event). Event is an object that contains a key and a payload that will be sent to the topic. The following example sends an event with the key "#123" and payload "burger" to a topic.
Java
Grainite grainite = GrainiteClient.getClient("localhost", 5056);
Topic topic = grainite.getTopic("food_app", "orders_topic");
topic.append(new Event(Key.of("#123"), Value.of("burger")));
Using Callbacks:
topic.appendAsync(
    new Event(Key.of("#123"), Value.of("impossible burger")),
    (completion, exception) -> {
      if (exception != null) {
        exception.printStackTrace();
      }
    });
Using Futures:
Future<RecordMetadata> rmdFuture = topic.appendAsync(new Event(Key.of("#123"), Value.of("impossible burger")));
// Do some work while waiting for the event to get appended into the topic.
RecordMetadata rmd = rmdFuture.get();
Python
grainite = grainite_client.get_client("localhost", 5056)
topic = grainite.get_topic("food_app", "orders_topic")
topic.append(Event(Key("#123"), Value("burger")))

from grainite_cl.api.callback import Callback

# Define our own Callback to call after append_async is complete
class MyCallback(Callback):
    def on_completion(self, completion: RecordMetadata, exception: Exception):
        if exception is not None:
            ...

topic.append_async(
    event=Event(Key("#123"), Value("impossible burger")),
    callback=MyCallback()
)
Note that an event key can be up to 4KB in size and the payload can be up to 512KB in size. Future updates to Grainite will increase these limits.
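A client may want to check these limits before sending an event. The following is a minimal sketch of such a check, not part of the Grainite API. The helper name check_event_size is hypothetical, and it assumes that "4KB" and "512KB" mean 4096 and 524288 bytes and that the limits apply to the encoded byte length.

```python
# Assumption: the documented limits are binary kilobytes of encoded bytes.
MAX_KEY_BYTES = 4 * 1024        # 4KB key limit stated above
MAX_PAYLOAD_BYTES = 512 * 1024  # 512KB payload limit stated above


def check_event_size(key: bytes, payload: bytes) -> None:
    """Raise ValueError if the key or payload exceeds the documented limits."""
    if len(key) > MAX_KEY_BYTES:
        raise ValueError(f"key is {len(key)} bytes; limit is {MAX_KEY_BYTES}")
    if len(payload) > MAX_PAYLOAD_BYTES:
        raise ValueError(
            f"payload is {len(payload)} bytes; limit is {MAX_PAYLOAD_BYTES}"
        )


# Hypothetical usage: validate the encoded bytes before appending an event.
check_event_size("#123".encode("utf-8"), "burger".encode("utf-8"))
```

Running the check on the client side surfaces an oversized event as a local error instead of a failed append.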
All events in a topic can be read by using a TopicReadCursor. This cursor can be created by using the newTopicReadCursor method and providing it with a cursorName, a list of keys to read events for (or null to read all the events in the topic), a consumerIndex that the cursor should start at, a numConsumers for the cursor to read, and a boolean stopAtEol that says whether to stop reading at EOL. (Javadocs, Pydocs)
Java
// Read only one event from the topic that has key #123
TopicReadCursor cursor =
    topic.newTopicReadCursor(
        "myCursor", Arrays.asList(Key.of("#123")), 1, 1, false);
Event nextEvent = cursor.next();
Python
# Read only one event from the topic that has key #123
cursor = topic.new_read_cursor("my_cursor", [Key("#123")], 1, 1, False)
next_event = cursor.next()
The cursor can be saved and then loaded later to continue reading from where it left off:
Java
cursor.save();
// Load a new cursor "myLoadedCursor" to pick up where "myCursor" left off.
cursor = topic.loadReadCursor("myCursor", "myLoadedCursor", 1, 1, false);
cursor.hasNext(); // Should be false
Python
cursor.save()
# Load a new cursor "my_loaded_cursor" to pick up where "my_cursor" left off.
cursor = topic.load_read_cursor("my_cursor", "my_loaded_cursor", 1, 1, False)
cursor.has_next()  # Should be False
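To make the save and load behavior concrete, here is a small in-memory model of a resumable cursor. It is only a conceptual sketch and does not reflect how Grainite stores cursors. InMemoryTopic, ToyCursor, and load_cursor are hypothetical names introduced for illustration.

```python
class InMemoryTopic:
    """Toy stand-in for a topic: an append-only list plus saved cursor offsets."""

    def __init__(self):
        self.events = []
        self.saved_offsets = {}  # cursor name -> next offset to read


class ToyCursor:
    """Toy cursor that tracks the offset of the next event to read."""

    def __init__(self, topic, name, offset=0):
        self.topic = topic
        self.name = name
        self.offset = offset

    def has_next(self):
        return self.offset < len(self.topic.events)

    def next(self):
        event = self.topic.events[self.offset]
        self.offset += 1
        return event

    def save(self):
        # Persist the current position under this cursor's name.
        self.topic.saved_offsets[self.name] = self.offset


def load_cursor(topic, saved_name, new_name):
    """Create a new cursor that starts where the saved cursor stopped."""
    return ToyCursor(topic, new_name, topic.saved_offsets[saved_name])


topic = InMemoryTopic()
topic.events.extend(["burger", "fries", "shake"])

cursor = ToyCursor(topic, "my_cursor")
first = cursor.next()  # reads the first event
cursor.save()

# The loaded cursor continues after the event that was already read.
resumed = load_cursor(topic, "my_cursor", "my_loaded_cursor")
remaining = []
while resumed.has_next():
    remaining.append(resumed.next())
```

In this model the saved state is just an offset keyed by cursor name, which is why a second cursor can resume without re-reading earlier events.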
Tables are defined in the YAML configuration for an application and loaded into Grainite using gx load. Using Grainite, we can access these tables and perform various actions on them. The following example gets a reference to a table called "orders_table" in an app called "food_app".
Java
Grainite grainite = GrainiteClient.getClient("localhost", 5056);
Table table = grainite.getTable("food_app", "orders_table");
Python
grainite = grainite_client.get_client("localhost", 5056)
table = grainite.get_table("food_app", "orders_table")
Tables contain Grains, which can hold state and also have action handlers associated with them. Grains can be accessed by using getGrain(Value), where Value is the key of the Grain. The following example gets a reference to the Grain with the key "#123" from the "orders_table".
Java
Grainite grainite = GrainiteClient.getClient("localhost", 5056);
Table table = grainite.getTable("food_app", "orders_table");
Grain grain = table.getGrain(Key.of("#123"));
Using Callbacks:
table.getGrainAsync(Key.of("#123"), true, (grain, exception) -> {
  if (exception != null) {
    System.out.println("Unable to get grain: " + exception.getMessage());
  } else {
    System.out.println("Got grain: " + grain.getValue());
  }
});
Using Futures:
Future<Grain> grainFuture = table.getGrainAsync(Key.of("#123"), true);
// Do some work while waiting to get Grain.
Grain grain = grainFuture.get();
Python
grainite = grainite_client.get_client("localhost", 5056)
table = grainite.get_table("food_app", "orders_table")
grain = table.get_grain(Key("#123"))
The Grain object can be used to access or modify the state of a particular Grain. The following example accesses and modifies the value of the "#123" Grain by incrementing the number of dishes in this order.
Java
Grain grain = table.getGrain(Key.of("#123"));
// Get the current number of dishes for this order.
long numDishes = grain.getValue().asLong();
// Increment and store the number of dishes for this order.
grain.setValue(Value.of(numDishes + 1));
Using Callbacks:
// Get the current number of dishes for this order.
grain.getValueAsync((value) -> {
  long numDishes = value.asLong();
  // Increment and store the number of dishes for this order.
  grain.setValueAsync(Value.of(numDishes + 1), (status) -> {
    if (status.isError()) {
      System.out.println("The Grain value failed to update: " + status.getDescription());
    } else {
      System.out.println("The Grain value was updated.");
    }
  });
});
Using Futures:
// Get the current number of dishes for this order.
Future<Value> grainValueFuture = grain.getValueAsync();
// Do something while waiting to get Grain value.
long numDishes = grainValueFuture.get().asLong();
// Increment and store the number of dishes for this order.
Future<Status> setValueFuture = grain.setValueAsync(Value.of(numDishes + 1));
// Do something while waiting to set Grain value.
Status status = setValueFuture.get();
Python
grain = table.get_grain(Key("#123"))
# Get the current number of dishes for this order.
num_dishes = grain.get_value().as_long()
# Increment and store the number of dishes for this order.
grain.set_value(Value(num_dishes + 1))
Similarly, a Grain's key can be accessed using getKey().
Along with the value, a Grain also contains 200 sorted maps to store state. Defining these maps in the application YAML configuration file is optional. When a map is defined there, it can be given a name, and the definition is loaded into Grainite using gx load.
The following example stores a timestamp and some payload in the "history" map of a grain.
app.yaml
table:
  ...
  maps:
    - id: 0
      name: stats
      key_type: string
      value_type: double
Java
grain.mapPut("history", Key.of(timestamp), Value.of(payload));
Using Callbacks:
grain.mapPutAsync("history", Key.of(timestamp), Value.of(payload),
    (status, exception) -> {
      if (exception == null) {
        System.out.println("Inserted value into map.");
      } else {
        System.out.println("Error inserting value into map: " + exception.getMessage());
      }
    }
);
Using Futures:
Future<Status> statusFuture = grain.mapPutAsync("history", Key.of(timestamp), Value.of(payload));
// Do something while waiting for value to be inserted into the map.
Status status = statusFuture.get();
Python
grain.map_put("history", Key(timestamp), Value(payload))
Maps can be accessed by both name (e.g. "history") and id (e.g. 0).
Similarly, a value in a Grain's map can be read by using mapGet().
Java
grain.mapGet("history", Key.of(timestamp));
Using Callbacks:
grain.mapGetAsync("history", Key.of(timestamp), (value, exception) -> {
  if (exception == null) {
    System.out.println("Grain map value is: " + value.toString());
  } else {
    System.out.println("Failed to get Grain map value: " + exception.getMessage());
  }
});
Using Futures:
Future<Value> mapValueFuture = grain.mapGetAsync("history", Key.of(timestamp));
// Do something while waiting to get map value.
Value mapValue = mapValueFuture.get();
Python
grain.map_get("history", Key(timestamp))
mapQuery() can be used to scan a Grain's map to get a range of values. The following example scans the "history" map of a Grain to get all the data from Monday to Friday (keyed by timestamp).
The Python API instead uses map_get_range(), which is equivalent to the Java mapGetRange() API. In Java, mapGetRange() is now deprecated and has been replaced by mapQuery().
Java
MapQuery query = MapQuery.newBuilder().setMapName("history")
    .setRange(Key.of(mondayTS), Key.of(fridayTS))
    .build();
Iterator<KeyValue> iter = grain.mapQuery(query);
// Iterate over the result of mapQuery
iter.forEachRemaining(kv -> {
  System.out.println(kv.getKey().asString());
  System.out.println(kv.getValue().asString());
});
Using Callbacks:
MapQuery query = MapQuery.newBuilder().setMapName("history")
    .setRange(Key.of(mondayTS), Key.of(fridayTS))
    .build();
grain.mapQueryAsync(query, (iter, exception) -> {
  if (exception == null) {
    // Iterate over the result of mapQuery
    iter.forEachRemaining(kv -> {
      System.out.println(kv.getKey().asString());
      System.out.println(kv.getValue().asString());
    });
  } else {
    System.out.println("Failed to query map: " + exception.getMessage());
  }
});
Using Futures:
MapQuery query = MapQuery.newBuilder().setMapName("history")
    .setRange(Key.of(mondayTS), Key.of(fridayTS))
    .build();
Future<Iterator<KeyValue>> iterFuture = grain.mapQueryAsync(query);
// Do something while waiting for mapQuery to return.
Iterator<KeyValue> iter = iterFuture.get();
// Iterate over the result of mapQuery
iter.forEachRemaining(kv -> {
  System.out.println(kv.getKey().asString());
  System.out.println(kv.getValue().asString());
});
Python
cursor = MapScanCursor(Key(mondayTS), Key(fridayTS))
it = grain.map_get_range(0, cursor)
# Iterate over the result of map_get_range
while it.has_next():
    kv = it.next()
    print(kv.key.as_string())
    print(kv.value.as_string())
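Range scans like the ones above rely on the map keeping its keys in sorted order. The sketch below illustrates that idea with a toy sorted map built on Python's standard bisect module. It is not Grainite's implementation. The SortedMap class is hypothetical, and treating both range bounds as inclusive is an assumption made only for this example.

```python
import bisect


class SortedMap:
    """Toy sorted map: keys are kept in order so a range is found by bisection."""

    def __init__(self):
        self._keys = []    # sorted list of keys
        self._values = {}  # key -> value

    def put(self, key, value):
        if key not in self._values:
            bisect.insort(self._keys, key)
        self._values[key] = value

    def get_range(self, start, end):
        """Return (key, value) pairs with start <= key <= end, in key order."""
        # Assumption: both bounds are inclusive in this sketch.
        lo = bisect.bisect_left(self._keys, start)
        hi = bisect.bisect_right(self._keys, end)
        return [(k, self._values[k]) for k in self._keys[lo:hi]]


history = SortedMap()
# Insert out of order; integer keys stand in for timestamps.
for ts, payload in [(5, "fri"), (1, "mon"), (7, "sun"), (3, "wed")]:
    history.put(ts, payload)

week_days = history.get_range(1, 5)
```

Because the keys are ordered, the scan touches only the entries inside the requested range rather than every entry in the map.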
Note that a Grain key can be up to 4KB in size and the value can be up to 512KB in size. The same limits apply to map keys (4KB maximum) and map values (512KB maximum). Future updates to Grainite will increase these limits.
As part of the early access secondary indexes feature, queries against indexes may be performed using the Client API. The Table API has a new find() API that accepts either a QueryExpression object directly or a string query, which the client parses and converts into a QueryExpression object. Valid queries are composed of individual "Simple" (property ==/!= value) or "Range" (property </>/<=/>= value) expressions, which can themselves be combined with AND/OR/NOT logical operators to form arbitrarily complex query expressions.
An example logical query (assuming indexes have been created for "income" and "city") could be:
income > 100000 && (city == 'Los Angeles' || city == 'San Francisco')
We can pass the query to table.find() as a string and get an iterator of Grains as follows:
Java
Iterator<Grain> it = table.find("income > 100000 && (city == 'Los Angeles' || city == 'San Francisco')");
// Iterate over the results of the query
while (it.hasNext()) {
  Grain grain = it.next();
  Value value = grain.getValue();
  ...
}
Using Callbacks:
table.find("income > 100000 && (city == 'Los Angeles' || city == 'San Francisco')", (iter, exception) -> {
  if (exception == null) {
    // Iterate over the result of the query
    iter.forEachRemaining(grain -> {
      System.out.println(grain.getKey().asString());
      System.out.println(grain.getValue().asString());
    });
  } else {
    System.out.println("Failed to query table: " + exception.getMessage());
  }
});
Using Futures:
Future<Iterator<Grain>> findFuture = table.find("income > 100000 && (city == 'Los Angeles' || city == 'San Francisco')");
// Do something while waiting for find to return.
Iterator<Grain> find = findFuture.get();
find.forEachRemaining(grain -> {
  System.out.println(grain.getKey().asString());
  System.out.println(grain.getValue().asString());
});
Python
table.find() became available in the Python API starting in 2321. String queries became available in the Python table.find() API in 2323.
it = table.find("income > 100000 && (city == 'Los Angeles' || city == 'San Francisco')")
# Iterate over the results of the query
for grain in it:
    value = grain.get_value()
    ...
Equivalent using a QueryExpression instead of a string query:
query = AndQueryExpression(
    RangeQueryExpression(Value("income"), Value(100000), Value.max_key(), False, True),
    OrQueryExpression(
        SimpleQueryExpression(Value("city"), Value("Los Angeles")),
        SimpleQueryExpression(Value("city"), Value("San Francisco"))
    ),
)
it = table.find(query)
# Iterate over the results of the query
for grain in it:
    value = grain.get_value()
    ...
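To show how "Simple" and "Range" expressions compose under AND/OR, the sketch below evaluates the same example query against a few in-memory records. It is a plain Python illustration of the query's logic only. It does not use or mirror the Grainite QueryExpression classes, and the helper names (simple, greater_than, all_of, any_of) and the sample records are hypothetical.

```python
from typing import Any, Callable, Dict

Record = Dict[str, Any]
Predicate = Callable[[Record], bool]


def simple(prop: str, value: Any) -> Predicate:
    """'Simple' expression: property == value."""
    return lambda rec: rec.get(prop) == value


def greater_than(prop: str, bound: Any) -> Predicate:
    """'Range' expression: property > bound."""
    return lambda rec: prop in rec and rec[prop] > bound


def all_of(*preds: Predicate) -> Predicate:
    """Logical AND of the given expressions."""
    return lambda rec: all(p(rec) for p in preds)


def any_of(*preds: Predicate) -> Predicate:
    """Logical OR of the given expressions."""
    return lambda rec: any(p(rec) for p in preds)


# income > 100000 && (city == 'Los Angeles' || city == 'San Francisco')
query = all_of(
    greater_than("income", 100000),
    any_of(simple("city", "Los Angeles"), simple("city", "San Francisco")),
)

people = [
    {"name": "a", "income": 150000, "city": "Los Angeles"},
    {"name": "b", "income": 90000, "city": "San Francisco"},
    {"name": "c", "income": 200000, "city": "Seattle"},
    {"name": "d", "income": 120000, "city": "San Francisco"},
]
matches = [p["name"] for p in people if query(p)]
```

Only records "a" and "d" satisfy both the income range and one of the two city conditions.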
A Grain's Action Handler can be triggered directly from a client using invoke(ActionName, Value). This method takes the name of an action, as well as a payload to send to the Action Handler. The following example invokes an action called addFoodToOrder for a Grain, with the payload "Burger".
Java
grain.invoke("addFoodToOrder", Key.of("Burger"));
Using Callbacks:
grain.invokeAsync("addFoodToOrder", Key.of("Burger"),
    (res, exception) -> {
      if (exception == null) {
        if (res.isError()) {
          System.out.println("Invoke returned an error: " + res.getStatus().getDescription());
        } else {
          System.out.println("Invoke result: " + res.getResult().toString());
        }
      } else {
        System.out.println("Failed to invoke: " + exception.getMessage());
      }
    }
);
Using Futures:
Future<ResultOrStatus<Value>> resultFuture = grain.invokeAsync("addFoodToOrder", Key.of("Burger"));
// Do something while waiting for invoke to return.
ResultOrStatus<Value> res = resultFuture.get();
if (res.isError()) {
  System.out.println("Invoke returned an error: " + res.getStatus().getDescription());
} else {
  System.out.println("Invoke result: " + res.getResult().toString());
}
Python
grain.invoke("add_food_to_order", Key("Burger"))
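The Java examples on this page offer most operations in blocking, callback, and Future styles. For readers less familiar with the difference, the sketch below contrasts the Future and callback styles using only Python's standard concurrent.futures module. The function fake_invoke is a hypothetical stand-in for a remote call and is not a Grainite API.

```python
from concurrent.futures import Future, ThreadPoolExecutor


def fake_invoke(action: str, payload: str) -> str:
    """Hypothetical stand-in for a remote call; not a Grainite API."""
    return f"{action}:{payload}"


results = []


def on_done(done: Future) -> None:
    """Callback run when the call completes, successfully or not."""
    error = done.exception()
    if error is None:
        results.append(done.result())
    else:
        results.append(f"failed: {error}")


with ThreadPoolExecutor(max_workers=1) as pool:
    # Future style: submit, optionally do other work, then block on result().
    future = pool.submit(fake_invoke, "addFoodToOrder", "Burger")
    value = future.result()

    # Callback style: register a function instead of blocking on the result.
    pending = pool.submit(fake_invoke, "addFoodToOrder", "Fries")
    pending.add_done_callback(on_done)
# Leaving the with-block waits for outstanding work, so the callback has run.
```

The Future style keeps control flow linear at the cost of a blocking call, while the callback style never blocks the caller but moves result handling into a separate function.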
All the Grains in a Table can be scanned by using scan(Cursor, NumItems). This method takes a cursor (which can be reused in subsequent scans) and a limit, in the form of an Integer, that specifies the number of items to return. The following example scans a table for 10 Grains and prints their keys.
Java
Cursor cursor = Cursor.getScanCursor();
// Read first 10 grains in the table.
Iterator<Grain> iter = table.scan(cursor, 10);
iter.forEachRemaining(grain -> {
  System.out.println(grain.getKey().asString());
});
Using Callbacks:
Cursor cursor = Cursor.getScanCursor();
// Read first 10 grains in the table.
table.scanAsync(cursor, 10, (iter, exception) -> {
  if (exception == null) {
    iter.forEachRemaining(grain -> {
      System.out.println(grain.getKey().asString());
    });
  } else {
    System.out.println("Error scanning table: " + exception.getMessage());
  }
});
Using Futures:
Cursor cursor = Cursor.getScanCursor();
// Read first 10 grains in the table.
Future<Iterator<Grain>> iterFuture = table.scanAsync(cursor, 10);
// Do something while waiting for table scan to return.
Iterator<Grain> iter = iterFuture.get();
iter.forEachRemaining(grain -> {
  System.out.println(grain.getKey().asString());
});
Python
cursor = ScanCursor()
# Read first 10 grains in the table.
it = table.scan(cursor, 10)
while it.has_next():
    grain = it.next()
    print(grain.get_key().as_string())
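Since the cursor can be reused in later scans, a whole table can be read one page at a time. The sketch below models that paging loop over an in-memory list of keys. It is a conceptual illustration only: PageCursor and this scan function are hypothetical, and it assumes that passing the same cursor again continues from where the previous scan stopped.

```python
class PageCursor:
    """Toy cursor that remembers where the previous scan stopped."""

    def __init__(self):
        self.position = 0


def scan(keys, cursor, num_items):
    """Return up to num_items keys starting at the cursor, then advance it."""
    page = keys[cursor.position:cursor.position + num_items]
    cursor.position += len(page)
    return page


table_keys = [f"#{n}" for n in range(1, 26)]  # 25 hypothetical grain keys

cursor = PageCursor()
pages = []
while True:
    page = scan(table_keys, cursor, 10)
    if not page:
        break  # an empty page means the scan reached the end
    pages.append(page)
```

With 25 keys and a limit of 10, the loop collects three pages of 10, 10, and 5 keys before stopping.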