time to read 14 min | 2692 words

Okay, so this is the "coding in anger" part for Rhino ETL. I need to import files into MS CRM entities. The files are standard CSV files, with the usual corruption of values that such files have. The CRM is accessed through its web services, although I am keeping the option of direct DB access in reserve, in case I can't get the web services to perform fast enough.

The first problem that I had was that the MS CRM web services are not simple services. They accept entities that are defined in their WSDL, not simple values. That put me in a complexity spin for a while, until I remembered that I am not working in my own little language; I am working on .NET. A quick trip to Visual Studio and an Add Web Reference + Compile later, I had integrated access to MS CRM into Rhino ETL.

Here is how it was done:

import CrmProxy.Crm from CrmProxy

Basically, this means that I now have a DLL that contains the proxy definitions for the web service, and I have imported it. So it is incredibly easy to use.

Then it was a matter of reading the file. Rhino ETL integrates with the FileHelpers library, and I couldn't really be happier about it. There are several reasons for that, but the main one is that I ran into something that the library can't handle, and I fixed that in 10 minutes, without changing the library code. Speaking of software that I like, this is one of the main criteria that I use to evaluate a piece of software: what happens when I step off the ledge? With FileHelpers, I can extend it so easily that I really don't care about that.

Anyway, here is part of the class definition for our file:

[DelimitedRecord(","), IgnoreFirst]
class Customer:
      [FieldConverter(ConverterKind.Date, "dd/MM/yyyy")] 
      UpdateDate as date
      Id as int
      Name as string
      ResponsibleEmployee as Nullable of int
      [FieldConverter(Rhino.ETL.FileHelpersExtensions.DateTimeConverterWithNullValue, "dd/MM/yyyy","00/00/0000")] 
      ReceptionDate as Nullable of date

As you can see, there isn't much to it except defining the fields, types, etc.
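
For reference, the DateTimeConverterWithNullValue used above is exactly the kind of ten-minute extension I mentioned: FileHelpers lets you plug in your own ConverterBase. A minimal sketch of such a converter (an illustration only, not the actual Rhino.ETL.FileHelpersExtensions code) might look like this:

import System
import System.Globalization
import FileHelpers

// Sketch only: a field converter that treats a magic value (such as "00/00/0000")
// as null, and parses everything else with the supplied date format.
class DateTimeConverterWithNullValue(ConverterBase):
	format as string
	nullValue as string

	def constructor(format as string, nullValue as string):
		self.format = format
		self.nullValue = nullValue

	override def StringToField(text as string) as object:
		return null if text == nullValue
		return DateTime.ParseExact(text, format, CultureInfo.InvariantCulture)

With the converter in place, reading the file itself looks like this: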

source CustomersFile:
     execute:
            file = Read(typeof(Customer)).From(Configuration.CustomerFile)
            file.OnError(ErrorMode.SaveAndContinue)
            for customer in file:
                  print "Source ${customer.Id}"
                  SendRow( Row.FromObject(customer) ) 
            if file.HasErrors:
                  file.OutputErrors(Configuration.CustomerErrorsFile)
                  AddError("Errors have been written to ${Configuration.CustomerErrorsFile}")

Here I read from the file, use Row.FromObject() to translate an entity into a row, and then send it forward. One amazing thing here is that FileHelpers will generate an errors file for me on demand. And that one is clear, concise, and actually useful. Compared to the amount of effort that I know is required to pull reasonable errors out of SSIS file input, that is a great pleasure.

Anyway, if you missed that, I am very happy about FileHelpers.

Another thing to point out is Configuration.CustomerFile, etc. The Configuration object is dynamically populated from a config file that you can pass to Rhino ETL (as a command line argument), which is a simple XML file in the format:

<configuration>
	<CustomerErrorsFile>D:\customers_errors.txt</CustomerErrorsFile>
</configuration>

Why XML? Because this seems like a place that I would want to touch with tools like xmlpoke, so it is easier to work with. It is also a flat configuration scheme that doesn't have any semantics other than simple key/value pairs.
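
For illustration, loading such a flat file amounts to walking the child elements of <configuration> and stuffing them into a key/value lookup. A rough sketch (assuming a simple Hashtable-backed lookup; this is not necessarily how the real Configuration object is implemented internally):

import System.Collections
import System.Xml

// Sketch: turn the flat <configuration> file into a key/value table, so that a key
// like CustomerErrorsFile resolves to the matching element's inner text.
def LoadConfiguration(path as string) as Hashtable:
	values = Hashtable()
	doc = XmlDocument()
	doc.Load(path)
	for node as XmlNode in doc.DocumentElement.ChildNodes:
		if node.NodeType == XmlNodeType.Element:
			values[node.Name] = node.InnerText
	return values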

So, now that I have the data, I can send it to the destination:

destination Crm:
      initialize:
            Parameters.Srv = CrmService(
                  Url: Configuration.Url,
                  Credentials: NetworkCredential(
                        Configuration.Username,
                        Configuration.Password,
                        Configuration.Domain),
                  CallerIdValue: CallerId(CallerGuid: Guid(Configuration.CallerId)),
                  UnsafeAuthenticatedConnectionSharing: true,
                  PreAuthenticate: true
                  )

      onRow:
            theAccount = account(
                  accountnumber: Row.Id.ToString(),
                  name: Row.Name,
                  telephone1: Row.Phone,
                  telephone2: Row.Cellular,
                  telephone3: Row.AdditionalPhone,
                  fax: Row.Fax,
                  accountreceptiondate: CrmDateTime(Value: Row.ReceptionDate.ToString("yyyy-MM-ddT00:00:00")),
                  address1_city: Row.City,
                  )
            result = Parameters.Srv.Create(theAccount)
            print "Created account ${Row.Id} -> ${result}"

      cleanUp:
            Parameters.Srv.Dispose()

As you can see, we have the initialize method, which creates the service; then we instantiate an account instance, fill it with the required parameters, and go to town. Also notable is the easy translation from CLR types to CRM types, such as in the case of accountreceptiondate.
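
One small wrinkle is the nullable ReceptionDate: calling ToString on a missing value is likely to blow up. A tiny helper (hypothetical, not part of the script above) keeps the CLR-to-CRM date conversion explicit when the value may be absent:

// Hypothetical helper: convert a (possibly null) CLR date into the generated
// CrmDateTime proxy type, returning null when there is nothing to send.
def ToCrmDate(value as duck) as CrmDateTime:
	return null if value is null
	return CrmDateTime(Value: value.ToString("yyyy-MM-ddT00:00:00"))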

All in all, the only difficulties that I had during this were making heads or tails of the information in the file, which is where I want the difficulty to lie when I am dealing with ETL processes.

time to read 1 min | 152 words

Just finished writing the tests for reading and writing files. You can check the script below. With the exception of making the connection syntax consistent with the rest of it, I am going to consider this feature complete; the next thing is to work on deployment (basically, a tool that allows you to run the script :-) ).

[DelimitedRecord("\t")]
class Customers:
    public OrderID as int
    public CustomerID as string
    public EmployeeID as int

connection(
    "Database",
    ConnectionType: SqlConnection,
    ConnectionString: "Data Source=localhost;Initial Catalog=ETL_Test; Integrated Security=SSPI;"
    )
source OrdersFromDatabase, Connection="Database":
    Command: "SELECT OrderID, CustomerID, EmployeeID FROM Orders"

destination OrdersFile:
    initialize:
        Parameters.File = Write(Customers).To("output.txt")
    onRow:
        cust = Customers(
            OrderID: Row.OrderID,
            CustomerID: Row.CustomerID,
            EmployeeID: Row.EmployeeID
            )
        Parameters.File.Write(cust)
    cleanUp:
        Parameters.File.Dispose()

pipeline OutputOrders:
    OrdersFromDatabase >> OrdersFile
target default:
    Execute("OutputOrders")

time to read 2 min | 327 words

Well, after some thinking, I figured out that I actually had only two types of sources: database and other. Since "other" is always going to be code, I decided to start with a web service source, since that is arguably the easiest (nothing much to do there). It turned out to be more complicated than I assumed, mainly because the .NET 2.0 web service stack has no easy way to do duck typing of web services; it requires compiled web service proxies. I got around that by doing runtime compilation, but still, that is hardly elegant.
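
For the curious, the usual .NET 2.0 pattern for compiling a proxy at runtime goes through ServiceDescriptionImporter and CodeDom, roughly like this (a sketch of the general technique, not the actual Rhino ETL implementation):

import System.CodeDom
import System.CodeDom.Compiler
import System.Reflection
import System.Web.Services.Description
import System.Xml
import Microsoft.CSharp

// Sketch: download the WSDL, generate proxy code from it, and compile that code
// into an in-memory assembly whose types can then be instantiated at runtime.
def CompileProxy(wsdlUrl as string) as Assembly:
	description = ServiceDescription.Read(XmlReader.Create(wsdlUrl))
	importer = ServiceDescriptionImporter()
	importer.AddServiceDescription(description, null, null)

	proxyNamespace = CodeNamespace("GeneratedProxies")
	unit = CodeCompileUnit()
	unit.Namespaces.Add(proxyNamespace)
	importer.Import(proxyNamespace, unit)

	parameters = CompilerParameters(GenerateInMemory: true)
	parameters.ReferencedAssemblies.Add("System.Web.Services.dll")
	parameters.ReferencedAssemblies.Add("System.Xml.dll")
	results = CSharpCodeProvider().CompileAssemblyFromDom(parameters, unit)
	return results.CompiledAssembly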

Anyway, what I have now is this:

source WebServiceGenerator:
	execute:
		empSrv= WebService(WsdlUrl: "http://localhost:9090/GetEmployees.asmx?wsdl" )
		results = empSrv.GetEmployees("Northwind")
		for result in results:
			SendRow( Row(
				Id: result.Id,
				Name: result.Name
				))

As you can see, the only thing that I really need to do is specify the WSDL URL for the web service, and everything from there is fairly natural. The execute block is used to distinguish between database sources (which have command, parameters, etc.) and the "other" sources, such as the one above.

Note: Due to the way Rhino ETL works, the order of the sent rows and the order of their processing may differ. This means that if the web service sends you Emp #1, Emp #2, Emp #3, they may be processed as Emp #1, Emp #3, Emp #2. (Actually, the issue tends to come up with larger amounts of rows, since the problem is the different processing of the batches.)

Next step: supporting web service output, which may involve some complexity if the web service expects complex types (and since I know that I need to handle those, I have to support that with dynamic compilation, which is going to make my life interesting :-) ).

After that, I intend to start integrating FileHelpers as a source / destination for files. I will post separately on this, but so far I am impressed with both the ease of the API and the quality of the documentation.

time to read 1 min | 120 words

Well, that turned out to be really easy, thanks to Tomas Restrepo, who pointed me directly to the right place.

You can now write:

target withTransaction:
	transaction:
		cookie = Execute("CopyUsers")
		Execute("WillThrow").After(cookie)
		
target transactionWithIsolationLevel:
	transaction(IsolationLevel.Serializable):
		cookie = Execute("CopyUsers")
		Execute("WillThrow").After(cookie)

As far as I am concerned, the way it works is magic, because the whole thing is still heavily multi-threaded and I didn't even think about checking whether I have MSDTC installed (I am on Windows 2003, so apparently yes, because it works).

This mostly completes the baseline features that I consider mandatory, which means that I can now focus on the other end: adding more sources and destinations.

Rhino ETL Targets

time to read 1 min | 155 words

Well, that is two items down my list already: I have added support for targets to Rhino ETL. A target is similar in concept to a target in NAnt; it specifies what needs to run when the package runs. This allows us to specify how we want to run the various actions that we have.

Here is a simple example:

target default:
	Execute("CopyOrders")
	Execute("MoveCustomers")

As you can see, it just lists the pipelines that we want to run. By default, the target executes all the registered pipelines (or other actions) in parallel. But what happens when you want to run them in sequence?

target default:
	sequence:
		Execute("CopyOrders")
		Execute("MoveCustomers")

Another option is when you have a dependency between two pipelines but don't care about the rest; you can handle that as well, like this:

target withDependencies:
	copyOrders = Execute("CopyOrders")
	Execute("MoveCustomers")
	Execute("AfterCopyOrders").After(copyOrders)

Next task, transactions...

time to read 1 min | 70 words

Well, that turned out to be really simple. Check out a simple RowCount:

transform CountRows:
	Context.Items.RowCount = 0 unless Context.Items.RowCount
	Context.Items.RowCount+=1
	RemoveRow()
	OnComplete:
		SendRow( Row(RowCount: Context.Items.RowCount) )
 

And then we have a more complex one, summing two columns:

transform CalcSumOfSalaryAndId:
	unless Context.Items.IdSum:
		Context.Items.IdSum = 0 
		Context.Items.SalarySum = 0
		
	Context.Items.IdSum+=Row.Id
	Context.Items.SalarySum+=Row.Salary
	RemoveRow()
	
	OnComplete:
		SendRow( Row(
			IdSum: Context.Items.IdSum, 
			SalarySum: Context.Items.SalarySum
			) )

 

So, basically we have an initialization section, then per-row processing, and when all the processing is done, you can send new rows down the pipeline.

time to read 1 min | 174 words

Well, I think that I have a solid foundation with the engine and syntax right now. I still have error conditions to verify, but that is something that I can handle as I go along. Now it is time to consider handling joins and merges. My initial thinking was something like:

joinTransform UsersAndOrganizations:
	on: 
		Left.Id.ToString().Equals(Right.UserId)
	transform:
		Row.Copy(Left)
		Row.OrgId = Right["Organization Id"]

The problem is that while this gives me the equality operation, I can't handle sets very well: I have to compare each row against each row, and I would like to do better. It would also mean having to do everything in memory, and I am not really crazy about that (nor particularly worried; I will solve that when I need it).

Another option is:

joinTransform UsersAndOrganizations:
	left:  [Row.Id, Row.UserName]
	right: [Row.UserId, Row.FullName]
	transform:
		Row.Copy(Left)
		Row.OrgId = Right["Organization Id"]

This lets me handle it in a better way, since I now have two sets of keys, and I can do comparisons a lot more easily. That is a lot harder to read, though.
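
To make the difference concrete, having explicit key selectors means the join can index one side and probe it, instead of comparing every left row against every right row. A rough illustration of the idea (not Rhino ETL code, and assuming unique keys on the right side):

import System.Collections

// Illustration: index the right rows by key once, then look each left row up directly,
// rather than doing a nested loop over both sets of rows.
def HashJoin(leftRows as IEnumerable, rightRows as IEnumerable, leftKey as callable, rightKey as callable, merge as callable):
	index = Hashtable()
	for right in rightRows:
		index[rightKey(right)] = right
	for left in leftRows:
		key = leftKey(left)
		if index.ContainsKey(key):
			yield merge(left, index[key])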

Any suggestions?

Both on the syntax and implementation strategies...
time to read 2 min | 336 words

First, let me make it clear: it is not ready yet.

What we have:

  • 99% complete on the syntax
  • Overall architecture should be stable
  • The engine works, but I think of it as a spike; it is likely to change significantly.

What remains to be done:

  • Parallelising the work inside a pipeline
  • Better error messages
  • More logging
  • More tests
  • Transforms over sets of rows

Here are a few words about how it works. The DSL is composed of connection, source, destination and transform, which have a one-to-one mapping with the respective Connection, DataSource, DataDestination and Transform classes. In some cases, we just fill the data in (Connection); in some cases we pass a generator (think of it as a delegate) to the instance that we create (DataSource, DataDestination); and sometimes we subclass the class to add the new behavior (Transform).

A pipeline is a central concept, and is composed of a set of pipeline associations, which connect the input/output of components.

Places to start looking at:

  • EtlContextBuilder - Compiles the DSL and spits out an instance of:
  • EtlConfigurationContext - the result of the DSL, which can be run using:
  • ExecutionPackage - the result of building the EtlConfigurationContext, this one manages the running of all the pipelines.

There is an extensive set of tests (mostly for the syntax), and a couple of integration tests. As I said, anything that happens as a result of a call to ExecutionPackage.Execute() is suspect and will likely change. I may have been somewhat delegate-happy in the execution; it is an anonymous delegate that calls an anonymous delegate, etc., which is probably too complex for what we need here.

I am putting the source out for review; while it can probably handle most simple things, it is very bare bones and subject to change.

You can get it here: https://rhino-tools.svn.sourceforge.net/svnroot/rhino-tools/trunk/Rhino-ETL

But it needs references from the root, so it would be easiest to just do:

svn checkout https://rhino-tools.svn.sourceforge.net/svnroot/rhino-tools/trunk/Rhino.ETL
