Direct Usage Example

<< Click to Display Table of Contents >>

Navigation:  Development > Client > Java Library >

Direct Usage Example

 

In this example, we use the DTSClient class as-is and go through the initialization process and a few requests.

The code example assumes the following project structure is available:

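| Project   | Connector      | Asset                         | Type              | Parameters    |
|-----------|----------------|-------------------------------|-------------------|---------------|
| myproject | myOraConnector | DTSDEMO.NICE_TABLE            | Collection        | N/A           |
|           |                | DTSDEMO.NICE_FUNCTION         | Routine           | INT32, STRING |
|           |                | DTSDEMO.NICE_STREAMY_FUNCTION | Streaming Routine | INT32         |
|           | kafka          | test-topic                    | Topic             | N/A           |
|           | N/A            | nice_aggregate                | Aggregate         | N/A           |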

Note: To see how to create projects and connectors, and how to configure assets, please see the Web UI section.
 
import com.alloy.dts.DTSException;
import com.alloy.dts.client.DTSClient;
import com.alloy.dts.client.DTSDatasource;
import com.alloy.dts.client.DTSEndpoint;
import com.alloy.dts.client.DTSAggregationEndpoint;
import com.alloy.dts.model.DTSPredicate;
import com.alloy.dts.record.IValuesContainer;
import com.alloy.dts.record.StreamRecord;
 
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
 
public class DirectDTSClientExample {
 
  public static void main (String [] args) throws Exception {
 
      // First create an instance of DTSClient
      DTSClient dtsClient = new DTSClient();
 
      // Open the DTS project in the client
       // This sends a request to the DTS Controller to make the project available to the Client
       // The Controller will respond immediately with some information regarding the project,
       // and orders the necessary producers to be started if they aren't already
       // this means that the process is asynchronous and data endpoints will not become available
       // the instant the open() method returns.
      dtsClient.open("myproject");
 
      // Retrieve the datasource created for the opened project
      DTSDatasource ds = dtsClient.getDatasource("myproject");
 
      // Make sure the datasource is configured to contain our connector
      ds.checkEndpointName("myOraConnector");
 
      // We must now wait for the endpoint for our connector to become available
      ds.waitForAllEndpoints(60 * 1000);
 
      // We get a handle on the endpoint for the connector
      DTSEndpoint myEndpoint = ds.getEndpoint("myOraConnector");
      // We open a stream to the table we want using a certain query predicate
      String streamId = myEndpoint.openRecordStream("DTSDEMO.NICE_TABLE",
              DTSPredicate.eq("value", "100"));
      // We fully consume the stream
      while(myEndpoint.streamHasMore(streamId)) {
          // 100 records at a time
          List<? extends StreamRecord> records = myEndpoint.getRecordsFromStream(streamId, 100);
          for (StreamRecord rec : records) {
              // record fields can be accessed using reflection or transforming them into Maps
               // here, we use maps
              Map<String, Object> recordMap = rec.asMap();
              /*
               do something with the record
                */
          }
       }
      // Stream is spent, we close it
      myEndpoint.closeRecordStream(streamId);
 
      // Let's also invoke a stored function that receives 2 arguments
      IValuesContainer results = myEndpoint.executeCall("DTS_DEMO.NICE_FUNCTION", new Object[] {12, "active"});
      // And access the single result using reflection this time
      Class resultClass = dtsClient.getPayloadFactory().getTypesCluster().getClassForTypeName("DTS_DEMO.NICE_FUNCTION.RESULTS");
      Object theResult = resultClass.getField("res1").get(results);
      /*
       do something with it
        */
 
       // We'll now get a record from an aggregate
 
       // We get a handle on the aggregation endpoint. There is just one per project datasource.
      DTSAggregationEndpoint aggEndpoint = ds.getAggregationEndpoint();
 
      // We open a stream on the aggregate as if it were a regular collection
      streamId = aggEndpoint.openRecordStream("nice_aggregate", DTSPredicate.eq("id", "3"));
 
      // And we pick up our aggregate record
      StreamRecord record = aggEndpoint.getRecordsFromStream(streamId, 1).get(0);
  /*
       do something with it
        */
 
       // Stream is not needed anymore, we close it
      aggEndpoint.closeRecordStream(streamId);
 
      // Now let's use a remote call that returns a record stream
      streamId = myEndpoint.executeStreamCall("DTS_DEMO.NICE_STREAMY_FUNCTION", new Object[] {1234});
 
      // We fully consume the stream
      while(myEndpoint.streamHasMore(streamId)) {
          // 100 records at a time
          List<? extends StreamRecord> records = myEndpoint.getRecordsFromStream(streamId, 100);
          for (StreamRecord rec : records) {
              // record fields can be accessed using reflection or transforming them into Maps
               // here, we use maps
              Map<String, Object> recordMap = rec.asMap();
              /*
               do something with the record
                */
          }
       }
      // Stream is spent, we close it
      myEndpoint.closeRecordStream(streamId);
 
      // Let's also do some work on a topic
      DTSEndpoint kafkaEndpoint = ds.getEndpoint("kafka");
 
      // First, we can subscribe to it -> which gives us a stream we can consume
       // We provide a group ID so that Kafka remembers where we leave off
      streamId = kafkaEndpoint.subscribeToTopic("test-topic", "dts-java");
 
      // Now we can poll some entries from the topic
      List<? extends StreamRecord> records = kafkaEndpoint.pollTopic(streamId, 5000);
      for (StreamRecord rec : records) {
          Map<String, Object> recordMap = rec.asMap();
          // topic records will always contain the key and msg attributes
           // however, their types can vary
          System.out.println(recordMap.get("key") + " -> " + recordMap.get("msg"));
       }
 
      // We unsubscribe to release the stream
      kafkaEndpoint.unsubscribeFromTopic(streamId);
 
      // We can also do some pushing
       // First we do a single string/string push
      kafkaEndpoint.pushToTopic("test-topic", "some_key", "some_message");
      // Now let's push a record with a binary message
      IValuesContainer aRecord = (IValuesContainer) dtsClient.getPayloadFactory().getTypesCluster()
               .getClassForTypeName("test-topic.RECORD").newInstance();
      aRecord.readValues(new Object[] {"some_other_key", new byte[]{67,83,28,91,4,77,82,12}});
      kafkaEndpoint.pushToTopic("test-topic", aRecord);
      // Let's also push multiple records at once
      kafkaEndpoint.pushManyToTopic(
              "test-topic",
              new Object[] {"key1", "key2", "key3"},
              new Object[] {"msg1", "msg2", "msg3"});
 
      // Finally, let's shut down the client
       // This will ensure all streams are closed and all resources are released
      dtsClient.closeAllAndShutdown();
   }
}