Stochastic Nonsense


Don't Use Hive in Production

Hive is, in general, pretty bad. Hive on amazon is a nightmare. Here is just a small list of issues you will run into. **tl;dr: friends don't let friends use hive.** From very painful experience, the combination of hive, oozie, and amazon means that just because a job executed yesterday, there is at best probability 0.95 that it will work today.

hive problems
-------------

* hive has [no resume capability](#hive_no_resume)
* hive is [buggy](#hive_buggy)
* my favorite data loss bug: mistyping a partition name
* complicated queries in a union all will produce erroneous results
* hive often generates painfully bad hadoop, e.g. `count distinct` will often attempt to push all data through a single reducer
* conditions on joins are sometimes silently ignored
* percent counters on mappers, particularly when reading from s3, often snap from 0% to 100%
* hive can write files it can't read (snappy)
* hive tweaked the snappy and lzo compression formats so that part files are difficult to decompress

Details and examples for most of these are below.

### amazon problems

* Amazon feels free to break your cluster at will. They've hosed their *own* ganglia bootstrap installation scripts twice in the last week. Because they're staffed by user hostile morons, they broke the ganglia install, published a fix that we had to shove into all our internal tools that bring up clusters, *REMOVED THE FIX*, and published essentially the same fix in a new location, thus breaking everything twice. If you would expect something like

```bash
--bootstrap-action s3://elasticmapreduce/bootstrap-actions/install-ganglia
```

to work reliably, just because it's amazon's code installing ganglia on emr clusters managed by amazon, prepare to be disappointed.

* hive on emr used to obey a -v flag that would echo the sql that hive was running to stdout or stderr. Now you get long lists of stuff that looks as such:

```text
MapReduce Jobs Launched:
Job 0: Map: 128  Reduce: 128   Accumulative CPU: 69536.58 sec   HDFS Read: 32768 HDFS Write: 10104512337 SUCCESS
Total MapReduce CPU Time Spent: 0 days 19 hours 18 minutes 56 seconds 580 msec
OK
Time taken: 2056.326 seconds
OK
Time taken: 0.175 seconds
OK
Time taken: 0.015 seconds
FAILED: Hive Internal Error: java.lang.RuntimeException(java.net.SocketTimeoutException: Call to ip-10-196-46-92.us-west-1.compute.internal/10.196.46.92:9000 failed on socket timeout exception: java.net.SocketTimeoutException: 20000 millis timeout while waiting for channel to be ready for connect. ch : java.nio.channels.SocketChannel[connection-pending remote=ip-10-196-46-92.us-west-1.compute.internal/10.196.46.92:9000])
```

Say you have five or ten queries in a file. Which query produced that error? Read the entire stdout log and start counting. Nelson voice: HAHA.
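The trick buried in the raw notes at the end of this post ("set dummy variables to point out where you are") helps a little with the which-statement-was-that problem. A sketch, with invented step names: set a marker before each real statement and immediately echo it back, so the stdout log at least records how far the run got.

```sql
-- breadcrumbs: 'set x=y;' is silent, but a bare 'set x;' prints x=y to the
-- console, so the log shows which chunk of the file was running when things died.
-- the step names here are invented.
set current_step=010_load_raw_logs;
set current_step;
-- ... first real statement here ...

set current_step=020_rollup_by_attribute;
set current_step;
-- ... next real statement here ...
```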

* `alter table $table recover partitions;` doesn't work. It only syncs from the filesystem to hive's metadata db. If, for example, part of your etl process removes a file that hive used to know about, it will not be removed from the metadata.
* `alter table $table recover partitions;` has memory use that must be exponential in the number of files in s3. You can OOM hadoop with a thousand files. The workaround is to set HADOOP_HEAPSIZE, ala

```bash
HADOOP_HEAPSIZE=4192 hive -e "use myschema; alter table earl recover partitions;"
```

HOWEVER, even this is, of course, buggy. Say you have an etl process that writes into an s3 bucket and your code needs to read that bucket. So you have your code run `alter table etl_table recover partitions;` each time it runs. If you run more than one hive statement in a row, the HADOOP_HEAPSIZE will be reset. Of course.
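One way around the reset, sketched here on the assumption that your pipeline looks like the examples above (`myschema`, `etl_table`, `my_hive_code.sql` are the stand-in names used in this post): run the recover partitions statement as its own single-statement hive invocation so the big heap applies to it alone, then run the rest of the job separately.

```bash
# run 'recover partitions' in its own hive invocation so HADOOP_HEAPSIZE is
# actually in effect for it, then run the real pipeline with normal settings.
# schema, table, and file names are the stand-ins used earlier in this post.
set -e

HADOOP_HEAPSIZE=4192 hive -e "use myschema; alter table etl_table recover partitions;"

hive -f my_hive_code.sql 2>&1 | tee -a log.run.00
```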

### amazon plus hive problems

* Fragility
* hive needs a database. Amazon does not like multiple region databases. Hive needs one. You will invent awful workarounds -- syncing process TODO
* for performance, you should be using HDFS local to your hadoop cluster as often as possible instead of writing to then reading from s3. This causes namespace collisions.
* pursuant to the above, you will have a bunch of tables hanging around in hive metadata pointed at an hdfs cluster that no longer exists. Reading from them will cause errors like:

```text
FAILED: Hive Internal Error: java.lang.RuntimeException(java.net.SocketTimeoutException: Call to ip-10-196-46-92.us-west-1.compute.internal/10.196.46.92:9000 failed on socket timeout exception: java.net.SocketTimeoutException: 20000 millis timeout while waiting for channel to be ready for connect. ch : java.nio.channels.SocketChannel[connection-pending remote=ip-10-196-46-92.us-west-1.compute.internal/10.196.46.92:9000])
java.lang.RuntimeException: java.net.SocketTimeoutException: Call to ip-10-196-46-92.us-west-1.compute.internal/10.196.46.92:9000 failed on socket timeout exception: java.net.SocketTimeoutException: 20000 millis timeout while waiting for channel to be ready for connect. ch : java.nio.channels.SocketChannel[connection-pending remote=ip-10-196-46-92.us-west-1.compute.internal/10.196.46.92:9000]
    at org.apache.hadoop.hive.ql.Context.getScratchDir(Context.java:171)
    at org.apache.hadoop.hive.ql.Context.getExternalScratchDir(Context.java:223)
    at org.apache.hadoop.hive.ql.Context.getExternalTmpFileURI(Context.java:330)
    at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.genFileSinkPlan(SemanticAnalyzer.java:3893)
    at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.genBodyPlan(SemanticAnalyzer.java:5990)
    at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.genPlan(SemanticAnalyzer.java:6553)
    at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeInternal(SemanticAnalyzer.java:7315)
    at org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.analyze(BaseSemanticAnalyzer.java:243)
    at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:430)
    at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:337)
    at org.apache.hadoop.hive.ql.Driver.run(Driver.java:889)
    at org.apache.hadoop.hive.cli.CliDriver.processLocalCmd(CliDriver.java:261)
    at org.apache.hadoop.hive.cli.CliDriver.processCmd(CliDriver.java:218)
    at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:409)
    at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:344)
    at org.apache.hadoop.hive.cli.CliDriver.processReader(CliDriver.java:442)
    at org.apache.hadoop.hive.cli.CliDriver.processFile(CliDriver.java:457)
    at org.apache.hadoop.hive.cli.CliDriver.run(CliDriver.java:655)
    at org.apache.hadoop.hive.cli.CliDriver.main(CliDriver.java:567)
    at org.apache.oozie.action.hadoop.HiveMain.runHive(HiveMain.java:303)
    at org.apache.oozie.action.hadoop.HiveMain.run(HiveMain.java:280)
    at org.apache.oozie.action.hadoop.LauncherMain.run(LauncherMain.java:37)
    at org.apache.oozie.action.hadoop.HiveMain.main(HiveMain.java:55)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.oozie.action.hadoop.LauncherMapper.map(LauncherMapper.java:478)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:441)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:377)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1132)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)
Caused by: java.net.SocketTimeoutException: Call to ip-10-196-46-92.us-west-1.compute.internal/10.196.46.92:9000 failed on socket timeout exception: java.net.SocketTimeoutException: 20000 millis timeout while waiting for channel to be ready for connect. ch : java.nio.channels.SocketChannel[connection-pending remote=ip-10-196-46-92.us-west-1.compute.internal/10.196.46.92:9000]
    at org.apache.hadoop.ipc.Client.wrapException(Client.java:1100)
    at org.apache.hadoop.ipc.Client.call(Client.java:1072)
    at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225)
    at $Proxy2.getProtocolVersion(Unknown Source)
    at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:401)
    at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:384)
    at org.apache.hadoop.hdfs.DFSClient.createRPCNamenode(DFSClient.java:127)
    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:249)
    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:214)
    at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:89)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1413)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:68)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1431)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:256)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187)
    at org.apache.hadoop.hive.ql.Context.getScratchDir(Context.java:164)
    ... 35 more
Caused by: java.net.SocketTimeoutException: 20000 millis timeout while waiting for channel to be ready for connect. ch : java.nio.channels.SocketChannel[connection-pending remote=ip-10-196-46-92.us-west-1.compute.internal/10.196.46.92:9000]
    at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:213)
    at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:511)
    at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:481)
    at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:434)
    at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:557)
    at org.apache.hadoop.ipc.Client$Connection.access$2000(Client.java:184)
    at org.apache.hadoop.ipc.Client.getConnection(Client.java:1203)
    at org.apache.hadoop.ipc.Client.call(Client.java:1047)
    ... 49 more
```

Even dropping the table will hang for minutes. The solution is to mark the table as external, then drop it. *HOWEVER*, if you reuse this code on a cluster that still exists, hive will crap its pants. To make dropping tables happen without a 20 minute timeout, mark them as external:

```sql
ALTER TABLE fp_somename SET TBLPROPERTIES('EXTERNAL'='TRUE');
drop table fp_somename;
```

### hive has no resume capability

hive has no job resume capability. Almost any interesting pipeline will have a long series of sql statements. For example, a workflow I wrote involved 2k lines of hive and nearly 50 individual sql statements. If it dies in the middle, either because of a bug or because of hadoop, hive, or amazon breaking, you can't ask it to pick back up where it left off. It's a best practice to partition every table with some identifier that associates that table with a data run; we use a partition / pseudocolumn named batch_id. That gets you so close but so far: what you then really want is some ability to say that if a fixed batch_id on a given table already exists, don't rerun the sql. In practice, what you do is either:
  1. split hive into tiny pieces and use oozie, which fractures your job logic all over the place and makes it hell to perform ongoing development; or
  2. hand edit the sql file and add comments to the parts that have already run; or
  3. use ruby or python to run hive and, for each statement, check to see if a given batch_id partition already exists on a given table, and conditionally run the sql (a rough sketch of this check is just below). This is complicated and fragile.

Being able to restart runs is an incredibly important component of any hadoop workflow system.
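A minimal sketch of option 3, done in shell rather than ruby or python; the schema, table, and file names are just the stand-ins used elsewhere in this post, and the grep assumes partitions look like `batch_id=YYYYMMDD`.

```bash
# skip the statement if the target table already has this batch_id partition,
# otherwise run it; names below are the stand-ins used elsewhere in this post.
BATCH_ID=20130501
TABLE=data_store

if hive -e "use myschema; show partitions ${TABLE};" 2>/dev/null | grep -q "batch_id=${BATCH_ID}"; then
  echo "batch_id=${BATCH_ID} already exists on ${TABLE}; skipping"
else
  BATCH_ID=${BATCH_ID} hive -f my_hive_code.sql 2>&1 | tee -a log.run.00
fi
```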

### hive is buggy

This probably deserves a page of its very own, but here are just a few of the things we've run into.

* my favorite data loss bug: mistyping a partition name. This is of course aggravated by TODO
* complicated queries in a union all will produce erroneous results. eg:

```sql
select *
from (
  select -- some complicated thing here
  union all
  select -- another complicated thing here
) Q;
```

will produce incorrect output. A workaround is to use a partitioned table.
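To make that workaround concrete, here is a rough sketch rather than the actual statements behind this bug: write each branch of the would-be union all into its own partition of a scratch table, then read the whole table. The table, columns, and partition values are invented for illustration.

```sql
-- hypothetical scratch table standing in for the union all
create table union_workaround (
  id  string,
  val bigint
)
partitioned by (part string);

-- each branch of the would-be union all writes its own partition
insert overwrite table union_workaround partition (part = 'a')
select id, val from some_table;   -- some complicated thing here

insert overwrite table union_workaround partition (part = 'b')
select id, val from other_table;  -- another complicated thing here

-- reading the whole table yields the rows the union all should have produced
select id, val
from union_workaround;
```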

* hive often generates painfully bad hadoop. For example, if you do a count distinct, it will often attempt to push all data through a single reducer. Suppose you have cookies with attributes, where the cardinality of attributes is relatively low, and you want to know the number of distinct cookies by attribute. The natural sql is

```sql
select attribute, count(distinct(identifier)) as num_uniq_identifiers
from data_store
group by attribute;
```

but instead you will have to write

```sql
select attribute, count(*) as num_uniq_identifiers
from (
  select identifier, attribute
  from data_store
  group by identifier, attribute
) Q
group by attribute;
```

* Conditions on joins are sometimes silently ignored. TODO EXAMPLE; a related outer join sketch follows this list, just before the TODO notes.
* percent counters on mappers, particularly when reading from s3, often snap from 0% to 100%. The way to see if your job is actually making progress is to check the `S3N_BYTES_READ` or `FILE_BYTES_READ` counters.
* hive tweaked compression algorithms. Use gzip or bz2; they've fiddled with snappy or lzo so that I'm unaware of a simple command line tool that will decompress the part files. Thus if you need to sanity check data from hdfs or s3 you're out of luck. Advice: use bz2:

```sql
set mapred.output.compression.codec=org.apache.hadoop.io.compress.BZip2Codec;
```
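For what it's worth, the sanity check that bz2 (or gzip) keeps possible looks something like this; the path is a made-up stand-in for wherever your part files land.

```bash
# with bz2 output you can still eyeball a part file straight out of hdfs;
# the path below is hypothetical.
hadoop fs -cat /user/hive/warehouse/data_store/batch_id=20130501/000000_0.bz2 | bzcat | head

# gzip works the same way with zcat; there's no equivalently simple tool
# for hive's snappy/lzo part files.
```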

* parameter substitution is done by a fragile, busted sed implementation. You can specify parameters as so:

```sql
set batchid=${BATCH_ID};
```

and then refer to them in your code as:

```sql
select field1, field2
from tablename
where batch_id = '${hiveconf:batchid}';
```

There are a multitude of problems with this. First, this is how you handle arguments if oozie is running hive. If you want to pull arguments from the command line, you have to actually change the statement to be

```sql
set batchid=${env:BATCH_ID};
```

and you can then run your code as

```bash
BATCH_ID=20130501 hive -f my_hive_code.sql
```

Also, because hive sucks, the inside of the squiggle brackets is space sensitive. Do *NOT*, for readability, write

```sql
set batchid = ${ env:BATCH_ID };
```

* recommended settings:

```sql
set hive.exec.compress.output=true;
set hive.exec.compress.intermediate=true;
set mapred.compress.map.output=true;
set hive.exec.parallel=true;
```
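On the silently-ignored join conditions above: I can't reconstruct the exact statement that got eaten, but the outer-join trap behind the "join syntax: specify rhs condition in the syntax" note in the TODO list below is easy to sketch. The tables and columns here are invented; the point is where the condition on the right-hand side lives.

```sql
-- reads like "all cookies, plus their directmatch sessions if any", but the
-- where clause on the right-hand table discards the null-extended rows,
-- silently turning the left outer join into an inner join
select c.identifier, s.num_sessions
from cookies c
left outer join sessions s on ( c.identifier = s.identifier )
where s.fp_partner_id = 'directmatch_0';

-- keeping the right-hand-side condition in the join syntax preserves the outer join
select c.identifier, s.num_sessions
from cookies c
left outer join sessions s
  on ( c.identifier = s.identifier and s.fp_partner_id = 'directmatch_0' );
```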

TODO:

* use external tables. But dropping a partition is bad bad; you have to make sure the underlying storage goes away (see the sketch at the end of these notes)
* no support for dags, so oozie, but that has its own problems
* best practice: all tables external?
* no join counters
* poor counter support
* poor ability to communicate; set dummy variables
* explain / analyze is often useless
* using `set hive.cli.print.header=true;` to see headers will cause hive to hang when writing a table
* join syntax: specify the rhs condition in the join syntax
* BUG: snappy dies on 0 data files
* FAVORITE BUG: hive can't read files it writes; character encoding issues. Note they've changed this in different revisions.
* CRUCIAL: no way to specify LOJ is 0 or 1;
* strange null handling
* best practice: use `/*+ MAPJOIN(AMID_FILT) */`
* best practice: always group by if supposed to be unique
* best practice: always `set mapred.reduce.tasks=128;`; hive will often shove everything through 1 reducer
* BUG: making a table external must be in caps
* OOM indicator:

```text
java.lang.Throwable: Child Error
    at org.apache.hadoop.mapred.TaskRunner.run(TaskRunner.java:271)
Caused by: java.io.IOException: Task process exit with nonzero status of 137.
    at org.apache.hadoop.mapred.TaskRunner.run(TaskRunner.java:258)
```

* logs disappear
* oozie: hard to capture workflow logs
* trick: set dummy variables to point out where you are
* best practice: an ssh key that lets you ssh to all the remote boxes to log in and look at the logs
* problem: scales very poorly in the number of input files
* problem: hangs on launch while doing tons of s3 reads
* problem: dynamic partitions don't tell you what rows went where
* problem: inconsistent text formatting for rows loaded / which partition, so hard to extract (annoying, but means nobody is a grownup)
* emr / amazon: no easy way to terminate the cluster from the cluster
* hive problem: doesn't make sure syntax is consistent
* hive SUPER ANNOYANCE: restarts count each query, so hard to find the bug in the file
* tip: use `---|` to mark code you ran and don't want to rerun, so you can `%s/^---|//` to fix it up
* distribution / histogram code can't accept numbers as parameters
* no for loop; cut and paste code
* example: python to write arrays
* example: python mapper, python reducer, python both
* `set hive.optimize.skewjoin=true;` has never done anything
* always run as `hive -f mycode.sql 2>&1 | tee -a log.run.00`
* best practice: prefix locations, e.g. `location 's3n://bucket_name/subdir/tablename';`
* can't include files in hive; the ddl needs to be shared
* have to rely on lexical sort of dates in batch_id to make bad touch
* sometimes partitions are columns. Sometimes they aren't. Sometimes they die with crazy exceptions:

```sql
-- this dies
select concat( amid, '_', fp_partner_id ) as amid, ip_address, sum( num_sessions ) as num_sessions
from fp_tfidf_amid_info
where split( fp_partner_id, '_' )[ 0 ] = 'directmatch'
group by concat( amid, '_', fp_partner_id ), ip_address;

-- this works
select concat( amid, '_', fp_partner_id ) as amid, ip_address, sum( num_sessions ) as num_sessions
from fp_tfidf_amid_info
where fp_partner_id = 'directmatch_0'
group by concat( amid, '_', fp_partner_id ), ip_address;
```

The failure looks like:

```text
FAILED: Hive Internal Error: java.lang.ClassCastException(org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector cannot be cast to org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector)
java.lang.ClassCastException: org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector cannot be cast to org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector
    at org.apache.hadoop.hive.ql.optimizer.ppr.PartExprEvalUtils.evalExprWithPart(PartExprEvalUtils.java:77)
    at org.apache.hadoop.hive.ql.optimizer.pcr.PcrExprProcFactory.evalExprWithPart(PcrExprProcFactory.java:73)
    at org.apache.hadoop.hive.ql.optimizer.pcr.PcrExprProcFactory$GenericFuncExprProcessor.process(PcrExprProcFactory.java:328)
    at org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher.dispatch(DefaultRuleDispatcher.java:89)
    at org.apache.hadoop.hive.ql.lib.DefaultGraphWalker.dispatch(DefaultGraphWalker.java:88)
    at org.apache.hadoop.hive.ql.lib.DefaultGraphWalker.walk(DefaultGraphWalker.java:125)
    at org.apache.hadoop.hive.ql.lib.DefaultGraphWalker.startWalking(DefaultGraphWalker.java:102)
    at org.apache.hadoop.hive.ql.optimizer.pcr.PcrExprProcFactory.walkExprTree(PcrExprProcFactory.java:450)
    at org.apache.hadoop.hive.ql.optimizer.pcr.PcrOpProcFactory$FilterPCR.process(PcrOpProcFactory.java:149)
    at org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher.dispatch(DefaultRuleDispatcher.java:89)
    at org.apache.hadoop.hive.ql.lib.DefaultGraphWalker.dispatch(DefaultGraphWalker.java:88)
    at org.apache.hadoop.hive.ql.lib.DefaultGraphWalker.walk(DefaultGraphWalker.java:125)
    at org.apache.hadoop.hive.ql.lib.DefaultGraphWalker.startWalking(DefaultGraphWalker.java:102)
    at org.apache.hadoop.hive.ql.optimizer.pcr.PartitionConditionRemover.transform(PartitionConditionRemover.java:78)
    at org.apache.hadoop.hive.ql.optimizer.Optimizer.optimize(Optimizer.java:87)
    at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeInternal(SemanticAnalyzer.java:7339)
    at org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.analyze(BaseSemanticAnalyzer.java:243)
    at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:430)
    at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:337)
    at org.apache.hadoop.hive.ql.Driver.run(Driver.java:889)
    at org.apache.hadoop.hive.cli.CliDriver.processLocalCmd(CliDriver.java:261)
    at org.apache.hadoop.hive.cli.CliDriver.processCmd(CliDriver.java:218)
    at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:409)
    at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:344)
    at org.apache.hadoop.hive.cli.CliDriver.processReader(CliDriver.java:442)
    at org.apache.hadoop.hive.cli.CliDriver.processFile(CliDriver.java:457)
    at org.apache.hadoop.hive.cli.CliDriver.run(CliDriver.java:655)
    at org.apache.hadoop.hive.cli.CliDriver.main(CliDriver.java:567)
    at org.apache.oozie.action.hadoop.HiveMain.runHive(HiveMain.java:303)
    at org.apache.oozie.action.hadoop.HiveMain.run(HiveMain.java:280)
    at org.apache.oozie.action.hadoop.LauncherMain.run(LauncherMain.java:37)
    at org.apache.oozie.action.hadoop.HiveMain.main(HiveMain.java:55)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.oozie.action.hadoop.LauncherMapper.map(LauncherMapper.java:478)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:441)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:377)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1132)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)
Intercepting System.exit(12)
```

* I found that if one of the reducers generates 0 length files that are then snappy or lzo compressed, Hive can't read the table
* ctas (create table as select) fails, after it did all the work
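On the external-tables note near the top of this TODO list: dropping a partition of an external table only touches the metastore, so cleanup has to remove the storage as well. A rough sketch, reusing the made-up schema, table, and s3 path names from elsewhere in this post:

```bash
# drop the partition from the metastore, then delete the files it pointed at;
# for external tables hive will not remove the data for you.
hive -e "use myschema; alter table data_store drop partition (batch_id='20130501');"
hadoop fs -rmr 's3n://bucket_name/subdir/data_store/batch_id=20130501'
```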