Don't Use Hive in Production

tl;dr: friends don’t let friends use hive

While hive can be nice for analytics, trying to productionize hive is awful in general and a nightmare on amazon. You can either read what I’ve learned the hard way, or simply save yourself the trouble and don’t do it. Here is just a small list of issues you will run into. From very painful experience, the combination of hive, oozie, and amazon means that a job having executed successfully yesterday gives you at best probability 0.95 that it will work today.

hive problems

hive best practices

  • don’t use hive
  • don’t use amazon / emr
  • for every variable you set, also issue a bare set so you can see what hive thinks the value is, eg
set s3_bucket = ${S3_BUCKET};
set s3_bucket;
  • use a partition such as batch_id
  • turn compression on, parallel execution on — see recommended settings
  • in amazon, use external tables
  • in amazon, put everything in the same region
  • set reducers to 64 or 128, eg set mapred.reduce.tasks=128;
  • use a location prefix, eg location 's3n://${s3_bucket}/subdir/tablename', to aid separation of dev, qa, and prod (see the sketch after this list)
  • in amazon, set dummy variables so you can see where you are
  • don’t have lots of input files; hive scales very very poorly
  • use more ram eg HADOOP_HEAPSIZE=16g when using alter table recover partitions
  • use mapjoin when applicable; hive is bad at guessing when to do so, eg /*+ MAPJOIN(AMID_FILT) */
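
To make a few of these concrete, here is a minimal sketch combining the batch_id partition, an external table with a dev/qa/prod location prefix, echoed variables, and a dummy marker variable. The table name sessions, the subdir dev, and the variable where_am_i are made up; s3_bucket and batch_id match the snippets above:

-- echo every variable so the values show up in the logs
set s3_bucket = ${S3_BUCKET};
set s3_bucket;
set batch_id = ${BATCH_ID};
set batch_id;

-- dummy variable whose only job is to show up in the logs so you can see where you are
set where_am_i = creating_sessions_table;
set where_am_i;

-- external table, partitioned by batch_id, with dev/qa/prod as the location prefix
create external table if not exists sessions (
   identifier string,
   attribute string
)
partitioned by (batch_id string)
location 's3n://${s3_bucket}/dev/sessions';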

hive has no resume capability

hive has no job resume capability. Almost any interesting pipeline will have a long series of sql statements. For example, a workflow I wrote involved 2k lines of hive and nearly 50 individual sql statements. If processing dies in the middle, either because of a bug or because of hadoop or hive or amazon breaking, you can’t ask it to pick back up where it left off. It’s a best practice to partition every table with some identifier that associates that table with a data run. We use a partition / pseudocolumn named batch_id. That gets you so close but so far. What you then really want is some ability to say that if a given batch_id already exists on a given table, don’t rerun the sql that produces it. In practice, what you do is either:

  • split hive into tiny pieces and use oozie, which fractures your job logic all over the place and makes it hell to perform ongoing development; or
  • hand edit the sql file and comment out the parts that have already run; or
  • use ruby or python to run hive and, for each statement, check to see if a given batch_id partition already exists on a given table, and conditionally run the sql. This is complicated and fragile (the probe itself is sketched after this list).
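
The existence check the wrapper performs is itself just hive sql; a minimal sketch, assuming a made-up table sessions partitioned by batch_id as above. The decision to skip or rerun has to live in the wrapper script, outside hive, which is exactly what makes this fragile:

-- run before each step: if this prints a partition, the step that
-- produces batch_id=20130501 has already completed and can be skipped
show partitions sessions partition (batch_id='20130501');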

Bluntly, all options suck, and being able to restart runs is an incredibly important component of any hadoop workflow system.

mistyping a partition name deletes all your data

if you mistype a partition name in an alter table $TABLE drop partition statement, hive will delete every partition. hive doesn’t check that the partition you type is actually a partition of the table.
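
The post does not include the offending statement; as a purely hypothetical illustration of the pattern described, with made-up names:

-- intended: drop a single batch
alter table sessions drop partition (batch_id = '20130501');

-- typo in the partition column name: instead of erroring out,
-- hive drops every partition of the table
alter table sessions drop partition (batchid = '20130501');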

complicated queries in a union all can produce erroneous results

I can’t embed the actual sql here because it has a bunch of internal logic in it, but essentially, if the first or second query in a union all is complicated, rows can be silently dropped; we found two instances of this in our code. There are two workarounds: create temp tables and move the where clauses into them, or create a partitioned table and similarly move the where logic elsewhere.

-- create tmp_table0 and tmp_table1 with logic in the where clauses, then:

select column0, column1
from (
   select column0, column1
   from tmp_table0

   union all

   select column0, column1
   from tmp_table1
) Q;

or

create table if not exists blah (
   column0 string,
   column1 string
)
partitioned by (chunk string)
location 's3n://...';

insert overwrite table blah partition (chunk='table0')
select column0, column1
from tmp_table0
where ...; -- safe to put complex logic here

insert overwrite table blah partition (chunk='table1')
select column0, column1
from tmp_table1
where ...; -- safe to put complex logic here

select column0, column1
from blah;

hive often generates painfully bad hadoop

eg count distinct will often attempt to push all data through a single reducer. For example, a very common task is to ask how many unique A are there by B, or to be concrete, how many unique identifiers are there grouped by attribute:

select attribute, count(distinct(identifier)) as num_uniq_identifiers
from data_store
group by attribute;

will attempt to push terabytes of data through a single reducer; manually setting the number of reducers doesn’t seem to help. To fix this, rewrite the sql like so (though this will then execute two hadoop jobs):

select attribute, count(*) as num_uniq_identifiers
from (
   select identifier, attribute
   from data_store
   group by identifier, attribute
) Q
group by attribute;

Conditions on joins are sometimes silently ignored
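
The post does not include the query that triggered this; as a purely hypothetical illustration of the defensive pattern, don’t trust a filter placed in an outer join’s ON clause and instead push the condition into a subquery where it cannot be ignored:

-- made-up tables; the point is where the non-join condition lives
select a.identifier, b.num_sessions
from table_a a
left outer join (
   select identifier, num_sessions
   from table_b
   where batch_id = '20130501' -- applied here, not in the ON clause
) b
on a.identifier = b.identifier;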

percent counters on mappers don’t work

Often, when reading compressed files or reading from s3, the percent counters snap straight from 0 to 100%.

hive can write files it can’t read

If hive writes a zero-length file compressed with snappy, it will happily write the file and then die with an exception (TODO) when attempting to read it back. A common place to encounter this is a query that is essentially an assert: under normal operation it returns no results. Thus our pipeline has such queries ringed with compression statements:

-- make sure files do not have snappy compression
set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;

select ...; -- query that will hopefully return no results unless there is a bug

set mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;

hive can’t deserialize file names it wrote

When partitioning tables, the partition strings become part of the directory structure. Therefore, hive appears to encode (possibly url encode?) the partition values. I’m not exactly sure what the bug is, but if you put a dash (-) in your partition value and then issue an alter table $TABLE recover partitions command, hive appears to double-decode the filename and will be unable to add the partition to its metastore.
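
A minimal sketch of the shape of the problem, with made-up table and column names (recover partitions is the amazon extension referred to above):

create external table events (identifier string)
partitioned by (chunk string)
location 's3n://${s3_bucket}/events';

-- a partition value containing a dash
insert overwrite table events partition (chunk = 'directmatch-0')
select identifier from staging_events;

-- later, against a fresh metastore: this is where the double-decoding bites,
-- and the chunk=directmatch-0 partition fails to register
alter table events recover partitions;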

parameter substitution is fragile and busted

Parameter substitution is essentially a textual, sed-style replacement driven by the set syntax. You can specify parameters like so: set batchid=${BATCH_ID}; and then refer to them in your code as:

select field1, field2
from tablename
where batch_id = '${hiveconf:batchid}';

There are a multitude of problems with this. First, that form only handles arguments when oozie is running hive. If you want to pull arguments from the command line, you have to actually change the statement to set batchid=${env:BATCH_ID}; and can then run your code as BATCH_ID=20130501 hive -f my_hive_code.sql. Also, because hive sucks, the inside of the curly braces is space sensitive: do NOT, for readability, write set batchid = ${ env:BATCH_ID };
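
A consolidated sketch of the two forms just described, using the names from the text:

-- form used when oozie is running hive and substitutes ${BATCH_ID} itself
set batchid=${BATCH_ID};

-- form used when the value comes from an environment variable; run as
--   BATCH_ID=20130501 hive -f my_hive_code.sql
set batchid=${env:BATCH_ID};

-- either way, echo it and reference it via the hiveconf namespace,
-- with no spaces inside the braces
set batchid;

select field1, field2
from tablename
where batch_id = '${hiveconf:batchid}';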

applying sql operators to partitions crashes

Calling split on a partition, as opposed to a column, appears to crash hive with an obtuse error message. We had a partition named partner_id which was really two columns joined together with a dash. I wanted to use split to separate the two columns and apply some filtering logic, but this appeared not to be possible. This can be worked around, but what a pain.

select concat( id, '-', partner_id ) as id, ip_address, sum( num_sessions ) as num_sessions
from tfidf_info
where split( partner_id, '-' )[ 0 ] = 'directmatch'
group by concat( id, '-', partner_id ), ip_address;
-- this generates the below exception

but this works

select concat( id, '-', partner_id ) as id, ip_address, sum( num_sessions ) as num_sessions
from tfidf_info
where partner_id in ( 'directmatch-0', 'directmatch-1', 'directmatch-3' )
group by concat( id, '-', partner_id ), ip_address;

this is the exception, for googlers:

FAILED: Hive Internal Error: java.lang.ClassCastException(org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector cannot be cast to org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector)
java.lang.ClassCastException: org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector cannot be cast to org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector 
at org.apache.hadoop.hive.ql.optimizer.ppr.PartExprEvalUtils.evalExprWithPart(PartExprEvalUtils.java:77) 
at org.apache.hadoop.hive.ql.optimizer.pcr.PcrExprProcFactory.evalExprWithPart(PcrExprProcFactory.java:73) 
at org.apache.hadoop.hive.ql.optimizer.pcr.PcrExprProcFactory$GenericFuncExprProcessor.process(PcrExprProcFactory.java:328) 
at org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher.dispatch(DefaultRuleDispatcher.java:89) 
at org.apache.hadoop.hive.ql.lib.DefaultGraphWalker.dispatch(DefaultGraphWalker.java:88) 
at org.apache.hadoop.hive.ql.lib.DefaultGraphWalker.walk(DefaultGraphWalker.java:125) 
at org.apache.hadoop.hive.ql.lib.DefaultGraphWalker.startWalking(DefaultGraphWalker.java:102) 
at org.apache.hadoop.hive.ql.optimizer.pcr.PcrExprProcFactory.walkExprTree(PcrExprProcFactory.java:450) 
at org.apache.hadoop.hive.ql.optimizer.pcr.PcrOpProcFactory$FilterPCR.process(PcrOpProcFactory.java:149) 
at org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher.dispatch(DefaultRuleDispatcher.java:89) 
at org.apache.hadoop.hive.ql.lib.DefaultGraphWalker.dispatch(DefaultGraphWalker.java:88) 
at org.apache.hadoop.hive.ql.lib.DefaultGraphWalker.walk(DefaultGraphWalker.java:125) 
at org.apache.hadoop.hive.ql.lib.DefaultGraphWalker.startWalking(DefaultGraphWalker.java:102) 
at org.apache.hadoop.hive.ql.optimizer.pcr.PartitionConditionRemover.transform(PartitionConditionRemover.java:78) 
at org.apache.hadoop.hive.ql.optimizer.Optimizer.optimize(Optimizer.java:87) 
at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeInternal(SemanticAnalyzer.java:7339) 
at org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.analyze(BaseSemanticAnalyzer.java:243) 
at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:430) 
at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:337) 
at org.apache.hadoop.hive.ql.Driver.run(Driver.java:889) 
at org.apache.hadoop.hive.cli.CliDriver.processLocalCmd(CliDriver.java:261) 
at org.apache.hadoop.hive.cli.CliDriver.processCmd(CliDriver.java:218) 
at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:409) 
at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:344) 
at org.apache.hadoop.hive.cli.CliDriver.processReader(CliDriver.java:442) 
at org.apache.hadoop.hive.cli.CliDriver.processFile(CliDriver.java:457) 
at org.apache.hadoop.hive.cli.CliDriver.run(CliDriver.java:655) 
at org.apache.hadoop.hive.cli.CliDriver.main(CliDriver.java:567) 
at org.apache.oozie.action.hadoop.HiveMain.runHive(HiveMain.java:303) 
at org.apache.oozie.action.hadoop.HiveMain.run(HiveMain.java:280) 
at org.apache.oozie.action.hadoop.LauncherMain.run(LauncherMain.java:37) 
at org.apache.oozie.action.hadoop.HiveMain.main(HiveMain.java:55) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) 
at java.lang.reflect.Method.invoke(Method.java:597) 
at org.apache.oozie.action.hadoop.LauncherMapper.map(LauncherMapper.java:478) 
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50) 
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:441) 
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:377) 
at org.apache.hadoop.mapred.Child$4.run(Child.java:255) 
at java.security.AccessController.doPrivileged(Native Method) 
at javax.security.auth.Subject.doAs(Subject.java:396) 
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1132) 
at org.apache.hadoop.mapred.Child.main(Child.java:249)

Intercepting System.exit(12)

hive tweaked compression algorithms

snappy and lzo are nearly impossible to decompress from the commandline or from outside the hadoop cluster, making debugging far harder than it ought to be. Sometimes there’s just no substitute for looking at the raw contents of a file. You can use gzip and zcat, but only if you use gzip compression, eg:

hadoop fs -cat 's3n://s3_bucket_here/my_file_here/000000.gz' | zcat | head -30

recommended settings

These are not the defaults, at least on amazon.

set hive.exec.compress.output=true;
set hive.exec.compress.intermediate=true;
set mapred.compress.map.output=true;
set hive.exec.parallel=true;

set mapred.output.compression.codec=org.apache.hadoop.io.compress.BZip2Codec; -- if you wish to use bz2





todo:

  • use external tables. But dropping a partition is bad bad; you have to make sure the underlying storage goes away.
  • no support for dags, so oozie, but own problems with that
  • best practice: all tables external?
  • no join counters
  • poor counter support
  • poor ability to communicate; set dummy variables
  • explain / analyze is often useless
  • using set hive.cli.print.header=true; to see headers will cause hive to hang when writing a table
  • join syntax: specify rhs condition in the syntax
  • BUG: snappy dies on 0 data files
  • CRUCIAL: no way to specify LOJ is 0 or 1;
  • strange null handling
  • best practice: always group by if supposed to be unique
  • BUG: making table external must be in caps
  • OOM indicator: java.lang.Throwable: Child Error at org.apache.hadoop.mapred.TaskRunner.run(TaskRunner.java:271) Caused by: java.io.IOException: Task process exit with nonzero status of 137. at org.apache.hadoop.mapred.TaskRunner.run(TaskRunner.java:258)
  • logs disappear
  • oozie hard to capture workflow logs
  • trick: set dummy variables to point out where you are
  • best practice: ssh key to ssh to all the remote boxes to login and look at the logs
  • problem: scales very poorly in the number of input files
  • problem: hangs on launch while doing tons of s3 reads
  • problem: dynamic partitions don’t tell you what rows went where
  • problem: inconsistent text formatting with rows loaded / which partition so hard to extract (annoying, but means nobody is a grownup)
  • emr/ amazon: no easy way to terminate cluster from cluster
  • hive problem: doesn’t make sure syntax is consistent
  • hive SUPER ANNOYANCE: restarts count each query, so hard to find bug in file
  • tip: use --| for code you ran and don’t want to rerun, so you can “%s/^--|//” to fix
  • distribution / histogram code can’t accept numbers as parameters
  • no for loop; cut and paste code
  • example: python to write arrays
  • example: python mapper, python reducer, python both
  • set hive.optimize.skewjoin=true; has never done anything;
  • always run as hive -f mycode.sql 2>&1 | tee -a log.run.00
  • can’t include files in hive; need the ddl to be shared
  • have to rely on lexical sort of dates in batch_id to make bad touch
  • ctas fails, after it did all the work