1. Generate IDL for C# .NET
2. First code
3. Bytes everywhere
4. Retrieving data examples
5. You may find this useful
6. Filters
7. Multithreading
8. Long running scanners
9. .NET C# Sample Code for HBase Thrift Download

Generate IDL for C# .NET

Before we can start coding, we need to generate the Thrift client code for HBase with the Thrift code generator. You can read more about the Interface Description Language (IDL) here. Thrift can be downloaded from its download page, and the hbase.thrift file from the HBase Git repository. The examples below use the original (Thrift1) interface, which defines the Hbase service, not the thrift2 one. I used "Thrift for Windows" version 0.9.3.

Put the Thrift compiler and hbase.thrift together in one directory and run the following command to generate the C# code:

thrift -r --gen csharp hbase.thrift

Now you can take the generated classes and add them to your .NET project. The generated code needs a matching version of the Thrift library referenced by your project; you can find it on NuGet under the name ApacheThrift.


First code

The hard part is done :-) Now we can try to connect to our Thrift node. To do this we need to create a socket, a transport and a protocol. With those we can create the client that will be used to communicate with HBase.

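using System;
using Thrift.Protocol;
using Thrift.Transport;

static string serverHostName = "your_server_address_here";
static int port = 9090;

static void Main(string[] args)
{
    using (var socket = new TSocket(serverHostName, port))
    using (var transport = new TBufferedTransport(socket))
    using (var protocol = new TBinaryProtocol(transport))
    using (var client = new Hbase.Client(protocol))
    {
        // Creating the objects does not connect yet; open the transport first
        transport.Open();

        // Use the client here, e.g. client.getTableNames()
    }

    Console.WriteLine("Press any key to exit...");
    Console.ReadKey();
}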

From here on, what you do depends only on what you want to achieve.


Bytes everywhere

One thing worth mentioning is that the client interface is based mostly on byte arrays. That means things like table names, RowIds and even filters must be passed as byte[]. In my opinion it’s worth having these two extension methods in your project:

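using System.Text;

// Extension methods must live in a non-nested static class
public static class StringBytesExtensions
{
    // string -> UTF-8 bytes (table names, RowIds, column names, filter strings)
    public static byte[] GetUTF8Bytes(this string @this)
    {
        return Encoding.UTF8.GetBytes(@this);
    }

    // UTF-8 bytes -> string (only meaningful if the value was stored as UTF-8 text)
    public static string GetUTF8String(this byte[] @this)
    {
        return Encoding.UTF8.GetString(@this);
    }
}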
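One trap that comes with byte arrays: .NET arrays are compared by reference, so a Thrift map keyed by byte[] (for example TRowResult.Columns, which maps column names to TCell values) cannot be looked up with a freshly encoded key. The sketch below shows one way around it, assuming you want byte-based lookups at all; ByteArrayComparer and ByteKeyMaps are hypothetical helpers, not part of Thrift or HBase.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: compares byte arrays by content instead of by reference.
public sealed class ByteArrayComparer : IEqualityComparer<byte[]>
{
    public static readonly ByteArrayComparer Instance = new ByteArrayComparer();

    public bool Equals(byte[] x, byte[] y)
    {
        if (ReferenceEquals(x, y)) return true;
        if (x == null || y == null || x.Length != y.Length) return false;
        for (int i = 0; i < x.Length; i++)
            if (x[i] != y[i]) return false;
        return true;
    }

    public int GetHashCode(byte[] obj)
    {
        if (obj == null) return 0;
        // Simple content hash; equal arrays always produce equal hashes
        unchecked
        {
            int hash = 17;
            foreach (byte b in obj)
                hash = hash * 31 + b;
            return hash;
        }
    }
}

public static class ByteKeyMaps
{
    // Copies a byte[]-keyed map (e.g. TRowResult.Columns) into one that
    // looks keys up by content.
    public static Dictionary<byte[], TValue> ByContent<TValue>(IDictionary<byte[], TValue> source)
    {
        return new Dictionary<byte[], TValue>(source, ByteArrayComparer.Instance);
    }
}
```

In most cases comparing strings, as the extension methods above allow, is simpler; the comparer only pays off when you do many byte-based lookups.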

Retrieving data examples

There are two main ways of retrieving data from HBase. The first one is a scan. A scan is used on a table when you don’t know the exact RowIds you are interested in. By default a scan returns all data from the table, but there are a few ways of narrowing it (e.g. you can scan a range of RowIds with a start and stop row, or ask only for particular columns).

static string serverHostName = "your_server_address_here";
static int port = 9090;

public static void ScanExample()
{
    using (var socket = new TSocket(serverHostName, port))
    using (var transport = new TBufferedTransport(socket))
    using (var protocol = new TBinaryProtocol(transport))
    using (var client = new Hbase.Client(protocol))
    {
        transport.Open();

        TScan tscan = new TScan();
        tscan.Columns = new List<byte[]>() { "family1".GetUTF8Bytes(), "family2".GetUTF8Bytes() };
        // StartRow is inclusive, StopRow is exclusive. Keys are compared byte by byte,
        // so this assumes zero-padded ids ("id_5" would sort after "id_23")
        tscan.StartRow = "id_05".GetUTF8Bytes();
        tscan.StopRow = "id_23".GetUTF8Bytes();

        byte[] tableName = "tutorial:TestTable".GetUTF8Bytes();

        int scannerId = client.scannerOpenWithScan(tableName, tscan, new Dictionary<byte[], byte[]>());

        try
        {
            int scannerBatchSize = 100;
            List<TRowResult> result = null;

            do
            {
                result = client.scannerGetList(scannerId, scannerBatchSize);

                //Do something with result
            }
            while (result.Count == scannerBatchSize);
        }
        finally
        {
            // Release the scanner on the server instead of waiting for its lease to expire
            client.scannerClose(scannerId);
        }
    }
}

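Note that the Columns entries are column family names (which return every column of that family) or full "family:qualifier" names; a bare column name without its family would be treated as a family name. You can read more about column families on this site.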
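Row keys in StartRow and StopRow are compared byte by byte, not numerically; StartRow is inclusive and StopRow is exclusive. The self-contained sketch below (RowKeyOrder and its methods are hypothetical names) compares keys the way HBase orders them and shows why "id_5" sorts after "id_23" unless the numeric part is zero-padded.

```csharp
using System;
using System.Globalization;
using System.Text;

public static class RowKeyOrder
{
    // Compares two row keys the way HBase orders them: byte by byte, bytes treated
    // as unsigned; if one key is a prefix of the other, the shorter key sorts first.
    public static int CompareRowKeys(byte[] a, byte[] b)
    {
        int n = Math.Min(a.Length, b.Length);
        for (int i = 0; i < n; i++)
        {
            if (a[i] != b[i])
                return a[i] < b[i] ? -1 : 1;
        }
        return a.Length.CompareTo(b.Length);
    }

    public static int CompareRowKeys(string a, string b)
    {
        return CompareRowKeys(Encoding.UTF8.GetBytes(a), Encoding.UTF8.GetBytes(b));
    }

    // Hypothetical key format: zero-pads a non-negative id to a fixed width
    // so that byte order matches numeric order.
    public static string MakeRowKey(string prefix, int id, int width)
    {
        return prefix + id.ToString(CultureInfo.InvariantCulture).PadLeft(width, '0');
    }
}
```

For example, if all keys are built with MakeRowKey("id_", n, 2), a scan from "id_05" to "id_23" covers ids 5 through 22.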

The other way to retrieve data is to fetch rows whose RowIds you already know, using getRows (or getRow for a single row).

static string serverHostName = "your_server_address_here";
static int port = 9090;

public static void GetExample()
{
    List<TRowResult> result = null;

    using (var socket = new TSocket(serverHostName, port))
    using (var transport = new TBufferedTransport(socket))
    using (var protocol = new TBinaryProtocol(transport))
    using (var client = new Hbase.Client(protocol))
    {
        transport.Open();

        byte[] tableName = "tutorial:TestTable".GetUTF8Bytes();

        List<byte[]> rows = new List<byte[]>() { "id_2".GetUTF8Bytes(), "id_3".GetUTF8Bytes(), "id_4".GetUTF8Bytes() };

        result = client.getRows(tableName, rows, new Dictionary<byte[], byte[]>());
    }

    //Do something with result
}

You may find this useful

If you plan to check or compare HBase timestamps, you will need to convert them to DateTime and back. HBase timestamps are milliseconds since the Unix epoch (1970-01-01 UTC) and can be converted like this:

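// 621355968000000000 = DateTime ticks at 1970-01-01T00:00:00Z; 1 ms = 10000 ticks
public static DateTime ToDateTime(this long @this)
{
    return new DateTime(@this * 10000 + 621355968000000000, DateTimeKind.Utc);
}

// ToUniversalTime treats a DateTime of kind Unspecified as local time
public static long TicksJava(this DateTime @this)
{
    return (@this.ToUniversalTime().Ticks - 621355968000000000) / 10000;
}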
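Since .NET Framework 4.6 the base library can do the same conversion through DateTimeOffset.FromUnixTimeMilliseconds and ToUnixTimeMilliseconds. A minimal sketch, assuming your target framework has these methods; HBaseTimestamps and its method names are hypothetical:

```csharp
using System;

public static class HBaseTimestamps
{
    // HBase timestamp (ms since 1970-01-01T00:00:00Z) -> UTC DateTime
    public static DateTime FromHBaseTimestamp(long millis)
    {
        return DateTimeOffset.FromUnixTimeMilliseconds(millis).UtcDateTime;
    }

    // DateTime -> HBase timestamp; Local and Unspecified values are
    // converted to UTC first (Unspecified is treated as local time)
    public static long ToHBaseTimestamp(DateTime value)
    {
        return new DateTimeOffset(value.ToUniversalTime()).ToUnixTimeMilliseconds();
    }
}
```

Both give the same results as the tick arithmetic above for UTC values; the built-in methods just keep the epoch constant out of your code.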

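You can retrieve a column’s value from a row like this (columnName is the full "family:qualifier" name):

// Throws InvalidOperationException if the row does not contain the column
public static byte[] GetColumnValue(this TRowResult @this, string columnName)
{
    return @this.Columns.Single(c => c.Key.GetUTF8String() == columnName).Value.Value;
}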

public static string GetColumnValueAsString(this TRowResult @this, string columnName)
{
    return @this.GetColumnValue(columnName).GetUTF8String();
}
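If you need the family and the qualifier separately, splitting the full name at the first colon is enough: HBase does not allow a colon in family names, while qualifiers may contain one. A small sketch with a hypothetical ColumnNames helper:

```csharp
using System;

public static class ColumnNames
{
    // Splits "family:qualifier" at the first colon. Everything after it,
    // including further colons, belongs to the qualifier.
    public static void Split(string columnName, out string family, out string qualifier)
    {
        if (columnName == null) throw new ArgumentNullException("columnName");
        int idx = columnName.IndexOf(':');
        if (idx < 0)
        {
            // A bare family name (as used in TScan.Columns) has no qualifier
            family = columnName;
            qualifier = string.Empty;
        }
        else
        {
            family = columnName.Substring(0, idx);
            qualifier = columnName.Substring(idx + 1);
        }
    }
}
```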

If you need to save your HBase output to disk (for debugging, for example) or convert it to a list of readable string arrays, you can use the following functions:

public static void ToFile(this List<TRowResult> @this, string directory)
{
    if (!Directory.Exists(directory))
        Directory.CreateDirectory(directory);

    var resultsKVP = @this.Select(r => new { Id = r.Row.GetUTF8String(), Columns = r.Columns.Select(c => new { Key = c.Key.GetUTF8String(), Value = c.Value.Value.GetUTF8String() }) }).ToList();

    foreach (var item in resultsKVP)
    {
        using (FileStream fs = new FileStream(Path.Combine(directory, item.Id + ".rowDetails"), FileMode.Create))
        using (StreamWriter sw = new StreamWriter(fs))
        {
            foreach (var kvp in item.Columns)
                sw.WriteLine(string.Format("{0}: {1}", kvp.Key, kvp.Value));
        }
    }
}

public static List<string[]> ToReadableList(this List<TRowResult> @this)
{
    List<string[]> values = new List<string[]>();

    for (int i = 0; i < @this.Count; i++)
    {
        string[] vals = new string[@this[i].Columns.Count];

        int j = 0;

        foreach (var column in @this[i].Columns)
            vals[j++] = column.Value.Value.GetUTF8String();

        values.Add(vals);
    }

    return values;
}

Filters

Here is an example of using SingleColumnValueFilter. Be aware, though, that the column filters I used proved to be inefficient: the filter is evaluated on the region servers, but they still have to read every row in the scan range. If you really need to find rows containing particular data, it is better to make this data part of the RowId and use the scanner’s StartRow and StopRow properties. You can read more about HBase filters on the Cloudera page.

static string serverHostName = "your_server_address_here";
static int port = 9090;

public static void FilterExample()
{
    using (var socket = new TSocket(serverHostName, port))
    using (var transport = new TBufferedTransport(socket))
    using (var protocol = new TBinaryProtocol(transport))
    using (var client = new Hbase.Client(protocol))
    {
        transport.Open();

        TScan tscan = new TScan();
        tscan.FilterString = Helpers.CreateSingleColumnValueFilter("col_family", "column_name", "=", "filter_value").GetUTF8Bytes();

        byte[] tableName = "tutorial:TestTable".GetUTF8Bytes();

        int scannerId = client.scannerOpenWithScan(tableName, tscan, new Dictionary<byte[], byte[]>());

        try
        {
            int scannerBatchSize = 100;
            List<TRowResult> result = null;

            do
            {
                result = client.scannerGetList(scannerId, scannerBatchSize);

                //Do something with result
            }
            while (result.Count == scannerBatchSize);
        }
        finally
        {
            client.scannerClose(scannerId);
        }
    }
}

class Helpers
{
    // Builds an HBase filter-language expression:
    // SingleColumnValueFilter('family', 'qualifier', operator, 'binary:value', filterIfColumnMissing, latestVersionOnly)
    // oper is one of <, <=, =, !=, >, >=
    public static string CreateSingleColumnValueFilter(string family, string column, string oper, string value)
    {
        return string.Format("(SingleColumnValueFilter ('{0}', '{1}', {2}, 'binary:{3}', true, true))",
            Escape(family), Escape(column), oper, Escape(value));
    }

    // Single quotes inside filter-language string literals are escaped by doubling them
    private static string Escape(string s)
    {
        return s.Replace("'", "''");
    }
}
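When the data you are looking for is a prefix of the RowId, a scan only needs a StartRow and a StopRow. A common way to compute the StopRow, the same idea the Java client's Scan.setRowPrefixFilter uses, is to increment the last byte of the prefix that is not 0xFF and drop everything after it. A minimal sketch; PrefixScan.PrefixStopRow is a hypothetical helper:

```csharp
using System;

public static class PrefixScan
{
    // Returns the smallest row key greater than every key that starts with
    // 'prefix', or an empty array if no such key exists (prefix empty or all 0xFF),
    // in which case the scan should have no StopRow at all.
    public static byte[] PrefixStopRow(byte[] prefix)
    {
        if (prefix == null) throw new ArgumentNullException("prefix");
        int i = prefix.Length - 1;
        // Trailing 0xFF bytes cannot be incremented, so drop them
        while (i >= 0 && prefix[i] == 0xFF)
            i--;
        if (i < 0)
            return new byte[0];
        byte[] stop = new byte[i + 1];
        Array.Copy(prefix, stop, i + 1);
        stop[i]++;
        return stop;
    }
}
```

Set tscan.StartRow to the prefix itself and tscan.StopRow to the result, leaving StopRow unset when the result is empty.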

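You can also speed up filtering by dividing tables into more regions, because regions are spread across region servers and can be processed in parallel. Keep in mind that a single scanner still visits regions one after another, so you benefit mainly when several scans run at once, e.g. one per row-key range.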


Multithreading

In my experience Thrift is thread safe as long as every thread has its own socket, transport, protocol and client :-)

Remember to dispose clients you no longer use. In its default thread-pool mode the Thrift server dedicates a worker thread to each open connection, so leaked clients use up its processing threads and new requests get stuck on the Thrift side! In that case your scans and gets will hang forever, and you will not receive any timeout exception from Thrift unless you set a timeout on the TSocket.

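You can set the minimum and maximum number of Thrift worker threads on the server (hbase.thrift.minWorkerThreads and hbase.thrift.maxWorkerThreads), but in my experience Thrift starts the minimum number of threads and never spawns new ones. This is consistent with how a Java ThreadPoolExecutor behaves: it only grows beyond its core size once its request queue is full. It's best to set the minimum on the Thrift side to the maximum number of threads your application will use.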
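One way to give every thread its own client and still dispose all of them at the end is ThreadLocal<T> created with trackAllValues set to true (.NET 4.5 and later). In the sketch below FakeClient is a hypothetical stand-in for the socket/transport/protocol/Hbase.Client stack so that it runs without Thrift; in real code the factory would build and open that stack, and Dispose would close it.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the socket/transport/protocol/Hbase.Client stack.
public sealed class FakeClient : IDisposable
{
    private static int created;
    private static int disposed;
    public static int Created { get { return Volatile.Read(ref created); } }
    public static int Disposed { get { return Volatile.Read(ref disposed); } }

    public FakeClient() { Interlocked.Increment(ref created); }
    public void Dispose() { Interlocked.Increment(ref disposed); }
}

public static class PerThreadClients
{
    // Processes work items in parallel. Each worker thread lazily creates its
    // own client; every client is disposed once all work is done.
    public static void Run(IEnumerable<int> workItems, Action<FakeClient, int> work)
    {
        using (var clients = new ThreadLocal<FakeClient>(() => new FakeClient(), trackAllValues: true))
        {
            try
            {
                Parallel.ForEach(workItems, item => work(clients.Value, item));
            }
            finally
            {
                // Values lists the client of every thread that touched clients.Value
                foreach (var client in clients.Values)
                    client.Dispose();
            }
        }
    }
}
```

Clients are never shared between threads, which follows the one-client-per-thread rule above, and the finally block makes sure no connection is left holding a Thrift worker thread.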


Long running scanners

When retrieving tons of data, or with a lot of threads connecting to HBase, you may find scans to be unstable. In my case HBase, under heavy load, sometimes threw OutOfOrderScannerNextException. Also keep in mind that with default settings you only have 60 seconds to ask for the scanner's next batch after retrieving the previous one (the scanner lease, hbase.client.scanner.timeout.period; older versions use hbase.regionserver.lease.period). After that time the scanner is dead.

To deal with all of this you can write a proxy object for the scanner, to which you pass the last successfully retrieved row_id (a scanner returns data ordered by row_id). If your scanner dies for any reason, you create a new one, passing the last row_id to TScan’s StartRow property, so the scan continues from the place where it died. Keep in mind that StartRow is inclusive, so the new scanner returns that last row again; skip it, or start from the next possible key instead. For some time I had been using an implementation similar to this:

class HBaseScannerProxy
{
    [ThreadStatic]
    protected static int? scannerId = null;

    [ThreadStatic]
    protected static Hbase.Client hbClient = null;
    [ThreadStatic]
    protected static TSocket socket = null;
    [ThreadStatic]
    protected static TBufferedTransport transport = null;
    [ThreadStatic]
    protected static TBinaryProtocol protocol = null;
    
    protected bool ConnectToHBase()
    {
        string serverHostName = "get_host_from_your_config";
        int serverPort = 9090;

        try
        {
            socket = new TSocket(serverHostName, serverPort);
            transport = new TBufferedTransport(socket);
            protocol = new TBinaryProtocol(transport);
            hbClient = new Hbase.Client(protocol);

            hbClient.InputProtocol.Transport.Open();

            return true;
        }
        catch (Exception ex)
        {
            //Log exception

            hbClient = null;

            return false;
        }
    }

    public void CloseClient()
    {
        try
        {
            if (hbClient != null)
                hbClient.Dispose();
        }
        catch (Exception ex)
        {
            //Log exception
        }
        finally
        {
            hbClient = null;
        }

        try
        {
            if (protocol != null)
                protocol.Dispose();
        }
        catch (Exception ex)
        {
            //Log exception
        }

        try
        {
            if (transport != null)
            {
                if (transport.IsOpen)
                    transport.Close();

                transport.Dispose();
            }
        }
        catch (Exception ex)
        {
            //Log exception
        }

        try
        {
            if (socket != null)
            {
                if (socket.IsOpen)
                    socket.Close();

                socket.Dispose();
            }
        }
        catch (Exception ex)
        {
            //Log exception
        }
    }

    public void CloseScanner()
    {
        try
        {
            if (hbClient == null)
                return;

            if (!scannerId.HasValue)
                return;

            hbClient.scannerClose(scannerId.Value);
        }
        catch (Exception ex)
        {
            //Log exception
        }
        finally
        {
            scannerId = null;
        }
    }
    
    protected bool PrepareScanner(byte[] startRow, byte[] tableName)
    {
        if (hbClient == null && !ConnectToHBase())
            return false;

        try
        {
            //Prepare scanner
            TScan tscan = new TScan();

            if (startRow != null && startRow.Length > 0)
                tscan.StartRow = startRow;

            scannerId = hbClient.scannerOpenWithScan(tableName, tscan, new Dictionary<byte[], byte[]>());

            return true;
        }
        catch (Exception ex)
        {
            //Log exception

            scannerId = null;

            return false;
        }
    }
    
    public List<TRowResult> GetScannerBatch(int batchSize, byte[] lastSuccessfullyRetrievedRow, byte[] tableName)
    {
        try
        {
            //Prepare connection and scanner
            if (hbClient == null && !ConnectToHBase())
                throw new Exception("There isn't any active hbClient");

            if (!scannerId.HasValue && !PrepareScanner(lastSuccessfullyRetrievedRow, tableName))
                throw new Exception("There isn't any active scanner");

            //Get results
            List<TRowResult> results = hbClient.scannerGetList(scannerId.Value, batchSize);

            //Return results
            return results;
        }
        catch (Exception ex)
        {
            //Log exception

            //Try to close current connection
            CloseScanner();
            CloseClient();

            //Return null to make program do next loop iteration
            return null;
        }
    }
}
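Because StartRow is inclusive, a scanner recreated from the last retrieved row returns that row a second time. Since row keys are ordered byte by byte, the smallest key strictly greater than a given key is that key with a single 0x00 byte appended, so starting the new scanner there avoids the duplicate. A sketch with a hypothetical helper name:

```csharp
using System;

public static class ScanResume
{
    // Smallest row key that sorts strictly after 'row' in HBase's byte order:
    // the same bytes followed by 0x00. No key can fall between the two.
    public static byte[] NextRowKey(byte[] row)
    {
        if (row == null) throw new ArgumentNullException("row");
        byte[] next = new byte[row.Length + 1];
        Buffer.BlockCopy(row, 0, next, 0, row.Length);
        // The last element is already 0x00 because new arrays are zero-filled
        return next;
    }
}
```

When the proxy recreates its scanner, using ScanResume.NextRowKey(lastSuccessfullyRetrievedRow) as the start row instead of the row itself means the caller does not have to filter out the repeated row.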

.NET C# Sample Code for HBase Thrift Download

I have put all the above examples into one project. You can download it here: .NET C# HBase Thrift Examples. Remember to change the Thrift host name and port in the Examples.cs file.