Don't Use Hive in Production
tl;dr: friends don’t let friends use hive
While hive can be nice for analytics, trying to productionize hive is awful in general and a nightmare on amazon. You can either read what I’ve learned the hard way, or simply save yourself the pain and don’t do it. Here is just a small list of issues you will run into. From very painful experience: with the combination of hive, oozie, and amazon, the fact that a job executed successfully yesterday means there is at best a 0.95 probability that it will work today.
hive problems
- hive has no resume capability
- hive is buggy
- my favorite data loss bug: mistyping a partition name deletes all your data
- complicated queries in a union all will produce erroneous results
- hive often generates painfully bad hadoop, eg count distinct will often attempt to push all data through a single reducer
- Conditions on joins are sometimes silently ignored
- percent counters on mappers don’t work. Often when reading compressed files or from s3, counters snap from 0 to 100%
- hive can write files it can’t read
- hive can’t deserialize file names it wrote
- parameter substitution is done by a fragile busted sed implementation — the set syntax
- applying sql operators to partitions crashes
- hive tweaked compression algorithms so that snappy and lzo are nearly impossible to decompress from the commandline or from outside the hadoop cluster, making debugging far harder than it ought to be
- recommended settings
hive best practices
- don’t use hive
- don’t use amazon / emr
- set every set variable you use so you can see what hive thinks the values are (see the sketch after this list)
- use a partition such as batch_id
- turn compression on, parallel execution on — see recommended settings
- in amazon, use external tables
- in amazon, put everything in the same region
- set reducers to 64 or 128, eg set mapred.reduce.tasks=128;
- use a location prefix, eg location 's3n://${s3_bucket}/subdir/tablename'; to aid separation of dev, qa, and prod
- in amazon, set dummy variables so you can see where you are
- don’t have lots of input files; hive scales very very poorly
- use more ram, eg HADOOP_HEAPSIZE=16g, when using alter table recover partitions
- use mapjoin when applicable, eg /*+ MAPJOIN(AMID_FILT) */; hive is bad at guessing when to do so
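For concreteness, here is a rough sketch of a script preamble that follows these practices; the table, column, and bucket names are placeholders, not anything from our actual pipeline:

```sql
-- sketch only: the shape of the practices above, not a real script
set hive.exec.compress.output=true;
set hive.exec.parallel=true;
set mapred.reduce.tasks=128;

-- echo a variable so the logs record what hive thinks its value is
set batchid;

-- external table, batch_id partition, explicit s3 location prefix
create external table if not exists events (
  identifier string,
  attribute string
)
partitioned by (batch_id string)
location 's3n://${s3_bucket}/subdir/events';

-- force a map-side join; hive is bad at choosing this on its own
select /*+ MAPJOIN(small_dim) */ e.identifier, d.label
from events e
join small_dim d on (e.attribute = d.attribute);
```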
hive has no resume capability
hive has no job resume capability. Almost any interesting pipeline will have a long series of sql statements. For example, a workflow I wrote involved 2k lines of hive and nearly 50 individual sql statements. If processing dies in the middle, either because of a bug or because of hadoop or hive or amazon breaking, you can’t ask it to pick back up where it left off. It’s a best practice to partition every table with some identifier that associates that table with a data run. We use a partition / pseudocolumn named batch_id. That gets you so close but so far. What you then really want is some ability to say that if a given batch_id partition already exists on a given table, don’t rerun the sql. In practice, what you do is either:
- split hive into tiny pieces and use oozie, which fractures your job logic all over the place and makes it hell to perform ongoing development; or
- hand edit the sql file and comment out the parts that have already run; or
- use ruby or python to run hive and, for each statement, check to see if a given batch_id partition already exists on a given table, and conditionally run the sql. This is complicated and fragile.
Bluntly, all options suck, and being able to restart runs is an incredibly important component of any hadoop workflow system.
mistyping a partition name deletes all your data
if you mistype a partition name in an alter table $TABLE drop partition statement, hive will delete every partition. hive doesn’t check that the partition you type is actually a partition of the table.
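A minimal illustration of the failure mode, assuming a hypothetical table events partitioned by batch_id:

```sql
-- intended: drop a single batch
alter table events drop partition (batch_id='20130501');

-- mistyped partition column: instead of raising an error, the hive versions
-- we ran would happily drop every partition of the table
alter table events drop partition (batchid='20130501');
```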
complicated queries in a union all can produce erroneous results
I can’t embed the actual sql here because it has a bunch of internal logic in it, but essentially, if the first or second query in the union is complicated, rows can be silently dropped; we found two instances of this in our code. There are two workarounds: create temp tables and move the where statements to them, or create a partitioned table and similarly move the where logic elsewhere.
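Since I can’t show the real queries, here is a sketch of the first workaround with made-up table names: apply each side’s where clause while loading a temp table, so the union all itself carries no logic:

```sql
-- workaround 1 (sketch): push each side's where logic into a temp table
create table tmp_side_a as
select id, attribute
from source_a
where some_filter = 1;

create table tmp_side_b as
select id, attribute
from source_b
where other_filter = 1;

-- the union all over the temp tables is now trivial
create table combined as
select u.id, u.attribute
from (
  select id, attribute from tmp_side_a
  union all
  select id, attribute from tmp_side_b
) u;
```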
or
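a sketch of the second workaround, again with made-up names: skip the union entirely and write each side into its own partition of one table, keeping the where logic in the individual inserts:

```sql
-- workaround 2 (sketch): no union at all; each side lands in its own partition
create table combined_p (id string, attribute string)
partitioned by (side string);

insert overwrite table combined_p partition (side='a')
select id, attribute
from source_a
where some_filter = 1;

insert overwrite table combined_p partition (side='b')
select id, attribute
from source_b
where other_filter = 1;
```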
hive often generates painfully bad hadoop
eg, count distinct will often attempt to push all data through a single reducer. For example, a very common task is to ask how many unique A there are by B, or, to be concrete, how many unique identifiers there are grouped by attribute.
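A sketch of that query (huge_table is a placeholder name):

```sql
select attribute, count(distinct identifier)
from huge_table
group by attribute;
```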
will attempt to push terabytes of data through a single reducer; manually setting the number of reducers doesn’t seem to help. To fix this, rewrite the sql so the deduplication happens in a subquery with a group by (though this will then execute two hadoop jobs).
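A sketch of the rewritten shape, with the same placeholder names:

```sql
-- dedupe (attribute, identifier) pairs in a subquery, then count per attribute
select attribute, count(1) as uniques
from (
  select attribute, identifier
  from huge_table
  group by attribute, identifier
) t
group by attribute;
```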
Conditions on joins are sometimes silently ignored
percent counters on mappers don’t work
Often when reading compressed files or from s3, counters snap from 0 to 100%
hive can write files it can’t read
If hive writes a file compressed by snappy with 0 length, it will happily write the file, then die when attempting to read it (exception TODO). A common place you would encounter this is a query that is essentially an assert — that is, under normal operation, it will return no results. Thus our pipeline has queries ringed with compression statements:
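A sketch of the shape, with hypothetical table and column names:

```sql
-- turn output compression off around a query that normally returns zero rows,
-- so hive never writes a zero-length snappy file it can't read back
set hive.exec.compress.output=false;

insert overwrite table assert_no_dupes
select identifier, count(1) as n
from events
group by identifier
having count(1) > 1;

set hive.exec.compress.output=true;
```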
hive can’t deserialize file names it wrote
When partitioning tables, the partition strings become part of the directory structure. Therefore, hive appears to encode (possibly url encode?) the partition values. I’m not exactly sure what the bug is, but if you put a dash (-) in your partition value and then issue an alter table $TABLE recover partitions command, hive appears to double de-encode the filename and will be unable to add the partition to its metastore.
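A sketch of the sequence that bit us, with hypothetical names:

```sql
-- write a partition whose value contains a dash
insert overwrite table events partition (partner_id='acme-us')
select identifier, attribute
from staging_events;

-- later, eg after pointing a fresh metastore at the same s3 data, this is
-- where the dashed partition fails to make it back into the metastore
alter table events recover partitions;
```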
parameter substitution is fragile and busted
It is done by what amounts to a fragile, busted sed implementation: the set syntax.
You can specify parameters as so: set batchid=${BATCH_ID}; and then refer to them in your code as:
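Something like the following, assuming the hiveconf namespace (the table and column names are placeholders):

```sql
insert overwrite table events partition (batch_id='${hiveconf:batchid}')
select identifier, attribute
from raw_events
where dt = '${hiveconf:batchid}';
```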
There are a multitude of problems with this. First, this is how you handle arguments if oozie is running hive; if you want to pull arguments from the command line, you have to actually change the statement to set batchid=${env:BATCH_ID}; and then run your code as BATCH_ID=20130501 hive -f my_hive_code.sql. Also, because hive sucks, the inside of the squiggle brackets is space sensitive: do NOT, for readability, write set batchid = ${ env:BATCH_ID };
applying sql operators to partitions crashes
Calling split on a partition, as opposed to a column, appears to crash hive with an obtuse error message. We had a partition named partner_id which was really two columns joined together with a dash. I wanted to use split to separate the two columns and apply some filtering logic, but this appeared not to be possible. This can be worked around, but what a pain.
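A sketch of the shape that crashed for us (names are placeholders; the real query had more logic):

```sql
-- partner_id is the partition column; applying split() to it in the where
-- clause blew up during query planning with the exception quoted below
select identifier
from events
where split(partner_id, '-')[0] = 'some_partner';
```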
but this works
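One shape that avoids applying split directly in a filter on the partition column (a sketch, not necessarily our exact query):

```sql
-- move the split into a subquery so the outer filter applies to an ordinary
-- derived column instead of the partition itself
select identifier
from (
  select identifier, split(partner_id, '-')[0] as partner_prefix
  from events
) t
where partner_prefix = 'some_partner';
```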
this is the exception, for googlers
FAILED: Hive Internal Error: java.lang.ClassCastException(org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector cannot be cast to org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector)
java.lang.ClassCastException: org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector cannot be cast to org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector
at org.apache.hadoop.hive.ql.optimizer.ppr.PartExprEvalUtils.evalExprWithPart(PartExprEvalUtils.java:77)
at org.apache.hadoop.hive.ql.optimizer.pcr.PcrExprProcFactory.evalExprWithPart(PcrExprProcFactory.java:73)
at org.apache.hadoop.hive.ql.optimizer.pcr.PcrExprProcFactory$GenericFuncExprProcessor.process(PcrExprProcFactory.java:328)
at org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher.dispatch(DefaultRuleDispatcher.java:89)
at org.apache.hadoop.hive.ql.lib.DefaultGraphWalker.dispatch(DefaultGraphWalker.java:88)
at org.apache.hadoop.hive.ql.lib.DefaultGraphWalker.walk(DefaultGraphWalker.java:125)
at org.apache.hadoop.hive.ql.lib.DefaultGraphWalker.startWalking(DefaultGraphWalker.java:102)
at org.apache.hadoop.hive.ql.optimizer.pcr.PcrExprProcFactory.walkExprTree(PcrExprProcFactory.java:450)
at org.apache.hadoop.hive.ql.optimizer.pcr.PcrOpProcFactory$FilterPCR.process(PcrOpProcFactory.java:149)
at org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher.dispatch(DefaultRuleDispatcher.java:89)
at org.apache.hadoop.hive.ql.lib.DefaultGraphWalker.dispatch(DefaultGraphWalker.java:88)
at org.apache.hadoop.hive.ql.lib.DefaultGraphWalker.walk(DefaultGraphWalker.java:125)
at org.apache.hadoop.hive.ql.lib.DefaultGraphWalker.startWalking(DefaultGraphWalker.java:102)
at org.apache.hadoop.hive.ql.optimizer.pcr.PartitionConditionRemover.transform(PartitionConditionRemover.java:78)
at org.apache.hadoop.hive.ql.optimizer.Optimizer.optimize(Optimizer.java:87)
at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeInternal(SemanticAnalyzer.java:7339)
at org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.analyze(BaseSemanticAnalyzer.java:243)
at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:430)
at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:337)
at org.apache.hadoop.hive.ql.Driver.run(Driver.java:889)
at org.apache.hadoop.hive.cli.CliDriver.processLocalCmd(CliDriver.java:261)
at org.apache.hadoop.hive.cli.CliDriver.processCmd(CliDriver.java:218)
at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:409)
at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:344)
at org.apache.hadoop.hive.cli.CliDriver.processReader(CliDriver.java:442)
at org.apache.hadoop.hive.cli.CliDriver.processFile(CliDriver.java:457)
at org.apache.hadoop.hive.cli.CliDriver.run(CliDriver.java:655)
at org.apache.hadoop.hive.cli.CliDriver.main(CliDriver.java:567)
at org.apache.oozie.action.hadoop.HiveMain.runHive(HiveMain.java:303)
at org.apache.oozie.action.hadoop.HiveMain.run(HiveMain.java:280)
at org.apache.oozie.action.hadoop.LauncherMain.run(LauncherMain.java:37)
at org.apache.oozie.action.hadoop.HiveMain.main(HiveMain.java:55)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.oozie.action.hadoop.LauncherMapper.map(LauncherMapper.java:478)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:441)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:377)
at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1132)
at org.apache.hadoop.mapred.Child.main(Child.java:249)
Intercepting System.exit(12)
hive tweaked compression algorithms
snappy and lzo are nearly impossible to decompress from the commandline or from outside the hadoop cluster, making debugging far harder than it ought to be. Sometimes there’s just no substitute for looking at the raw contents of a file. You can use gzip and zcat, but only if you use gzip compression.
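eg, by setting the output codec to gzip (a one-line sketch; your exact settings may differ):

```sql
set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;
```

With gzip output, zcat on a downloaded part file behaves as you’d expect.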
recommended settings
These are not the defaults, at least on amazon.
set hive.exec.compress.output=true;
set hive.exec.compress.intermediate=true;
set mapred.compress.map.output=true;
set hive.exec.parallel=true;
set mapred.output.compression.codec=org.apache.hadoop.io.compress.BZip2Codec; -- if you wish to use bz2
todo:
- use external tables. But dropping a partition is bad bad; you have to make sure the underlying storage goes away.
- no support for dags, so oozie, but that has its own problems
- best practice: all tables external?
- no join counters
- poor counter support
- poor ability to communicate; set dummy variables
- explain / analyze is often useless
- using set hive.cli.print.header=true; to see headers will cause hive to hang when writing a table
- join syntax: specify rhs condition in the syntax
- BUG: snappy dies on 0 data files
- CRUCIAL: no way to specify LOJ is 0 or 1;
- strange null handling
- best practice: always group by if supposed to be unique
- BUG: making table external must be in caps
- OOM indicator: java.lang.Throwable: Child Error at org.apache.hadoop.mapred.TaskRunner.run(TaskRunner.java:271) Caused by: java.io.IOException: Task process exit with nonzero status of 137. at org.apache.hadoop.mapred.TaskRunner.run(TaskRunner.java:258)
- logs disappear
- oozie hard to capture workflow logs
- trick: set dummy variables to point out where you are
- best practice: ssh key to ssh to all the remote boxes to login and look at the logs
- problem: scales very poorly in number of input files
- problem: hangs on launch while doing tons of s3 reads
- problem: dynamic partitions don’t tell you what rows went where
- problem: inconsistent text formatting with rows loaded / which partition so hard to extract (annoying, but means nobody is a grownup)
- emr/ amazon: no easy way to terminate cluster from cluster
- hive problem: doesn’t make sure syntax is consistent
- hive SUPER ANNOYANCE: restarts count each query, so hard to find bug in file
- tip: use --| for code you ran and don’t want to rerun, so you can "%s/^--|//" to fix it
- distribution / histogram code can’t accept numbers as parameters
- no for loop; cut and paste code
- example: python to write arrays
- example: python mapper, python reducer, python both
- set hive.optimize.skewjoin=true; has never done anything;
- always run as hive -f mycode.sql 2>&1 | tee -a log.run.00
- can’t include files in hive; need the ddl to be shared
- have to rely on lexical sort of dates in batch_id to make bad touch
- ctas fails, after it did all the work