<< Click to Display Table of Contents >> Navigation: Development > Client > .NET Library > Direct Usage Example |
In this example, we use the DTSClient class as-is and go through the initialization process and a few requests.
The code example assumes the following project structure is available:
Project |
Connector |
Asset |
Type |
Parameters |
myproject |
myOraConnector |
DTSDEMO.NICE_TABLE |
Collection |
N/A |
DTSDEMO.NICE_FUNCTION |
Routine |
INT32, STRING |
||
DTSDEMO.NICE_STREAMY_FUNCTION |
Streaming Routine |
INT32 |
||
N/A |
nice_aggregate |
Aggregate |
N/A |
To see how to create projects, connectors and configure assets, please see the Web UI Section
using Alloy.DTS.Base.Model;
using Alloy.DTS.Base.Record;
using Alloy.DTS.Client;
namespace Alloy.DTS.ClientExamples
{
public class DirectDTSClientExample
{
public static void Main(string[] args)
{
// First create an instance of DTSClient
DTSClient dtsClient = new DTSClient();
// Open the DTS project in the client
// This sends a request to the DTS Controller to make the project available to the Client
// The Controller will respond immediately with some information regarding the project,
// and orders the necessary producers to be started if they aren't already
// this means that the process is asynchronous and data endpoints will not become available
// the instant the open() method returns.
dtsClient.Open("myproject");
// Retrieve the datasource created for the opened project
DTSDatasource ds = dtsClient.GetDatasource("myproject");
// Make sure the datasource is configured to contain our connector
ds.CheckEndpointName("myOraConnector");
// We must now wait for the endpoint for our connector to become available
ds.WaitForAllEndpoints(60 * 1000);
// We get a handle on the endpoint for the connector
DTSEndpoint myEndpoint = ds.GetEndpoint("myOraConnector");
// We open a stream to the table we want using a certain query predicate
string streamId = myEndpoint.OpenRecordStream("DTSDEMO.NICE_TABLE",
DTSPredicate.Eq("value", "100"));
// We fully consume the stream
while (myEndpoint.StreamHasMore(streamId))
{
// 100 records at a time
List <IValuesContainer> records = myEndpoint.GetRecordsFromStream(streamId, 100);
foreach (IValuesContainer rec in records)
{
// record fields can be accessed using reflection or transforming them into Maps
// here, we use maps
Dictionary<string, object> recordMap = rec.AsMap();
/*
do something with the record
*/
}
}
// Stream is spent, we close it
myEndpoint.CloseRecordStream(streamId);
// Let's also invoke a stored function that receives 2 arguments
IValuesContainer results = myEndpoint.ExecuteRoutine("DTS_DEMO.NICE_FUNCTION", new Object[] { 12, "active" });
// And access the single result using reflection this time
Type resultClass = dtsClient.PayloadFactory.TypesCluster.GetSystemType("DTS_DEMO.NICE_FUNCTION.RESULTS");
object theResult = resultClass.GetProperty("res1").GetValue(results);
/*
do something with it
*/
// We'll now get a record from an aggregate
// We get a handle on the aggregation endpoint. There is just one per project datasource.
DTSAggregationEndpoint aggEndpoint = ds.AggregationEndpoint;
// We open a stream on the aggregate as if it were a regular collection
streamId = aggEndpoint.OpenRecordStream("nice_aggregate", DTSPredicate.Eq("id", "3"));
// And we pick up our aggregate record
IValuesContainer record = aggEndpoint.GetRecordsFromStream(streamId, 1)[0];
/*
do something with it
*/
// Stream is not needed anymore, we close it
aggEndpoint.CloseRecordStream(streamId);
// Finally, let's use a remote call that returns a record stream
streamId = myEndpoint.ExecuteStreamRoutine("DTS_DEMO.NICE_STREAMY_FUNCTION", new Object[] { 1234 });
// We fully consume the stream
while (myEndpoint.StreamHasMore(streamId))
{
// 100 records at a time
List <IValuesContainer > records = myEndpoint.GetRecordsFromStream(streamId, 100);
foreach (IValuesContainer rec in records)
{
// record fields can be accessed using reflection or transforming them into Maps
// here, we use maps
Dictionary<string, object> recordMap = rec.AsMap();
/*
do something with the record
*/
}
}
// Stream is spent, we close it
myEndpoint.CloseRecordStream(streamId);
}
}
}