Direct Usage Example
In this example, we use the DTSClient class as-is and go through the initialization process and a few requests.
The code example assumes the following project structure is available:
Project   | Connector      | Asset                         | Type              | Parameters
----------|----------------|-------------------------------|-------------------|--------------
myproject | myOraConnector | DTSDEMO.NICE_TABLE            | Collection        | N/A
myproject | myOraConnector | DTSDEMO.NICE_FUNCTION         | Routine           | INT32, STRING
myproject | myOraConnector | DTSDEMO.NICE_STREAMY_FUNCTION | Streaming Routine | INT32
myproject | kafka          | test-topic                    | Topic             | N/A
myproject | N/A            | nice_aggregate                | Aggregate         | N/A
To learn how to create projects and connectors and how to configure assets, please see the Web UI section.
import com.alloy.dts.DTSException;
import com.alloy.dts.client.DTSClient;
import com.alloy.dts.client.DTSDatasource;
import com.alloy.dts.client.DTSEndpoint;
import com.alloy.dts.client.DTSAggregationEndpoint;
import com.alloy.dts.model.DTSPredicate;
import com.alloy.dts.record.IValuesContainer;
import com.alloy.dts.record.StreamRecord;
import java.util.List;
import java.util.Map;
public class DirectDTSClientExample {
public static void main(String[] args) throws Exception {
// First create an instance of DTSClient
DTSClient dtsClient = new DTSClient();
// Open the DTS project in the client
// This sends a request to the DTS Controller to make the project available to the client
// The Controller responds immediately with some information about the project
// and orders the necessary producers to be started if they aren't already.
// This means the process is asynchronous: data endpoints will not become available
// the instant the open() method returns.
dtsClient.open("myproject");
// Retrieve the datasource created for the opened project
DTSDatasource ds = dtsClient.getDatasource("myproject");
// Make sure the datasource is configured to contain our connector
ds.checkEndpointName("myOraConnector");
// We must now wait (here up to 60 seconds) for the endpoint for our connector to become available
ds.waitForAllEndpoints(60 * 1000);
// We get a handle on the endpoint for the connector
DTSEndpoint myEndpoint = ds.getEndpoint("myOraConnector");
// We open a stream to the table we want, using a query predicate
String streamId = myEndpoint.openRecordStream("DTSDEMO.NICE_TABLE",
DTSPredicate.eq("value", "100"));
// We fully consume the stream
while(myEndpoint.streamHasMore(streamId)) {
// 100 records at a time
List<? extends StreamRecord> records = myEndpoint.getRecordsFromStream(streamId, 100);
for (StreamRecord rec : records) {
// record fields can be accessed using reflection or by converting the record to a Map
// here, we use maps
Map<String, Object> recordMap = rec.asMap();
/*
do something with the record
*/
}
}
// Stream is spent, we close it
myEndpoint.closeRecordStream(streamId);
// Let's also invoke a stored function that takes two arguments (an INT32 and a STRING)
IValuesContainer results = myEndpoint.executeCall("DTSDEMO.NICE_FUNCTION", new Object[] {12, "active"});
// And access the single result using reflection this time
Class<?> resultClass = dtsClient.getPayloadFactory().getTypesCluster().getClassForTypeName("DTSDEMO.NICE_FUNCTION.RESULTS");
Object theResult = resultClass.getField("res1").get(results);
/*
do something with it
*/
// We'll now get a record from an aggregate
// We get a handle on the aggregation endpoint. There is just one per project datasource.
DTSAggregationEndpoint aggEndpoint = ds.getAggregationEndpoint();
// We open a stream on the aggregate as if it were a regular collection
streamId = aggEndpoint.openRecordStream("nice_aggregate", DTSPredicate.eq("id", "3"));
// And we pick up our aggregate record
StreamRecord record = aggEndpoint.getRecordsFromStream(streamId, 1).get(0);
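// (in real code, check that the returned list is non-empty before calling get(0))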
/*
do something with it
*/
// Stream is not needed anymore, we close it
aggEndpoint.closeRecordStream(streamId);
// Now let's use a remote call that returns a record stream
streamId = myEndpoint.executeStreamCall("DTSDEMO.NICE_STREAMY_FUNCTION", new Object[] {1234});
// We fully consume the stream
while(myEndpoint.streamHasMore(streamId)) {
// 100 records at a time
List<? extends StreamRecord> records = myEndpoint.getRecordsFromStream(streamId, 100);
for (StreamRecord rec : records) {
// record fields can be accessed using reflection or by converting the record to a Map
// here, we use maps
Map<String, Object> recordMap = rec.asMap();
/*
do something with the record
*/
}
}
// Stream is spent, we close it
myEndpoint.closeRecordStream(streamId);
// Let's also do some work on a topic
DTSEndpoint kafkaEndpoint = ds.getEndpoint("kafka");
// First, we subscribe to it, which gives us a stream we can consume
// We provide a group ID so that Kafka remembers where we leave off
streamId = kafkaEndpoint.subscribeToTopic("test-topic", "dts-java");
// Now we can poll some entries from the topic
List<? extends StreamRecord> records = kafkaEndpoint.pollTopic(streamId, 5000);
for (StreamRecord rec : records) {
Map<String, Object> recordMap = rec.asMap();
// topic records will always contain the key and msg attributes
// however, their types can vary
System.out.println(recordMap.get("key") + " -> " + recordMap.get("msg"));
}
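// Note: a long-running consumer would typically poll repeatedly rather than once.
// A sketch of that pattern, reusing only the calls shown above (the loop
// condition and the handle() method are hypothetical):
//   while (keepRunning) {
//       for (StreamRecord r : kafkaEndpoint.pollTopic(streamId, 5000)) {
//           handle(r.asMap());
//       }
//   }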
// We unsubscribe to release the stream
kafkaEndpoint.unsubscribeFromTopic(streamId);
// We can also do some pushing
// First we do a single string/string push
kafkaEndpoint.pushToTopic("test-topic", "some_key", "some_message");
// Now let's push a record with a binary message
IValuesContainer aRecord = (IValuesContainer) dtsClient.getPayloadFactory().getTypesCluster()
.getClassForTypeName("test-topic.RECORD").getDeclaredConstructor().newInstance();
aRecord.readValues(new Object[] {"some_other_key", new byte[]{67,83,28,91,4,77,82,12}});
kafkaEndpoint.pushToTopic("test-topic", aRecord);
// Let's also push multiple records at once
kafkaEndpoint.pushManyToTopic(
"test-topic",
new Object[] {"key1", "key2", "key3"},
new Object[] {"msg1", "msg2", "msg3"});
// Finally, let's shut down the client
// This will ensure all streams are closed and all resources are released
dtsClient.closeAllAndShutdown();
}
}
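Finally, note that in a real application you will usually want closeAllAndShutdown() to run even when a request fails, so that open streams and the resources behind them are released. Below is a minimal sketch of that pattern, using only the calls shown above; catching the library's DTSException around individual requests is also an option, but its exact throw sites are not covered here.

import com.alloy.dts.client.DTSClient;
import com.alloy.dts.client.DTSDatasource;

public class GuardedDTSClientExample {
    public static void main(String[] args) throws Exception {
        DTSClient dtsClient = new DTSClient();
        try {
            dtsClient.open("myproject");
            DTSDatasource ds = dtsClient.getDatasource("myproject");
            ds.checkEndpointName("myOraConnector");
            ds.waitForAllEndpoints(60 * 1000);
            // ... issue requests exactly as in the example above ...
        } finally {
            // always release client resources, even if a request failed
            dtsClient.closeAllAndShutdown();
        }
    }
}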