HBase and Hive with Avro Column size limits

Yesterday we had the privilege of creating Hive tables on top of an HBase table with Avro columns. Piece of cake, we thought: you just specify the schema location in Hive (since Hive 1.0 it's actually possible to map Avro schemas in HBase columns to Hive columns), map it to the column in HBase, and voilà, it works.

Except that it just doesn't, at least not if you have a large schema definition. In our specific case, we have an XSD schema at the root, and like all things XML, it's quite substantial. Converted to something decent like Avro, it is still quite big. That's when we discovered that Hive has a 4000-character limit for its serializer parameters. That limit makes sense when you consider that Hive stores its metadata in a SQL database. After googling a bit, we found this post:

http://gbif.blogspot.de/2014/03/lots-of-columns-with-hive-and-hbase.html

which basically points to the SERDE_PARAMS table in the metastore database. Unsurprisingly, the PARAM_VALUE column there is a VARCHAR(4000).

Following that advice, we updated the schema of the database using this:

alter table SERDE_PARAMS MODIFY PARAM_VALUE TEXT;

That alone isn't enough, though: Hive was still consistently failing. Since tracking down the second cause was a slightly painful experience involving remote-debugging the Hive Server (thanks again, Roland), without further ado, here's the second update command:

alter table SD_PARAMS MODIFY PARAM_VALUE TEXT;

That's also necessary because, after deserializing the schema, Hive stores the extracted column definitions in the storage descriptor (hence the SD in the table name), and that table has the same VARCHAR(4000) limit.
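For reference, the kind of table definition we were creating follows the Hive-HBase Avro integration syntax and looks roughly like the sketch below; the table, column-family and schema names are made up, and the exact set of properties depends on your Hive version:

CREATE EXTERNAL TABLE hbase_avro_events
ROW FORMAT SERDE 'org.apache.hadoop.hive.hbase.HBaseSerDe'
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES (
    "hbase.columns.mapping" = ":key,d:event",
    "d.event.serialization.type" = "avro",
    "d.event.avro.schema.url" = "hdfs:///schemas/big_event_record.avsc")
TBLPROPERTIES (
    "hbase.table.name" = "events",
    "hbase.struct.autogenerate" = "true");

It is the parameters Hive derives from a definition like this, and from the large schema behind it, that end up in SERDE_PARAMS and SD_PARAMS.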

 


Scalding merge, concatenation and joins of pipes

I recently built a Scalding job that runs every day, collecting a set of ids with timestamps to determine the newest and oldest occurrence of each, while merging the result with the previously aggregated set. A very simple task, involving simple map and reduce functions plus joins.

Well, the latter wasn't as straightforward as I thought. If you have two pipes to join, you can choose between the following:

- joinWithLarger
- joinWithSmaller
- joinWithTiny

which is decent, as it gives you the ability to optimise for set sizes and mapping behaviour. You can also specify the type of join operation with a Joiner, of which there are four:

- OuterJoin
- LeftJoin
- RightJoin
- InnerJoin

All of these do basically what joins should do. In my specific case, though, I was joining on a tuple of three fields:

incomingPipe.joinWithLarger(('profileId, 'foreignId, 'systemId) -> ('currentProfileId, 'currentForeignId, 'currentSystemId), createCurrentPipe(), joiner = new OuterJoin)

I encountered logical but still surprising results. If the two sides have disjoint sets of ids, the unmatched fields come back as null, because there is nothing to join them against. If you print the whole output with a filter, for instance like this:

filter('profileId, 'foreignId, 'systemId) {
  fields: (String, String, String) => {
    println(s"pid : ${fields._1} systemId ${fields._3}")
    true
  }
}

you’ll see a couple of Nulls in your output:

pid : 1337984600323010250219044000012966 systemId 82202
pid : Null systemId Null
pid : 1338141060494010250219044000006465 systemId 82202
pid : 1338151102885010250219044000007688 systemId 92078

To solve this particular problem, it makes more sense to concatenate / unionise the pipes using the ++ operator in RichPipe:

createCurrentPipe() ++ incomingPipe

One word of caution here: if you do that, make sure you bind your concatenated pipes to a val. Otherwise you’ll get the slightly frustrating

cascading.flow.planner.PlannerException: not all source taps bound to head pipes, remaining source tap names

error. If your code looks like this:

val joinedPipes = createCurrentPipe() ++ incomingPipe
.groupBy('profileId, 'foreignId, 'systemId)

change it to

val joinedPipes = createCurrentPipe() ++ incomingPipe
joinedPipes.groupBy('profileId, 'foreignId, 'systemId)

Binding it to a val first helps Cascading construct the flow properly and bind the source taps.
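Putting the pieces together, the overall shape of the job ends up looking roughly like the sketch below; the Tsv taps, the job class name and the 'timestamp, 'firstSeen and 'lastSeen fields are assumptions for illustration, not the real sources:

import com.twitter.scalding._

class MergeIdOccurrencesJob(args: Args) extends Job(args) {

  // hypothetical taps; in the real job these point at the daily and the aggregated data
  val incomingPipe =
    Tsv(args("incoming"), ('profileId, 'foreignId, 'systemId, 'timestamp)).read
  def createCurrentPipe() =
    Tsv(args("current"), ('profileId, 'foreignId, 'systemId, 'timestamp)).read

  // concatenate instead of joining, and bind the result to a val first
  val joinedPipes = createCurrentPipe() ++ incomingPipe

  joinedPipes
    .groupBy('profileId, 'foreignId, 'systemId) {
      // keep the oldest and newest occurrence per id tuple
      _.min('timestamp -> 'firstSeen)
        .max('timestamp -> 'lastSeen)
    }
    .write(Tsv(args("output")))
}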


ClassNotFoundException org.apache.hadoop.io.SequenceFile in Flume with HDFS Sink

If you ever encounter this error while setting up Flume with HDFS Sink:

Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.io.SequenceFile$CompressionType
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:423)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:356)

Just add the following JAR to your classpath in flume-env.sh:

HADOOP_HOME=/path/to/your/hadoop/installation
FLUME_CLASSPATH="$HADOOP_HOME/share/hadoop/hdfs/hadoop-hdfs-2.5.0-cdh5.3.1.jar"
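
For context: the HDFS sink references SequenceFile because that is its default file type, so the class has to be resolvable even with a minimal sink definition like the sketch below (agent, channel and path names are placeholders):

agent.sinks.hdfsSink.type = hdfs
agent.sinks.hdfsSink.channel = memChannel
agent.sinks.hdfsSink.hdfs.path = hdfs://namenode:8020/flume/events
# SequenceFile is the default file type; DataStream and CompressedStream are the alternatives
agent.sinks.hdfsSink.hdfs.fileType = SequenceFile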

Writing Avro records to HBase columns

I've been digging around recently to see how to store Avro records in HBase without "exploding" the values into individual columns; this has been a viable alternative since Hive 0.14, with its support for querying Avro in HBase columns.

What you basically need to do is serialize your Avro record to a byte array and put that into your column:

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumWriter;
import org.hbase.async.PutRequest; // asynchbase client

// SpecifcRecord is the Avro-generated class of the record being stored
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DatumWriter<SpecifcRecord> writer = new SpecificDatumWriter<>(SpecifcRecord.SCHEMA$);
DataFileWriter<SpecifcRecord> dfw = new DataFileWriter<>(writer);
PutRequest req = null;
try {
    // write the record as an Avro data file into the in-memory buffer
    dfw.create(SpecifcRecord.SCHEMA$, baos);
    dfw.append(currentEvent);
    dfw.close();
    // put the serialized bytes into the "record" column
    req = new PutRequest(table, currentRowKey, colFam,
        "record".getBytes(), baos.toByteArray());
} catch (IOException e) {
    e.printStackTrace();
}
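
For completeness, reading the record back out of the cell works the same way in reverse; this is a sketch that assumes the same generated SpecifcRecord class, with bytes holding the cell value and error handling omitted:

import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.SeekableByteArrayInput;
import org.apache.avro.io.DatumReader;
import org.apache.avro.specific.SpecificDatumReader;

// bytes is the value read from the HBase cell
DatumReader<SpecifcRecord> reader = new SpecificDatumReader<>(SpecifcRecord.SCHEMA$);
DataFileReader<SpecifcRecord> dfr =
    new DataFileReader<>(new SeekableByteArrayInput(bytes), reader);
while (dfr.hasNext()) {
    SpecifcRecord record = dfr.next();
    // ... work with the record
}
dfr.close();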

That’s basically it.


Counters using Cascading Flow Listeners in Scalding

As of now, Scalding doesn't provide full support for counters – you will find a few pull requests and the Stats class, but not much more. This will probably change in the future; until then, I found that using Cascading FlowListeners for counters was the most convenient solution.

In my Scalding Job class, I just need to override the listeners and add my own implementation of a FlowListener:


import scala.collection.JavaConversions._
import cascading.flow.{Flow, FlowListener}
import com.twitter.scalding._

class MyJob(args: Args) extends Job(args) {

  // my stats group, this helps keep them apart from others
  val statsGroupName = "MyStatsGroup"
  // my metric
  val deletedUsersStat = Stat("deletedUsers", statsGroupName)


  override def listeners = super.listeners ++ List(new FlowListener {

    override def onStarting(flow: Flow[_]) {
       // init stuff goes in here
    }

    override def onCompleted(flow: Flow[_]) {
      try {
        val fs = flow.getFlowStats
        fs.getCounterGroups.foreach { group =>
          fs.getCountersFor(group).foreach { counter =>
            println(group + "::" + counter + ":" + fs.getCounterValue(group, counter))
          }
        }
        val myStat = (fs.getCounterValue(deletedUsersStat.group, deletedUsersStat.name))
      } catch {
        case e: Exception => e.printStackTrace()
      }
    }

    override def onStopping(flow: Flow[_]) {

    }

    override def onThrowable(flow: Flow[_], e: Throwable): Boolean = {
      e.printStackTrace()
      // return true after handling, otherwise your listener will stop
      true
    }
  })
}

You then use the reference to your Stat object to update and increment your metric from inside your map and filter functions, for example (the 'userId field here is just a placeholder):


  myPipe.map('userId -> 'userId) { userId: String =>
    // ... other map code
    deletedUsersStat.incBy(1)
    userId
  }

Here, deletedUsersStat is the Stat val declared at the top of MyJob, in the code block with the listener declaration.


If you’re wondering why your iPhone’s battery is empty again

Look no further, ditch Facebook.

In terms of battery usage, it’s the worst hog ever:

[Screenshot: iOS battery usage by app, with Facebook at the top]

Data usage is quite massive too:

[Screenshot: mobile data usage by app]

Once you quit the app and switch to using Safari for Facebook, you'll notice the difference.


How to debug a hadoop Job with eclipse (or any other IDE)

Before we get started, just a quick note: this will only work as long as your job hasn't been submitted to a cluster, i.e. as long as your jobs run locally.

It is basically the right approach if you want to debug configuration parameters or other setup-related processing. I used it to debug a CLI call to a Scalding job, for instance.

1) The first thing you need to do is add the remote debugging options to Hadoop:


export HADOOP_OPTS="$HADOOP_OPTS -Xdebug -Xrunjdwp:transport=dt_socket,server=y,address=8999"

This needs to be added to your conf/hadoop-env.sh or exported to your environment variables.
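
With that option exported, you start the job as usual; since the JDWP agent is configured with server=y (and suspend defaults to y), the JVM waits for a debugger to attach on port 8999 before running anything. The jar, class and argument names below are placeholders:

export HADOOP_OPTS="$HADOOP_OPTS -Xdebug -Xrunjdwp:transport=dt_socket,server=y,address=8999"
# the JVM blocks here until the IDE attaches on port 8999
hadoop jar target/my-job.jar com.example.MyJobRunner --input in.tsv --output out.tsv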

2) Now Eclipse:

Choose Run -> Debug Configurations -> Remote Java Application

and add port 8999 to your connection settings. That’s what it should look like:

[Screenshot: Eclipse Remote Java Application debug configuration with port 8999]


Mixing Scala and Java in a Gradle project

This post is basically the twin of an earlier post, which describes the same process for maven.

I had the questionable pleasure of having to convert my existing maven project to Gradle, which is almost as bad as maven and a lot slower, but hell, which build tool is perfect anyway?

So without much further ado, here's the basic structure:

apply plugin: 'scala'
apply plugin: 'eclipse'
sourceCompatibility = 1.7
version = '1.0'
configurations {
    provided
}
configurations.all {
    resolutionStrategy {
        force(
            'org.scala-lang:scala-library:2.10.4'
        )
    }
}
jar {
    from(configurations.compile.collect { it.isDirectory() ? it : zipTree(it) }) {
        exclude "META-INF/*.SF"
        exclude "META-INF/*.DSA"
        exclude "META-INF/*.RSA"
        exclude "META-INF/license/*"
    }
    manifest {
        attributes('Implementation-Version': version,
                   'Built-By': System.getProperty('user.name'),
                   'Built-Date': new Date(),
                   'Built-JDK': System.getProperty('java.version'))
    }
}
repositories {
    mavenCentral()
    maven { url "http://conjars.org/repo/" }
    maven { url "http://repo.typesafe.com/typesafe/releases/" }
}
dependencies {
    compile group: 'commons-collections', name: 'commons-collections', version: '3.2'
    compile "org.apache.flume.flume-ng-sinks:flume-ng-elasticsearch-sink:1.5.0.1"
    testCompile group: 'junit', name: 'junit', version: '4.+'
    
}
test {
    systemProperties 'property': 'value'
}
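
One note on the provided configuration declared above: on its own it doesn't do anything. If you actually want provided-style dependencies, you also have to add them to the compile classpath, roughly like this (a sketch for this style of build script, from before Gradle had a built-in compileOnly configuration):

sourceSets {
    main {
        compileClasspath += configurations.provided
    }
}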

 

I must confess that this is actually a great deal shorter than the maven equivalent, mainly due to the simplicity of the scala plugin, which takes care of compiling the Java classes as well.

The project structure as such follows the basic pattern of maven projects:

src/main/scala
src/main/java
src/main/resources

and pretty much the same for test.


Creating an ELB load balancer with private subnet instances in a VPC

I was facing massive issues with an ELB configuration that had the following setup:

  • All instances were part of an AWS VPC
  • Three subnets: one public, two private
  • Both private subnets contained the web containers (Tomcat), in two different availability zones

The issue was that whatever I did, the LB wasn't routing requests to my instances. The initial configuration had my ELB instances in the same subnets as my Tomcat instances. Curl against the instances themselves always worked, but the ELB's public address never did.

After some frustrating googling, I came up with the solution:

1. Your ELB instances cannot be launched in a private subnet that routes out through a NAT instance; they have to live in subnets attached to the Internet Gateway.

2. That means you need to set up public subnets which "shadow" your private subnets in the same respective availability zones. In my case, I had two private subnets, 10.0.1.0 and 10.0.2.0; I created two public subnets, 10.0.10.0 and 10.0.20.0, to accommodate my ELB instances.

3. You need to sort out the routing for those new public subnets: attach them to a route table whose default route goes through the Internet Gateway, while your original private subnets keep routing through the NAT instance. You can set this up under VPC -> Subnets. Here's an example:

[Screenshot: route table configuration for one of the public subnets in VPC -> Subnets]

In your subnet view, it should look like this:

[Screenshot: subnet overview showing which route table each subnet is attached to]

Remember that the public shadow subnets (10.0.10.x and 10.0.20.x) are attached to the route table with the Internet Gateway, while the private subnets (10.0.1.x and 10.0.2.x) are attached to the one with the NAT instance.

4. You also need to adjust your security groups. The new subnets need to have explicit access to your application’s ports in your private networks.

When you’ve done all that, you can create your ELB – if you already have an ELB that doesn’t work, delete it. Amazon will not properly clean up ELB instances in private subnets and you’ll end up with more nodes than you asked for, some of them not working.
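
For reference, the same setup can also be scripted with the AWS CLI. This is only a sketch: every ID, CIDR, zone and name below is a placeholder for your own values.

# public "shadow" subnets for the ELB, one per availability zone
aws ec2 create-subnet --vpc-id vpc-12345678 --cidr-block 10.0.10.0/24 --availability-zone eu-west-1a
aws ec2 create-subnet --vpc-id vpc-12345678 --cidr-block 10.0.20.0/24 --availability-zone eu-west-1b

# a route table for the public subnets with a default route via the Internet Gateway
aws ec2 create-route-table --vpc-id vpc-12345678
aws ec2 create-route --route-table-id rtb-aaaaaaaa --destination-cidr-block 0.0.0.0/0 --gateway-id igw-bbbbbbbb
aws ec2 associate-route-table --route-table-id rtb-aaaaaaaa --subnet-id subnet-public-a
aws ec2 associate-route-table --route-table-id rtb-aaaaaaaa --subnet-id subnet-public-b

# a classic ELB in the public subnets, forwarding to the Tomcat port
aws elb create-load-balancer --load-balancer-name my-elb \
    --listeners "Protocol=HTTP,LoadBalancerPort=80,InstanceProtocol=HTTP,InstancePort=8080" \
    --subnets subnet-public-a subnet-public-b --security-groups sg-cccccccc
aws elb register-instances-with-load-balancer --load-balancer-name my-elb \
    --instances i-tomcat-a i-tomcat-b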

These are screenshots describing the relevant sections of the ELB creation process:

[Screenshots: the relevant steps of the ELB creation wizard]


Mixing Scala and Java in a maven project

Most of us work in environments with a considerable amount of Java real estate. In order to integrate our Scala code into that setup, it's sometimes necessary to mix Java and Scala in a single maven project.

Here’s a working POM for such a project with respective source folders for both languages in

src/main/java

and

src/main/scala

	<build>
		<defaultGoal>package</defaultGoal>
		<resources>
			<resource>
				<directory>src/main/resources</directory>
				<filtering>true</filtering>
			</resource>
			<resource>
				<directory>src/test/resources</directory>
				<filtering>true</filtering>
			</resource>
		</resources>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-resources-plugin</artifactId>
				<configuration>
					<encoding>${project.build.sourceEncoding}</encoding>
				</configuration>
				<executions>
					<execution>
						<goals>
							<goal>copy-resources</goal>
						</goals>
					</execution>
				</executions>
			</plugin>
			<plugin>
				<groupId>net.alchim31.maven</groupId>
				<artifactId>scala-maven-plugin</artifactId>
				<version>3.2.0</version>
				<configuration>
					<recompileMode>incremental</recompileMode>
					<args>
						<arg>-target:jvm-1.7</arg>
					</args>
					<javacArgs>
						<javacArg>-source</javacArg>
						<javacArg>1.7</javacArg>
						<javacArg>-target</javacArg>
						<javacArg>1.7</javacArg>
					</javacArgs>
				</configuration>
				<executions>
					<execution>
						<id>scala-compile</id>
						<phase>process-resources</phase>
						<goals>
							<goal>compile</goal>
						</goals>
					</execution>
					<execution>
						<id>scala-test-compile</id>
						<phase>process-test-resources</phase>
						<goals>
							<goal>testCompile</goal>
						</goals>
					</execution>
				</executions>
			</plugin>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<configuration>
					<source>1.7</source>
					<target>1.7</target>
				</configuration>
				<executions>
					<execution>
						<phase>compile</phase>
						<goals>
							<goal>compile</goal>
						</goals>
					</execution>
				</executions>
			</plugin>

		</plugins>
		<pluginManagement>
			<plugins>
				<!--This plugin's configuration is used to store Eclipse m2e settings 
					only. It has no influence on the Maven build itself. -->
				<plugin>
					<groupId>org.eclipse.m2e</groupId>
					<artifactId>lifecycle-mapping</artifactId>
					<version>1.0.0</version>
					<configuration>
						<lifecycleMappingMetadata>
							<pluginExecutions>
								<pluginExecution>
									<pluginExecutionFilter>
										<groupId>
											net.alchim31.maven
										</groupId>
										<artifactId>
											scala-maven-plugin
										</artifactId>
										<versionRange>
											[3.1.6,)
										</versionRange>
										<goals>
											<goal>compile</goal>
											<goal>testCompile</goal>
										</goals>
									</pluginExecutionFilter>
									<action>
										<ignore></ignore>
									</action>
								</pluginExecution>
							</pluginExecutions>
						</lifecycleMappingMetadata>
					</configuration>
				</plugin>
			</plugins>
		</pluginManagement>
	</build>
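
One thing the build section above doesn't show: you still need the Scala library itself as a regular dependency, along these lines (the version is just an example):

	<dependencies>
		<dependency>
			<groupId>org.scala-lang</groupId>
			<artifactId>scala-library</artifactId>
			<version>2.10.4</version>
		</dependency>
	</dependencies>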

Please note that the maven-compiler-plugin is not strictly necessary, as the scala-maven-plugin compiles the Java code as well. The separate executions make sure the Scala compiler runs first, so that Java code can reference Scala classes too. I also force JDK 7 onto the Scala compiler; this might not be necessary in future releases of the plugin.

The pluginManagement section at the end is only there for Eclipse's m2e plugin; it suppresses pesky lifecycle warnings.
